diff --git a/flake.lock b/flake.lock index 45da053e..001f1feb 100644 --- a/flake.lock +++ b/flake.lock @@ -133,6 +133,21 @@ "type": "github" } }, + "flake-utils_7": { + "locked": { + "lastModified": 1644229661, + "narHash": "sha256-1YdnJAsNy69bpcjuoKdOYQX0YxZBiCYZo4Twxerqv7k=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "3cecb5b042f7f209c56ffd8371b2711a290ec797", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, "haskell-flake-utils": { "inputs": { "flake-utils": "flake-utils" @@ -229,6 +244,24 @@ "inputs": { "flake-utils": "flake-utils_6" }, + "locked": { + "lastModified": 1698938553, + "narHash": "sha256-oXpTKXioqFbl2mhhvpJIAvgNd+wYyv4ekI+YnJHEJ6s=", + "owner": "ivanovs-4", + "repo": "haskell-flake-utils", + "rev": "19b273b5dc401a0a565e7f75cf50a593871b80c9", + "type": "github" + }, + "original": { + "owner": "ivanovs-4", + "repo": "haskell-flake-utils", + "type": "github" + } + }, + "haskell-flake-utils_7": { + "inputs": { + "flake-utils": "flake-utils_7" + }, "locked": { "lastModified": 1672412555, "narHash": "sha256-Kaa8F7nQFR3KuS6Y9WRUxeJeZlp6CCubyrRfmiEsW4k=", @@ -264,6 +297,27 @@ "type": "github" } }, + "lsm": { + "inputs": { + "haskell-flake-utils": "haskell-flake-utils_6", + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1711033804, + "narHash": "sha256-z9cb5yuWfuZmGukxsZebXhc6KUZoPVT60oXxQ6j6ML8=", + "ref": "refs/heads/master", + "rev": "0e8286a43da5b9e54c4f3ecdb994173fe77351db", + "revCount": 26, + "type": "git", + "url": "https://git.hbs2.net/5BCaH95cWsVKBmWaDNLWQr2umxzzT5kqRRKNTm2J15Ls" + }, + "original": { + "type": "git", + "url": "https://git.hbs2.net/5BCaH95cWsVKBmWaDNLWQr2umxzzT5kqRRKNTm2J15Ls" + } + }, "nixpkgs": { "locked": { "lastModified": 1707451808, @@ -286,6 +340,7 @@ "fixme": "fixme", "haskell-flake-utils": "haskell-flake-utils_4", "hspup": "hspup", + "lsm": "lsm", "nixpkgs": "nixpkgs", "saltine": "saltine", "suckless-conf": "suckless-conf_2" @@ -332,7 +387,7 @@ }, "suckless-conf_2": { "inputs": { - "haskell-flake-utils": "haskell-flake-utils_6", + "haskell-flake-utils": "haskell-flake-utils_7", "nixpkgs": [ "nixpkgs" ] diff --git a/flake.nix b/flake.nix index 8373d009..51294f2f 100644 --- a/flake.nix +++ b/flake.nix @@ -18,6 +18,9 @@ inputs = { db-pipe.url = "git+https://git.hbs2.net/5xrwbTzzweS9yeJQnrrUY9gQJfhJf84pbyHhF2MMmSft"; db-pipe.inputs.nixpkgs.follows = "nixpkgs"; + lsm.url = "git+https://git.hbs2.net/5BCaH95cWsVKBmWaDNLWQr2umxzzT5kqRRKNTm2J15Ls"; + lsm.inputs.nixpkgs.follows = "nixpkgs"; + saltine = { url = "github:tel/saltine/3d3a54cf46f78b71b4b55653482fb6f4cee6b77d"; flake = false; diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 1493aeec..b56ef460 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -105,6 +105,7 @@ library , HBS2.Net.Messaging.UDP , HBS2.Net.Messaging.TCP , HBS2.Net.Messaging.Unix + , HBS2.Net.Messaging.Pipe , HBS2.Net.Messaging.Stream , HBS2.Net.Messaging.Encrypted.RandomPrefix , HBS2.Net.Messaging.Encrypted.ByPass @@ -196,6 +197,7 @@ library , time , transformers , uniplate + , unix , unordered-containers , unliftio , unliftio-core diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs new file mode 100644 index 00000000..4a132315 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE NumericUnderscores #-} +module HBS2.Net.Messaging.Pipe where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Types +import HBS2.Actors.Peer.Types +import HBS2.Net.Messaging + +import Control.Concurrent.STM qualified as STM +import Control.Monad.Reader +import Data.ByteString.Builder qualified as B +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.Hashable +import Network.ByteOrder hiding (ByteString) +import System.IO.Unsafe (unsafePerformIO) +import System.Posix.IO +import UnliftIO + +-- define new transport protocol type +data PIPE = PIPE + deriving (Eq,Ord,Show,Generic) + + +-- address for the new protocol +newtype PipeAddr = PipeAddr Handle + deriving newtype (Eq,Show) + +-- the protocol work data +data MessagingPipe = + MessagingPipe + { pipeIn :: Handle + , pipeOut :: Handle + , inQ :: TQueue ByteString + } + +remotePeer :: MessagingPipe -> Peer PIPE +remotePeer = PeerPIPE . PipeAddr . pipeOut + +localPeer :: MessagingPipe -> Peer PIPE +localPeer = PeerPIPE . PipeAddr . pipeIn + +newMessagingPipe :: MonadIO m => (Handle, Handle) -> m MessagingPipe +newMessagingPipe (pIn,pOut) = do + MessagingPipe pIn pOut + <$> newTQueueIO + +instance Hashable PipeAddr where + hashWithSalt salt (PipeAddr pip) = hashWithSalt salt ("pipe-addr", fd) + where + fd = unsafePerformIO (handleToFd pip <&> fromIntegral @_ @Word) + +instance HasPeer PIPE where + newtype instance Peer PIPE = PeerPIPE { _fromPeerPipe :: PipeAddr } + deriving stock (Eq,Show,Generic) + deriving newtype (Hashable) + +instance Pretty (Peer PIPE) where + pretty (PeerPIPE p) = parens ("pipe" <+> viaShow p) + +-- Messaging definition for protocol +instance Messaging MessagingPipe PIPE ByteString where + + sendTo bus _ _ msg = liftIO do + LBS.hPutStr (pipeOut bus) (B.toLazyByteString frame <> msg) + hFlush (pipeOut bus) + + where + frame = B.word32BE (fromIntegral $ LBS.length msg) + + receive bus _ = do + msg <- liftIO $ atomically $ peekTQueue q >> STM.flushTQueue q + for msg $ \m -> pure (From (PeerPIPE (PipeAddr who)), m) + + where + q = inQ bus + who = pipeIn bus + +runMessagingPipe :: MonadIO m => MessagingPipe -> m () +runMessagingPipe bus = liftIO do + fix \next -> do + frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict + piece <- LBS.hGet who (fromIntegral frame) + atomically (writeTQueue (inQ bus) piece) + next + + where + who = pipeIn bus + +instance (MonadIO m, Messaging MessagingPipe PIPE (Encoded PIPE)) + => HasFabriq PIPE (ReaderT MessagingPipe m) where + getFabriq = asks Fabriq + +instance MonadIO m => HasOwnPeer PIPE (ReaderT MessagingPipe m) where + ownPeer = asks localPeer + + diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index 08d30384..08e5d423 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -4,6 +4,7 @@ module HBS2.Net.Messaging.Unix ( module HBS2.Net.Messaging.Unix , module HBS2.Net.Messaging , module HBS2.Net.Proto.Types + , SocketClosedException ) where import HBS2.Prelude.Plated @@ -220,12 +221,23 @@ runMessagingUnix env = do atomically $ writeTVar seen now next + + clientLoop m = fix \next -> do + m + if not (MUDontRetry `elem` msgUnixOpts env) then do + debug "LOOP!" + next + else do + debug "LOOP EXIT" + handleClient | MUDontRetry `elem` msgUnixOpts env = \_ w -> handleAny throwStopped w - | otherwise = handleAny + | otherwise = handleAny throwStopped _ = throwIO UnixMessagingStopped - runClient = liftIO $ forever $ handleClient logAndRetry $ flip runContT pure $ do + runClient = liftIO $ clientLoop $ handleClient logAndRetry $ flip runContT pure $ do + + debug "HERE WE GO AGAIN!" let sa = SockAddrUnix (msgUnixSockPath env) let p = msgUnixSockPath env @@ -335,6 +347,7 @@ runMessagingUnix env = do pause (msgUnixRetryTime env) + logAndRetry :: SomeException -> IO () logAndRetry e = do warn $ "MessagingUnix. runClient failed, probably server is gone. Retrying:" <+> pretty (msgUnixSelf env) diff --git a/hbs2-fixer/app/Main.hs b/hbs2-fixer/app/Main.hs index f246e39e..81635cb3 100644 --- a/hbs2-fixer/app/Main.hs +++ b/hbs2-fixer/app/Main.hs @@ -56,6 +56,8 @@ import System.Exit qualified as Exit import Data.Cache qualified as Cache import Data.Cache (Cache) +import System.Exit + {- HLINT ignore "Functor law" -} @@ -158,67 +160,80 @@ withApp cfgPath action = do setLogging @WARN warnPrefix setLogging @NOTICE noticePrefix - soname <- detectRPC - `orDie` "can't detect RPC" + fix \next -> do - flip runContT pure do + flip runContT pure do - client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) - >>= orThrowUser ("can't connect to" <+> pretty soname) + soname' <- lift detectRPC - void $ ContT $ withAsync $ runMessagingUnix client + soname <- ContT $ maybe1 soname' (pure ()) - peerAPI <- makeServiceCaller @PeerAPI (fromString soname) - refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) - storageAPI <- makeServiceCaller @StorageAPI (fromString soname) - lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) + client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) + >>= orThrowUser ("can't connect to" <+> pretty soname) - let endpoints = [ Endpoint @UNIX peerAPI - , Endpoint @UNIX refLogAPI - , Endpoint @UNIX lwwAPI - , Endpoint @UNIX storageAPI - ] + mess <- ContT $ withAsync $ runMessagingUnix client - void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + peerAPI <- makeServiceCaller @PeerAPI (fromString soname) + refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) + storageAPI <- makeServiceCaller @StorageAPI (fromString soname) + lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) - let o = [MUWatchdog 20, MUDontRetry] - clientN <- newMessagingUnixOpts o False 1.0 soname + let endpoints = [ Endpoint @UNIX peerAPI + , Endpoint @UNIX refLogAPI + , Endpoint @UNIX lwwAPI + , Endpoint @UNIX storageAPI + ] - void $ ContT $ withAsync $ runMessagingUnix clientN + mn <- ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client - sink <- newNotifySink + let o = [MUWatchdog 20,MUDontRetry] + clientN <- newMessagingUnixOpts o False 1.0 soname - void $ ContT $ withAsync $ flip runReaderT clientN $ do - debug $ red "notify restarted!" - runNotifyWorkerClient sink + notif <- ContT $ withAsync (runMessagingUnix clientN) - void $ ContT $ withAsync $ flip runReaderT clientN $ do - runProto @UNIX - [ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink) - ] - env <- FixerEnv Nothing - lwwAPI - refLogAPI - sink - peerAPI - (AnyStorage (StorageClient storageAPI)) - <$> newTVarIO mempty - <*> newTVarIO 30 - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO 0 - <*> newTVarIO mempty - <*> newTQueueIO + sink <- newNotifySink - lift $ runReaderT (runFixerM $ withConfig cfgPath action) env - `finally` do - setLoggingOff @DEBUG - setLoggingOff @INFO - setLoggingOff @ERROR - setLoggingOff @WARN - setLoggingOff @NOTICE + void $ ContT $ withAsync $ flip runReaderT clientN $ do + debug $ red "notify restarted!" + runNotifyWorkerClient sink + + p1 <- ContT $ withAsync $ flip runReaderT clientN $ do + runProto @UNIX + [ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink) + ] + + env <- FixerEnv Nothing + lwwAPI + refLogAPI + sink + peerAPI + (AnyStorage (StorageClient storageAPI)) + <$> newTVarIO mempty + <*> newTVarIO 30 + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO 0 + <*> newTVarIO mempty + <*> newTQueueIO + + void $ ContT $ bracket (pure ()) $ \_ -> do + readTVarIO (_listeners env) <&> HM.elems >>= mapM_ cancel + + p3 <- ContT $ withAsync $ runReaderT (runFixerM $ withConfig cfgPath action) env + + void $ waitAnyCatchCancel [mess,mn,notif,p1,p3] + + debug $ red "respawning..." + pause @'Seconds 5 + next + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE where errorPrefix = toStdout . logPrefix "[error] " @@ -232,16 +247,18 @@ data ConfWatch = | ConfUpdate [Syntax C] mainLoop :: FixerM IO () -mainLoop = forever $ do +mainLoop = do debug "hbs2-fixer. do stuff since 2024" conf <- getConf -- debug $ line <> vcat (fmap pretty conf) flip runContT pure do + debug $ red "Reloading..." + lift $ updateFromConfig conf - void $ ContT $ withAsync $ do + p1 <- ContT $ withAsync $ do cfg <- asks _configFile `orDie` "config file not specified" flip fix ConfRead $ \next -> \case @@ -271,7 +288,7 @@ mainLoop = forever $ do next ConfRead -- poll reflogs - void $ ContT $ withAsync do + p2 <- ContT $ withAsync do let w = asks _watchers >>= readTVarIO @@ -292,15 +309,20 @@ mainLoop = forever $ do pure () - jobs <- asks _pipeline - void $ ContT $ withAsync $ forever do - liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs) - >>= \case - Left e -> err (viaShow e) - _ -> pure () + p3 <- ContT $ withAsync $ fix \next -> do + r <- liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs) + case r of + Left e -> do + err (viaShow e) + let ee = fromException @AsyncCancelled e - forever $ pause @'Seconds 60 + unless (isJust ee) do + next + + _ -> next + + void $ waitAnyCatchCancel [p1,p2,p3] oneSec :: MonadUnliftIO m => m b -> m (Either () b) oneSec = race (pause @'Seconds 1) diff --git a/hbs2-git/hbs2-git.cabal b/hbs2-git/hbs2-git.cabal index e309a75c..fd93d4d3 100644 --- a/hbs2-git/hbs2-git.cabal +++ b/hbs2-git/hbs2-git.cabal @@ -168,3 +168,5 @@ executable git-remote-hbs2 default-language: GHC2021 + + diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index eb19d8e9..05f4adb2 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -945,7 +945,7 @@ logMergeProcess penv env q = withPeerM penv do hd <- MaybeT $ lift $ getHead menv headRef let quo = view refChanHeadQuorum hd & fromIntegral - guard $ checkACL hd (Just pk) ak + guard $ checkACL ACLUpdate hd (Just pk) ak pure [(href, (quo,mempty))] Accept _ box -> do diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanNotify.hs b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanNotify.hs index e43145c7..97a00541 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanNotify.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanNotify.hs @@ -79,7 +79,7 @@ refChanNotifyProto self adapter msg@(Notify rchan box) = do let refchanKey = RefChanHeadKey @s rchan headBlock <- MaybeT $ getActualRefChanHead @e refchanKey - guard $ checkACL headBlock Nothing authorKey + guard $ checkACL ACLNotify headBlock Nothing authorKey -- FIXME: garbage-collection-required liftIO $ putBlock sto (serialise msg) diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs index 2773c2b9..8caab382 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs @@ -295,7 +295,7 @@ refChanUpdateProto self pc adapter msg = do let pips = view refChanHeadPeers headBlock - guard $ checkACL headBlock (Just peerKey) authorKey + guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) @@ -453,7 +453,7 @@ refChanUpdateProto self pc adapter msg = do (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox -- может, и не надо второй раз проверять - guard $ checkACL headBlock (Just peerKey) authorKey + guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/Types.hs b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/Types.hs index 1397ef06..d254946e 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/Types.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/Types.hs @@ -46,6 +46,9 @@ type RefChanAuthor e = PubKey 'Sign (Encryption e) type Weight = Integer +data ACLType = ACLUpdate | ACLNotify + deriving stock (Eq,Ord,Generic,Data,Show) + data RefChanHeadBlock e = RefChanHeadBlockSmall { _refChanHeadVersion :: Integer @@ -63,6 +66,16 @@ data RefChanHeadBlock e = , _refChanHeadReaders' :: HashSet (PubKey 'Encrypt (Encryption e)) , _refChanHeadExt :: ByteString } + | RefChanHeadBlock2 + { _refChanHeadVersion :: Integer + , _refChanHeadQuorum :: Integer + , _refChanHeadWaitAccept :: Integer + , _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight + , _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e)) + , _refChanHeadReaders' :: HashSet (PubKey 'Encrypt (Encryption e)) + , _refChanHeadNotifiers' :: HashSet (PubKey 'Sign (Encryption e)) + , _refChanHeadExt :: ByteString + } deriving stock (Generic) makeLenses ''RefChanHeadBlock @@ -126,7 +139,25 @@ refChanHeadReaders = lens g s where g (RefChanHeadBlockSmall{}) = mempty g (RefChanHeadBlock1{..}) = _refChanHeadReaders' + g (RefChanHeadBlock2{..}) = _refChanHeadReaders' s v@(RefChanHeadBlock1{}) x = v { _refChanHeadReaders' = x } + s v@(RefChanHeadBlock2{}) x = v { _refChanHeadReaders' = x } + s x _ = x + + +refChanHeadNotifiers :: ForRefChans e + => Lens (RefChanHeadBlock e) + (RefChanHeadBlock e) + (HashSet (PubKey 'Sign (Encryption e))) + (HashSet (PubKey 'Sign (Encryption e))) + +refChanHeadNotifiers = lens g s + where + g (RefChanHeadBlockSmall{}) = mempty + g (RefChanHeadBlock1{}) = mempty + g (RefChanHeadBlock2{..}) = _refChanHeadNotifiers' + + s v@(RefChanHeadBlock2{}) x = v { _refChanHeadNotifiers' = x } s x _ = x instance ForRefChans e => Serialise (RefChanHeadBlock e) @@ -197,21 +228,16 @@ instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanLogKey s) where instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where + -- NOTE: we-dont-support-old-head-formats-anymore fromStringMay str = - case readers of - [] -> RefChanHeadBlockSmall <$> version - <*> quorum - <*> wait - <*> pure (HashMap.fromList peers) - <*> pure (HashSet.fromList authors) - - rs -> RefChanHeadBlock1 <$> version - <*> quorum - <*> wait - <*> pure (HashMap.fromList peers) - <*> pure (HashSet.fromList authors) - <*> pure (HashSet.fromList rs) - <*> pure mempty + RefChanHeadBlock2 <$> version + <*> quorum + <*> wait + <*> pure (HashMap.fromList peers) + <*> pure (HashSet.fromList authors) + <*> pure (HashSet.fromList readers) + <*> pure (HashSet.fromList notifiers) + <*> pure mempty where parsed = parseTop str & fromRight mempty @@ -231,6 +257,11 @@ instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where | (ListVal [SymbolVal "reader", LitStrVal s] ) <- parsed ] + + notifiers = catMaybes [ fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack s) + | (ListVal [SymbolVal "notifier", LitStrVal s] ) <- parsed + ] + instance (ForRefChans e , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) , Pretty (AsBase58 (PubKey 'Encrypt (Encryption e))) @@ -241,16 +272,22 @@ instance (ForRefChans e <> parens ("wait" <+> pretty (view refChanHeadWaitAccept blk)) <> line <> - vcat (fmap peer (HashMap.toList $ view refChanHeadPeers blk)) <> line + lstOf peer (HashMap.toList $ view refChanHeadPeers blk) <> - vcat (fmap author (HashSet.toList $ view refChanHeadAuthors blk)) <> line + lstOf author (HashSet.toList $ view refChanHeadAuthors blk) <> - vcat (fmap reader (HashSet.toList $ view refChanHeadReaders blk)) <> line + lstOf reader (HashSet.toList $ view refChanHeadReaders blk) + <> + lstOf notifier (HashSet.toList $ view refChanHeadNotifiers blk) where peer (p,w) = parens ("peer" <+> dquotes (pretty (AsBase58 p)) <+> pretty w) author p = parens ("author" <+> dquotes (pretty (AsBase58 p))) reader p = parens ("reader" <+> dquotes (pretty (AsBase58 p))) + notifier p = parens ("notifier" <+> dquotes (pretty (AsBase58 p))) + + lstOf f e | null e = mempty + | otherwise = vcat (fmap f e) <> line -- блок головы может быть довольно большой. @@ -335,15 +372,19 @@ getRefChanHead sto k = runMaybeT do checkACL :: forall e s . (Encryption e ~ s, ForRefChans e) - => RefChanHeadBlock e + => ACLType + -> RefChanHeadBlock e -> Maybe (PubKey 'Sign s) -> PubKey 'Sign s -> Bool -checkACL theHead mbPeerKey authorKey = match +checkACL acl theHead mbPeerKey authorKey = match where pips = view refChanHeadPeers theHead aus = view refChanHeadAuthors theHead + notifiers = view refChanHeadNotifiers theHead match = maybe True (`HashMap.member` pips) mbPeerKey - && authorKey `HashSet.member` aus + && ( authorKey `HashSet.member` aus + || acl == ACLNotify && authorKey `HashSet.member` notifiers + ) diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 424e10a5..6c6c7722 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -947,3 +947,51 @@ executable test-playground , resourcet , text-icu >= 0.8.0.3 + +executable test-pipe-mess + import: shared-properties + default-language: Haskell2010 + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestPipeMessaging.hs + build-depends: + base, hbs2-core + , async + , bytestring + , cache + , containers + , directory + , hashable + , microlens-platform + , mtl + , network-byte-order + , prettyprinter + , QuickCheck + , quickcheck-instances + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-quickcheck + , tasty-hunit + , tasty-quickcheck + , transformers + , uniplate + , vector + , saltine + , simple-logger + , string-conversions + , filepath + , temporary + , unliftio + , unordered-containers + , unix + , timeit + + + diff --git a/hbs2-tests/test/PrototypeGenericService.hs b/hbs2-tests/test/PrototypeGenericService.hs index ea8e62c0..74f1e20a 100644 --- a/hbs2-tests/test/PrototypeGenericService.hs +++ b/hbs2-tests/test/PrototypeGenericService.hs @@ -60,9 +60,9 @@ type instance Output Method2 = () instance MonadIO m => HandleMethod m Method2 where handleMethod _ = pure () -instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m) - => HasDeferred UNIX (ServiceProto api UNIX) m where - deferred _ m = void (async m) +-- instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m) +-- => HasDeferred UNIX (ServiceProto api UNIX) m where +-- deferred m = void (async m) main :: IO () main = do diff --git a/hbs2-tests/test/TestPipeMessaging.hs b/hbs2-tests/test/TestPipeMessaging.hs new file mode 100644 index 00000000..88a0f04a --- /dev/null +++ b/hbs2-tests/test/TestPipeMessaging.hs @@ -0,0 +1,114 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE NumericUnderscores #-} +module Main where + +import HBS2.Prelude.Plated + +import HBS2.Net.Messaging +import HBS2.Net.Messaging.Pipe +import HBS2.Net.Proto.Service +import HBS2.Actors.Peer + +import HBS2.System.Logger.Simple.ANSI + +import Data.ByteString.Lazy (ByteString) +import System.Posix.IO +import UnliftIO +import Control.Monad.Trans.Cont +import Control.Monad.Reader +import Codec.Serialise +import Data.Fixed + +import System.TimeIt + +-- protocol's data +data Ping = + Ping Int + | Pong Int + deriving stock (Eq,Show,Generic) + +instance Pretty Ping where + pretty = viaShow + +instance Serialise Ping + +-- API definition +type MyServiceMethods1 = '[ Ping ] + +-- API endpoint definition +type instance Input Ping = Ping +type instance Output Ping = Maybe Ping + +-- API handler +instance MonadIO m => HandleMethod m Ping where + handleMethod = \case + Ping n -> pure (Just (Pong n)) + Pong _ -> pure Nothing + +-- Codec for protocol +instance HasProtocol PIPE (ServiceProto MyServiceMethods1 PIPE) where + type instance ProtocolId (ServiceProto MyServiceMethods1 PIPE) = 0xDEADF00D1 + type instance Encoded PIPE = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +-- Some "deferred" implementation for our monad +-- note -- plain asyncs may cause to resource leak +instance (MonadUnliftIO m, HasProtocol PIPE (ServiceProto api PIPE)) + => HasDeferred (ServiceProto api PIPE) PIPE m where + deferred m = void (async m) + +mainLoop :: IO () +mainLoop = do + + flip runContT pure do + + -- pipe for server + (i1,o1) <- liftIO $ createPipe + >>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o + + -- pipe for client + (i2,o2) <- liftIO $ createPipe + >>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o + + -- interwire client and server by pipes + server <- newMessagingPipe (i2,o1) + client <- newMessagingPipe (i1,o2) + + -- run messaging workers + void $ ContT $ withAsync $ runMessagingPipe server + void $ ContT $ withAsync $ runMessagingPipe client + + -- make server protocol responder + void $ ContT $ withAsync $ flip runReaderT server do + runProto @PIPE + [ makeResponse (makeServer @MyServiceMethods1) + ] + + -- make client's "caller" + caller <- lift $ makeServiceCaller @MyServiceMethods1 @PIPE (localPeer client) + + -- make client's endpoint worker + void $ ContT $ withAsync $ runReaderT (runServiceClient caller) client + + let n = 20_000 + + (a, _) <- timeItT do + for_ [1..n] $ \i -> do + void $ callService @Ping caller (Ping i) + + debug $ "sent" <+> pretty n <+> "messages in" <+> pretty (realToFrac a :: Fixed E3) <> "sec" + <> line + <> "rps:" <+> pretty (realToFrac n / realToFrac a :: Fixed E2) + +main :: IO () +main = do + + setLogging @DEBUG defLog + mainLoop + `finally` do + setLoggingOff @DEBUG + + diff --git a/hbs2-tests/test/playground/Main.hs b/hbs2-tests/test/playground/Main.hs index 284e3f91..1c1525d7 100644 --- a/hbs2-tests/test/playground/Main.hs +++ b/hbs2-tests/test/playground/Main.hs @@ -10,6 +10,10 @@ import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Codec.Serialise import Lens.Micro.Platform +import Control.Monad.Trans.Cont + +import Control.Monad +import UnliftIO -- желаемое поведение: добавить в новую версию A какое-нибудь поле так, -- что бы предыдущие записи продолжали десериализоваться без этого поля, @@ -65,6 +69,29 @@ test w = case w of A -> "Match A" +runWithAsync :: IO () +runWithAsync = do + + hSetBuffering stdout LineBuffering + + flip runContT pure do + + t1 <- ContT $ withAsync do + forever do + print "PIU" + pause @'Seconds 1 + + q <- ContT $ withAsync do + pause @'Seconds 10 + print "FUCKIG QUIT" + + pysh <- ContT $ withAsync $ forever do + pause @'Seconds 2 + print "PYSHPYSH" + + void $ waitAnyCatchCancel [t1,q,pysh] + + main :: IO () main = do print "1"