diff --git a/hbs2-peer/app/CLI/Mailbox.hs b/hbs2-peer/app/CLI/Mailbox.hs index c74caa08..775092bc 100644 --- a/hbs2-peer/app/CLI/Mailbox.hs +++ b/hbs2-peer/app/CLI/Mailbox.hs @@ -154,6 +154,16 @@ runMailboxCLI rpc s = do _ -> throwIO $ BadFormException @C nil + brief "fetch mailbox" + $ entry $ bindMatch "fetch" $ nil_ $ \case + [ SignPubKeyLike m ] -> do + + callRpcWaitMay @RpcMailboxFetch t api m + >>= orThrowUser "rpc call timeout" + >>= orThrowPassIO + + _ -> throwIO $ BadFormException @C nil + brief "set mailbox policy" $ desc setPolicyDesc -- $ examples setPolicyExamples diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 8f3e776c..62b1eb99 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -51,6 +51,7 @@ import Data.HashMap.Strict qualified as HM import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.Maybe +import Data.Word import Codec.Serialise import Lens.Micro.Platform import Text.InterpolatedString.Perl6 (qc) @@ -78,14 +79,31 @@ hbs2MailboxDirOpt = "hbs2:mailbox:dir" {- HLINT ignore "Functor law" -} +data PolicyDownload s = + PolicyDownload + { policyDownloadWhen :: Word64 + , policyDownloadWhat :: SetPolicyPayload s + } + deriving stock Generic + +data MailboxDowload = + MailboxDowload + { mailboxDownWhen :: Word64 + , mailboxDownPolicy :: Maybe PolicyVersion + } + deriving stock Generic + data MailboxProtoWorker (s :: CryptoScheme) e = MailboxProtoWorker { mpwPeerEnv :: PeerEnv e , mpwDownloadEnv :: DownloadEnv e , mpwStorage :: AnyStorage , mpwCredentials :: PeerCredentials s + , mpwFetchQ :: TVar (HashSet (MailboxRefKey s)) , inMessageQueue :: TBQueue (Message s, MessageContent s) , inMessageMergeQueue :: TVar (HashMap (MailboxRefKey s) (HashSet HashRef)) + , inPolicyDownloadQ :: TVar (HashMap HashRef (PolicyDownload s)) + , inMailboxDownloadQ :: TVar (HashMap HashRef MailboxDowload) , inMessageQueueInNum :: TVar Int , inMessageQueueOutNum :: TVar Int , inMessageQueueDropped :: TVar Int @@ -93,6 +111,9 @@ data MailboxProtoWorker (s :: CryptoScheme) e = , mailboxDB :: TVar (Maybe DBPipeEnv) } +okay :: Monad m => good -> m (Either bad good) +okay good = pure (Right good) + instance (s ~ HBS2Basic, e ~ L4Proto, s ~ Encryption e) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where mailboxGetCredentials = pure . mpwCredentials @@ -173,6 +194,8 @@ instance ( s ~ Encryption e, e ~ L4Proto on conflict (mailbox) do update set hash = excluded.hash |] (MailboxRefKey @s who, PolicyHash what) + -- TODO: ASAP-gossip-new-state + pure what mailboxDelete MailboxProtoWorker{..} mbox = do @@ -217,24 +240,50 @@ instance ( s ~ Encryption e, e ~ L4Proto debug $ red "mailboxListBasic" - r <- withDB dbe do - select_ @_ @(MailboxRefKey s, MailboxType) [qc|select recipient,type from mailbox|] + r <- listMailboxes dbe pure $ Right r - mailboxAcceptStatus MailboxProtoWorker{..} ref who MailBoxStatusPayload{..} = do + mailboxAcceptStatus me@MailboxProtoWorker{..} ref who MailBoxStatusPayload{..} = do -- TODO: implement-policy-first -- итак, мы не можем двигаться, пока не будет реализована policy. - flip runContT pure $ callCC \_ -> do + flip runContT pure $ callCC \stop -> do + + now <- liftIO $ getPOSIXTime <&> round mdbe <- readTVarIO mailboxDB dbe <- ContT $ maybe1 mdbe (pure $ Left (MailboxCreateFailed "database not ready")) - p0 <- loadPolicyPayloadFor dbe mpwStorage ref <&> fmap snd + p0 <- loadPolicyPayloadFor dbe mpwStorage ref + <&> fmap (sppPolicyVersion . snd) . ((unboxSignedBox0 . snd) =<<) + <&> fromMaybe 0 - let p = unboxSignedBox0 =<< mbsMailboxPolicy + let bogusPolicyMessage = + err $ red "!!! arrived invalid policy signature for" + <+> pretty ref + <+> "from" + <+> pretty (AsBase58 who) + + + -- TODO: handle-invalid-policy-error + -- not "okay" actually + (rcptKey, pNew) <- ContT $ maybe1 (mbsMailboxPolicy >>= unboxSignedBox0) + (bogusPolicyMessage >> okay ()) + + when (coerce rcptKey /= ref) $ lift bogusPolicyMessage >> stop (Right ()) + + when (sppPolicyVersion pNew > p0) do + startDownloadStuff me (sppPolicyRef pNew) + atomically $ modifyTVar inPolicyDownloadQ (HM.insert (sppPolicyRef pNew) (PolicyDownload now pNew)) + + let v = Just $ max p0 (sppPolicyVersion pNew) + + maybe1 mbsMailboxHash (okay ()) $ \h -> do + startDownloadStuff me h + atomically $ modifyTVar inMailboxDownloadQ (HM.insert h (MailboxDowload now v)) + okay () -- если версия p > версии p0 -- ставим скачиваться, по скачиванию -- обновляем -- тут есть какой-то процесс, который должен поллить скачивания, не забываем, @@ -256,9 +305,7 @@ instance ( s ~ Encryption e, e ~ L4Proto -- если не ок -- то но обновляем? а что тогда -- - pure $ Right () - - pure $ Right () + okay () mailboxGetStatus MailboxProtoWorker{..} ref = do -- TODO: support-policy-ASAP @@ -282,6 +329,27 @@ instance ( s ~ Encryption e, e ~ L4Proto pure $ Right $ Just $ MailBoxStatusPayload @s now (coerce ref) t v spp + mailboxFetch MailboxProtoWorker{..} ref = do + debug $ red "mailboxFetch" <+> pretty ref + atomically (modifyTVar mpwFetchQ (HS.insert ref)) + okay () + +startDownloadStuff :: forall s e m . (ForMailbox s, s ~ Encryption e, MyPeer e, MonadIO m) + => MailboxProtoWorker s e + -> HashRef + -> m () + +startDownloadStuff MailboxProtoWorker{..} href = do + liftIO $ withPeerM mpwPeerEnv $ withDownload mpwDownloadEnv + $ addDownload @e Nothing (coerce href) + +listMailboxes :: forall s m . (ForMailbox s, MonadIO m) + => DBPipeEnv + -> m [(MailboxRefKey s, MailboxType)] +listMailboxes dbe = do + withDB dbe do + select_ [qc|select recipient,type from mailbox|] + loadPolicyPayloadFor :: forall s m . (ForMailbox s, MonadIO m) => DBPipeEnv -> AnyStorage @@ -310,7 +378,10 @@ getMailboxType_ d r = do <&> fmap (fromStringMay @MailboxType . fromOnly) <&> headMay . catMaybes -createMailboxProtoWorker :: forall s e m . (MonadIO m, s ~ Encryption e, ForMailbox s) +createMailboxProtoWorker :: forall s e m . ( MonadIO m + , s ~ Encryption e + , ForMailbox s + ) => PeerCredentials s -> PeerEnv e -> DownloadEnv e @@ -319,14 +390,17 @@ createMailboxProtoWorker :: forall s e m . (MonadIO m, s ~ Encryption e, ForMail createMailboxProtoWorker pc pe de sto = do -- FIXME: queue-size-hardcode -- $class: hardcode - inQ <- newTBQueueIO 8000 - mergeQ <- newTVarIO mempty - inDroppped <- newTVarIO 0 - inNum <- newTVarIO 0 - outNum <- newTVarIO 0 - decl <- newTVarIO 0 - dbe <- newTVarIO Nothing - pure $ MailboxProtoWorker pe de sto pc inQ mergeQ inNum outNum inDroppped decl dbe + MailboxProtoWorker pe de sto pc + <$> newTVarIO mempty + <*> newTBQueueIO 8000 + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO Nothing mailboxProtoWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m @@ -352,19 +426,27 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do dbe <- lift $ mailboxStateEvolve readConf me - pipe <- ContT $ withAsync (runPipe dbe) + dpipe <- ContT $ withAsync (runPipe dbe) inq <- ContT $ withAsync (mailboxInQ dbe) mergeQ <- ContT $ withAsync mailboxMergeQ + mCheckQ <- ContT $ withAsync (mailboxCheckQ dbe) + + mFetchQ <- ContT $ withAsync (mailboxFetchQ dbe) + + pDownQ <- ContT $ withAsync policyDownloadQ + + sDownQ <- ContT $ withAsync stateDownloadQ + bs <- ContT $ withAsync do forever do pause @'Seconds 10 debug $ "I'm" <+> yellow "mailboxProtoWorker" - void $ waitAnyCancel [bs,pipe,inq,mergeQ] + void $ waitAnyCancel [bs,dpipe,inq,mergeQ,pDownQ,sDownQ,mCheckQ,mFetchQ] `catch` \( e :: MailboxProtoException ) -> do err $ red "mailbox protocol worker terminated" <+> viaShow e @@ -412,12 +494,7 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do -- TODO: check-attachment-policy-for-mailbox -- TODO: ASAP-block-accounting-for-attachment - for_ (messageParts s) $ \part -> do - - liftIO $ withPeerM mpwPeerEnv $ withDownload mpwDownloadEnv - $ addDownload @e Nothing (fromHashRef part) - - pure () + for_ (messageParts s) (startDownloadStuff me) -- read current mailbox -- merge messages into @@ -430,7 +507,7 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do let mboxes = readTVarIO inMessageMergeQueue <&> fmap (,2) . HM.keys . HM.filter ( not . HS.null ) - polling (Polling 2 2) mboxes $ \r -> void $ runMaybeT do + polling (Polling 2 5) mboxes $ \r -> void $ runMaybeT do debug $ yellow "mailbox: merge-poll" <+> pretty r -- NOTE: reliability @@ -458,6 +535,42 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do updateRef sto r nref debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref + policyDownloadQ = do + forever do + pause @'Seconds 10 + debug $ red "mailbox: policyDownloadQ" + + stateDownloadQ = do + forever do + pause @'Seconds 10 + debug $ red "mailbox: stateDownloadQ" + + mailboxFetchQ dbe = forever do + toFetch <- atomically $ do + q <- readTVar mpwFetchQ + when (HS.null q) STM.retry + writeTVar mpwFetchQ mempty + pure q + + for_ toFetch $ \r -> do + t <- getMailboxType_ dbe r + maybe1 t none $ \_ -> do + debug $ yellow "mailbox: SEND FETCH REQUEST FOR" <+> pretty r + now <- liftIO (getPOSIXTime <&> round) + gossip (MailBoxProtoV1 @s @e (CheckMailbox (Just now) (coerce r))) + + mailboxCheckQ dbe = do + + -- FIXME: mailbox-check-period + -- right now it's 10 seconds for debug purposes + -- remove hardcode to smth reasonable + let mboxes = liftIO (listMailboxes @s dbe <&> fmap (set _2 10) ) + + polling (Polling 10 10) mboxes $ \r -> do + debug $ yellow "mailbox: SEND FETCH REQUEST FOR" <+> pretty r + now <- liftIO (getPOSIXTime <&> round) + gossip (MailBoxProtoV1 @s @e (CheckMailbox (Just now) (coerce r))) + mailboxStateEvolve :: forall e s m . ( MonadIO m , MonadUnliftIO m , HasStorage m diff --git a/hbs2-peer/app/RPC2/Mailbox.hs b/hbs2-peer/app/RPC2/Mailbox.hs index 8c2b7311..f3926430 100644 --- a/hbs2-peer/app/RPC2/Mailbox.hs +++ b/hbs2-peer/app/RPC2/Mailbox.hs @@ -61,6 +61,13 @@ instance (ForMailboxRPC m) => HandleMethod m RpcMailboxGetStatus where debug $ "rpc.RpcMailboxGetStatus" <+> pretty (AsBase58 puk) mailboxGetStatus @HBS2Basic mbs (MailboxRefKey puk) +instance (ForMailboxRPC m) => HandleMethod m RpcMailboxFetch where + + handleMethod puk = do + AnyMailboxService mbs <- getRpcContext @MailboxAPI @RPC2Context <&> rpcMailboxService + debug $ "rpc.RpcMailboxFetch" <+> pretty (AsBase58 puk) + mailboxFetch @HBS2Basic mbs (MailboxRefKey puk) + instance (ForMailboxRPC m) => HandleMethod m RpcMailboxDelete where handleMethod puk = do diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs index 0204139b..ed5719c0 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs @@ -135,6 +135,11 @@ class ForMailbox s => IsMailboxService s a where -> MailBoxStatusPayload s -> m (Either MailboxServiceError ()) + mailboxFetch :: forall m . MonadIO m + => a + -> MailboxRefKey s + -> m (Either MailboxServiceError ()) + data AnyMailboxService s = forall a . (IsMailboxService s a) => AnyMailboxService { mailboxService :: a } @@ -150,6 +155,7 @@ instance ForMailbox s => IsMailboxService s (AnyMailboxService s) where mailboxListBasic (AnyMailboxService a) = mailboxListBasic @s a mailboxGetStatus (AnyMailboxService a) = mailboxGetStatus @s a mailboxAcceptStatus (AnyMailboxService a) = mailboxAcceptStatus @s a + mailboxFetch (AnyMailboxService a) = mailboxFetch @s a instance IsMailboxProtoAdapter s (AnyMailboxAdapter s) where mailboxGetCredentials (AnyMailboxAdapter a) = mailboxGetCredentials @s a diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs index 90cdaed3..f0897f39 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/Mailbox.hs @@ -19,6 +19,7 @@ data RpcMailboxCreate data RpcMailboxSetPolicy data RpcMailboxDelete data RpcMailboxGetStatus +data RpcMailboxFetch data RpcMailboxList data RpcMailboxSend data RpcMailboxGet @@ -28,6 +29,7 @@ type MailboxAPI = '[ RpcMailboxPoke , RpcMailboxSetPolicy , RpcMailboxDelete , RpcMailboxGetStatus + , RpcMailboxFetch , RpcMailboxList , RpcMailboxSend , RpcMailboxGet @@ -57,6 +59,9 @@ type instance Output RpcMailboxDelete = () type instance Input RpcMailboxGetStatus = (PubKey 'Sign HBS2Basic) type instance Output RpcMailboxGetStatus = Either MailboxServiceError (Maybe (MailBoxStatusPayload 'HBS2Basic)) +type instance Input RpcMailboxFetch = (PubKey 'Sign HBS2Basic) +type instance Output RpcMailboxFetch = Either MailboxServiceError () + type instance Input RpcMailboxList = () type instance Output RpcMailboxList = [(MailboxRefKey 'HBS2Basic, MailboxType)]