From cc0ad4e24aae0b6eee757f438bd7f7e56af3fb93 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 10 Oct 2024 07:17:04 +0300 Subject: [PATCH] wip, maibox, send --- flake.nix | 5 +- hbs2-peer/app/CLI/Mailbox.hs | 71 ++++++++++++++++--- hbs2-peer/app/MailboxProtoWorker.hs | 48 ++++++++++--- hbs2-peer/app/PeerMain.hs | 5 +- hbs2-peer/app/RPC2/Mailbox.hs | 8 +++ hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs | 36 ++++++++-- hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs | 6 ++ hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs | 1 + 8 files changed, 154 insertions(+), 26 deletions(-) diff --git a/flake.nix b/flake.nix index 1441bb2d..3b028350 100644 --- a/flake.nix +++ b/flake.nix @@ -135,7 +135,10 @@ outputs = { self, nixpkgs, flake-utils, ... }@inputs: devShells.default = pkgs.haskellPackages.shellFor { - packages = _: []; + packages = _: + pkgs.lib.attrVals packageNames pkgs.haskellPackages ++ + pkgs.lib.attrVals miscellaneous pkgs.haskellPackages; + # withHoogle = true; buildInputs = ( with pkgs.haskellPackages; [ ghc diff --git a/hbs2-peer/app/CLI/Mailbox.hs b/hbs2-peer/app/CLI/Mailbox.hs index e907c9da..f0b8a8be 100644 --- a/hbs2-peer/app/CLI/Mailbox.hs +++ b/hbs2-peer/app/CLI/Mailbox.hs @@ -3,32 +3,36 @@ module CLI.Mailbox (pMailBox) where import HBS2.Prelude.Plated +import HBS2.Hash import HBS2.OrDie import HBS2.Data.Types.Refs import HBS2.Net.Proto.Service import HBS2.Net.Auth.Credentials +import HBS2.Storage import HBS2.Data.Types.SignedBox import HBS2.Peer.Proto.Mailbox import HBS2.Peer.Proto.Mailbox.Types import HBS2.Peer.RPC.API.Mailbox +import HBS2.Peer.RPC.API.Storage +import HBS2.Peer.RPC.Client.StorageClient import HBS2.KeyMan.Keys.Direct import CLI.Common import RPC2() import PeerLogger hiding (info) -import Data.Config.Suckless.Script - -import System.Exit -import System.Environment (lookupEnv) - +import Codec.Serialise import Control.Monad.Trans.Cont -import Options.Applicative +import Data.ByteString.Lazy qualified as LBS +import Data.Coerce +import Data.Config.Suckless.Script import Data.Maybe - import Data.Word import Lens.Micro.Platform +import Options.Applicative +import System.Environment (lookupEnv) +import System.Exit import UnliftIO import Text.InterpolatedString.Perl6 (qc) @@ -54,7 +58,7 @@ runMailboxCLI rpc s = do let t = TimeoutSec 1 - let dict api = makeDict @C do + let dict sto api = makeDict @C do entry $ bindMatch "hey" $ nil_ $ const do who <- liftIO (lookupEnv "USER") <&> fromMaybe "stranger" liftIO $ print $ "hey," <+> pretty who @@ -121,6 +125,53 @@ see hbs2-cli for sigil commands (create, store, load, etc) _ -> throwIO $ BadFormException @C nil + brief "send message via gossip" $ + desc [qc| +; reads message blob from stdin + +send --stdin + +; read message blob from file + +send --file FILE + +; reads message blob from storage + +send HASH + +you may create a message from plain text using + +hbs2-cli hbs2:mailbox:message:create + +command + +SEE ALSO + hbs2:mailbox:message:create + + |] + $ entry $ bindMatch "send" $ nil_ $ \syn -> do + + blob <- case syn of + [ StringLike "--stdin" ] -> do + liftIO (LBS.hGetContents stdin) + + [ StringLike "--file", StringLike fn ] -> do + liftIO (LBS.readFile fn) + + [ HashLike h ] -> do + liftIO (getBlock sto (coerce h)) + >>= orThrowUser "message not found" + + _ -> throwIO $ BadFormException @C nil + + mess <- deserialiseOrFail @(Message HBS2Basic) blob + & either (const $ error "malformed message") pure + + _ <- callRpcWaitMay @RpcMailboxSend t api mess + >>= orThrowUser "rpc call timeout" + + pure () + entry $ bindMatch "help" $ nil_ \case HelpEntryBound what -> helpEntry what [StringLike s] -> helpList False (Just s) @@ -129,5 +180,7 @@ see hbs2-cli for sigil commands (create, store, load, etc) flip runContT pure do caller <- ContT $ withMyRPC @MailboxAPI rpc - lift $ run (dict caller) cli >>= eatNil display + stoAPI <- ContT $ withMyRPC @StorageAPI rpc + let sto = AnyStorage (StorageClient stoAPI) + lift $ run (dict sto caller) cli >>= eatNil display diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 725458c7..d4e96bb3 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -1,4 +1,5 @@ {-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} module MailboxProtoWorker ( mailboxProtoWorker , createMailboxProtoWorker , MailboxProtoWorker @@ -18,6 +19,7 @@ import HBS2.Storage.Operations.Missed import HBS2.Hash import HBS2.Peer.Proto import HBS2.Peer.Proto.Mailbox +import HBS2.Net.Messaging.Unix import HBS2.Net.Auth.Credentials import HBS2.System.Dir @@ -52,14 +54,18 @@ hbs2MailboxDirOpt = "hbs2:mailbox:dir" data MailboxProtoWorker (s :: CryptoScheme) e = MailboxProtoWorker - { mpwStorage :: AnyStorage + { mpwPeerEnv :: PeerEnv e + , mpwDownloadEnv :: DownloadEnv e + , mpwStorage :: AnyStorage , inMessageQueue :: TBQueue (Message s, MessageContent s) + , inMessageQueueInNum :: TVar Int + , inMessageQueueOutNum :: TVar Int , inMessageQueueDropped :: TVar Int , inMessageDeclined :: TVar Int , mailboxDB :: TVar (Maybe DBPipeEnv) } -instance (s ~ HBS2Basic) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where +instance (s ~ HBS2Basic, e ~ L4Proto, s ~ Encryption e) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where mailboxGetStorage = pure . mpwStorage mailboxAcceptMessage MailboxProtoWorker{..} m c = do @@ -69,8 +75,10 @@ instance (s ~ HBS2Basic) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) whe modifyTVar inMessageQueueDropped succ else do writeTBQueue inMessageQueue (m,c) + modifyTVar inMessageQueueInNum succ -instance (s ~ HBS2Basic) => IsMailboxService s (MailboxProtoWorker s e) where +instance ( s ~ Encryption e, e ~ L4Proto + ) => IsMailboxService s (MailboxProtoWorker s e) where mailboxCreate MailboxProtoWorker{..} t p = do debug $ "mailboxWorker.mailboxCreate" <+> pretty (AsBase58 p) <+> pretty t @@ -91,6 +99,15 @@ instance (s ~ HBS2Basic) => IsMailboxService s (MailboxProtoWorker s e) where Right{} -> pure $ Right () Left{} -> pure $ Left (MailboxCreateFailed "database operation") + mailboxSendMessage w@MailboxProtoWorker{..} mess = do + -- we do not check message signature here + -- because it will be checked in the protocol handler anyway + liftIO $ withPeerM mpwPeerEnv do + me <- ownPeer @e + runResponseM me $ do + mailboxProto @e True w (MailBoxProtoV1 (SendMessage mess)) + + pure $ Right () getMailboxType_ :: (ForMailbox s, MonadIO m) => DBPipeEnv -> Recipient s -> m (Maybe MailboxType) getMailboxType_ d r = do @@ -101,16 +118,20 @@ getMailboxType_ d r = do <&> headMay . catMaybes createMailboxProtoWorker :: forall e m . MonadIO m - => AnyStorage + => PeerEnv e + -> DownloadEnv e + -> AnyStorage -> m (MailboxProtoWorker (Encryption e) e) -createMailboxProtoWorker sto = do +createMailboxProtoWorker pe de sto = do -- FIXME: queue-size-hardcode -- $class: hardcode inQ <- newTBQueueIO 1000 inDroppped <- newTVarIO 0 - decl <- newTVarIO 0 + inNum <- newTVarIO 0 + outNum <- newTVarIO 0 + decl <- newTVarIO 0 dbe <- newTVarIO Nothing - pure $ MailboxProtoWorker sto inQ inDroppped decl dbe + pure $ MailboxProtoWorker pe de sto inQ inNum outNum inDroppped decl dbe mailboxProtoWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m @@ -139,13 +160,15 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do inq <- ContT $ withAsync (mailboxInQ dbe) + sendq <- ContT $ withAsync $ mailboxSendQ + bs <- ContT $ withAsync do forever do pause @'Seconds 10 debug $ "I'm" <+> yellow "mailboxProtoWorker" - void $ waitAnyCancel [bs,pipe,inq] + void $ waitAnyCancel [bs,pipe,inq,sendq] `catch` \( e :: MailboxProtoException ) -> do err $ red "mailbox protocol worker terminated" <+> viaShow e @@ -154,17 +177,24 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do warn $ yellow "mailbox protocol worker exited" where + + mailboxSendQ = do + forever do + pause @'Seconds 10 + debug $ yellow "send mail loop" + mailboxInQ dbe = do forever do pause @'Seconds 10 mess <- atomically $ STM.flushTBQueue inMessageQueue for_ mess $ \(m,s) -> do + atomically $ modifyTVar inMessageQueueInNum pred -- FIXME: remove let ha = hashObject @HbSync (serialise m) -- сохраняем или нет? -- по госсипу уже послали. сохранять надо, только если -- у нас есть ящик - debug $ "received message" <+> pretty (AsBase58 (HashRef ha)) + debug $ yellow "received message" <+> pretty (AsBase58 (HashRef ha)) -- TODO: process-with-policy diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 9b4176e4..630ec129 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -902,7 +902,7 @@ runPeer opts = Exception.handle (\e -> myException e rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter - mailboxWorker <- createMailboxProtoWorker @e (AnyStorage s) + mailboxWorker <- createMailboxProtoWorker @e penv denv (AnyStorage s) let onNoBlock (p, h) = do already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust @@ -1136,7 +1136,7 @@ runPeer opts = Exception.handle (\e -> myException e , makeResponse (refChanNotifyProto False refChanAdapter) -- TODO: change-all-to-authorized , makeResponse ((authorized . subscribed (SomeBrains brains)) lwwRefProtoA) - , makeResponse ((authorized . mailboxProto) mailboxWorker) + , makeResponse ((authorized . mailboxProto False) mailboxWorker) ] @@ -1233,6 +1233,7 @@ runPeer opts = Exception.handle (\e -> myException e , rpcDoRefChanPropose = refChanProposeAction , rpcDoRefChanNotify = refChanNotifyAction , rpcMailboxService = AnyMailboxService @s mailboxWorker + , rpcMailboxAdapter = AnyMailboxAdapter @s mailboxWorker } m1 <- async $ runMessagingUnix rpcmsg diff --git a/hbs2-peer/app/RPC2/Mailbox.hs b/hbs2-peer/app/RPC2/Mailbox.hs index c9dac8ce..5e746d7c 100644 --- a/hbs2-peer/app/RPC2/Mailbox.hs +++ b/hbs2-peer/app/RPC2/Mailbox.hs @@ -43,3 +43,11 @@ instance (ForMailboxRPC m) => HandleMethod m RpcMailboxCreate where debug $ "rpc.RpcMailboxCreate" <+> pretty (AsBase58 puk) <+> pretty t +instance (ForMailboxRPC m) => HandleMethod m RpcMailboxSend where + + handleMethod mess = do + co <- getRpcContext @MailboxAPI @RPC2Context + let w = rpcMailboxService co + debug $ "rpc.RpcMailboxSend" + void $ mailboxSendMessage w mess + diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs index a4d92c83..ce4327e3 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs @@ -87,10 +87,33 @@ data MailboxServiceError = MailboxCreateFailed String deriving stock (Typeable,Show) -class IsMailboxService s a where - mailboxCreate :: forall m . MonadIO m => a -> MailboxType -> Recipient s -> m (Either MailboxServiceError ()) -data AnyMailboxService s = forall a . (IsMailboxService s a) => AnyMailboxService { adapter :: a } +class ForMailbox s => IsMailboxService s a where + + mailboxCreate :: forall m . MonadIO m + => a + -> MailboxType + -> Recipient s + -> m (Either MailboxServiceError ()) + + mailboxSendMessage :: forall m . MonadIO m + => a + -> Message s + -> m (Either MailboxServiceError ()) + +data AnyMailboxService s = + forall a . (IsMailboxService s a) => AnyMailboxService { mailboxService :: a } + +data AnyMailboxAdapter s = + forall a . (IsMailboxProtoAdapter s a) => AnyMailboxAdapter { mailboxAdapter :: a} + +instance ForMailbox s => IsMailboxService s (AnyMailboxService s) where + mailboxCreate (AnyMailboxService a) = mailboxCreate @s a + mailboxSendMessage (AnyMailboxService a) = mailboxSendMessage @s a + +instance IsMailboxProtoAdapter s (AnyMailboxAdapter s) where + mailboxGetStorage (AnyMailboxAdapter a) = mailboxGetStorage @s a + mailboxAcceptMessage (AnyMailboxAdapter a) = mailboxAcceptMessage @s a mailboxProto :: forall e s m p a . ( MonadIO m , Response e p m @@ -101,11 +124,12 @@ mailboxProto :: forall e s m p a . ( MonadIO m , s ~ Encryption e , ForMailbox s ) - => a + => Bool -- ^ inner, i.e from own peer + -> a -> MailBoxProto (Encryption e) e -> m () -mailboxProto adapter mess = do +mailboxProto inner adapter mess = do -- common stuff sto <- mailboxGetStorage @s adapter @@ -132,6 +156,8 @@ mailboxProto adapter mess = do let unboxed' = unboxSignedBox0 @(MessageContent s) (messageContent msg) -- ок, сообщение нормальное, шлём госсип, пишем, что обработали + -- TODO: increment-malformed-messages-statistics + -- $workflow: backlog (_, content) <- ContT $ maybe1 unboxed' none let h = hashObject @HbSync (serialise msg) & HashRef diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs index 977e8a66..8bab1687 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs @@ -8,6 +8,7 @@ import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Data.Types.SignedBox import HBS2.Peer.Proto.Mailbox.Types +import HBS2.Peer.Proto.Mailbox import Data.ByteString.Lazy ( ByteString ) import Data.ByteString qualified as BS @@ -15,9 +16,11 @@ import Codec.Serialise data RpcMailboxPoke data RpcMailboxCreate +data RpcMailboxSend type MailboxAPI = '[ RpcMailboxPoke , RpcMailboxCreate + , RpcMailboxSend ] type MailboxAPIProto = 0x056091510d3b2ec9 @@ -35,4 +38,7 @@ type instance Output RpcMailboxPoke = () type instance Input RpcMailboxCreate = (PubKey 'Sign HBS2Basic, MailboxType) type instance Output RpcMailboxCreate = () +type instance Input RpcMailboxSend = (Message HBS2Basic) +type instance Output RpcMailboxSend = () + diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs index 65d88d23..085dda94 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs @@ -41,6 +41,7 @@ data RPC2Context = , rpcDoRefChanPropose :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO () , rpcDoRefChanNotify :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO () , rpcMailboxService :: AnyMailboxService (Encryption L4Proto) + , rpcMailboxAdapter :: AnyMailboxAdapter (Encryption L4Proto) } instance (Monad m, Messaging MessagingUnix UNIX (Encoded UNIX)) => HasFabriq UNIX (ReaderT RPC2Context m) where