diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 6c79c6e5..5c0cd4c4 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -12,22 +12,27 @@ import HBS2.Prelude.Plated import HBS2.OrDie import HBS2.Actors.Peer import HBS2.Data.Types.Refs +import HBS2.Data.Detect import HBS2.Net.Proto import HBS2.Base58 import HBS2.Storage import HBS2.Storage.Operations.Missed +import HBS2.Merkle import HBS2.Hash import HBS2.Peer.Proto import HBS2.Peer.Proto.Mailbox +import HBS2.Peer.Proto.Mailbox.Entry import HBS2.Net.Messaging.Unix import HBS2.Net.Auth.Credentials +import HBS2.Polling import HBS2.System.Dir import HBS2.Misc.PrettyStuff import Brains import PeerConfig import PeerTypes +import BlockDownload() import DBPipe.SQLite @@ -37,6 +42,10 @@ import Data.Maybe import UnliftIO import Control.Concurrent.STM qualified as STM -- import Control.Concurrent.STM.TBQueue +import Data.HashMap.Strict qualified as HM +import Data.HashMap.Strict (HashMap) +import Data.HashSet qualified as HS +import Data.HashSet (HashSet) import Lens.Micro.Platform import Text.InterpolatedString.Perl6 (qc) @@ -59,6 +68,7 @@ data MailboxProtoWorker (s :: CryptoScheme) e = , mpwDownloadEnv :: DownloadEnv e , mpwStorage :: AnyStorage , inMessageQueue :: TBQueue (Message s, MessageContent s) + , inMessageMergeQueue :: TVar (HashMap (MailboxRefKey s) (HashSet HashRef)) , inMessageQueueInNum :: TVar Int , inMessageQueueOutNum :: TVar Int , inMessageQueueDropped :: TVar Int @@ -118,21 +128,22 @@ getMailboxType_ d r = do <&> fmap (fromStringMay @MailboxType . fromOnly) <&> headMay . catMaybes -createMailboxProtoWorker :: forall e m . MonadIO m +createMailboxProtoWorker :: forall s e m . (MonadIO m, s ~ Encryption e, ForMailbox s) => PeerEnv e -> DownloadEnv e -> AnyStorage - -> m (MailboxProtoWorker (Encryption e) e) + -> m (MailboxProtoWorker s e) createMailboxProtoWorker pe de sto = do -- FIXME: queue-size-hardcode -- $class: hardcode inQ <- newTBQueueIO 1000 + mergeQ <- newTVarIO mempty inDroppped <- newTVarIO 0 inNum <- newTVarIO 0 outNum <- newTVarIO 0 decl <- newTVarIO 0 dbe <- newTVarIO Nothing - pure $ MailboxProtoWorker pe de sto inQ inNum outNum inDroppped decl dbe + pure $ MailboxProtoWorker pe de sto inQ mergeQ inNum outNum inDroppped decl dbe mailboxProtoWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m @@ -144,6 +155,7 @@ mailboxProtoWorker :: forall e s m . ( MonadIO m , s ~ Encryption e , IsRefPubKey s , ForMailbox s + , m ~ PeerM e IO ) => m [Syntax C] -> MailboxProtoWorker s e @@ -161,13 +173,15 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do inq <- ContT $ withAsync (mailboxInQ dbe) + mergeQ <- ContT $ withAsync mailboxMergeQ + bs <- ContT $ withAsync do forever do pause @'Seconds 10 debug $ "I'm" <+> yellow "mailboxProtoWorker" - void $ waitAnyCancel [bs,pipe,inq] + void $ waitAnyCancel [bs,pipe,inq,mergeQ] `catch` \( e :: MailboxProtoException ) -> do err $ red "mailbox protocol worker terminated" <+> viaShow e @@ -178,17 +192,12 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do where mailboxInQ dbe = do + let sto = mpwStorage forever do pause @'Seconds 10 mess <- atomically $ STM.flushTBQueue inMessageQueue for_ mess $ \(m,s) -> do atomically $ modifyTVar inMessageQueueInNum pred - -- FIXME: remove - let ha = hashObject @HbSync (serialise m) - -- сохраняем или нет? - -- по госсипу уже послали. сохранять надо, только если - -- у нас есть ящик - debug $ yellow "received message" <+> pretty (AsBase58 (HashRef ha)) -- TODO: process-with-policy @@ -196,11 +205,70 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do mbox <- getMailboxType_ @s dbe rcpt >>= toMPlus - case mbox of - MailboxHub -> debug $ blue "HUB" <+> pretty (AsBase58 rcpt) <+> "WRITE MESSAGE" - MailboxRelay -> debug $ blue "RELAY"<+> pretty (AsBase58 rcpt) <+> "WRITE MESSAGE" + -- TODO: ASAP-block-accounting + ha' <- putBlock sto (serialise m) <&> fmap HashRef - pure () + ha <- case ha' of + Just x -> pure x + Nothing -> do + err $ red "storage error, can't store message" + mzero + + let ref = MailboxRefKey @s rcpt + + debug $ yellow "mailbox: message stored" <+> pretty ref <+> pretty ha + + h' <- enqueueBlock sto (serialise (Existed ha)) + + for_ h' $ \h -> do + atomically do + modifyTVar inMessageMergeQueue (HM.insertWith (<>) ref (HS.singleton (HashRef h))) + + -- TODO: check-attachment-policy-for-mailbox + + -- TODO: ASAP-block-accounting-for-attachment + for_ (messageParts s) $ \part -> do + + liftIO $ withPeerM mpwPeerEnv $ withDownload mpwDownloadEnv + $ addDownload @e Nothing (fromHashRef part) + + pure () + + -- read current mailbox + -- merge messages into + -- write current mailbox + -- put attachments to download + + mailboxMergeQ = do + let sto = mpwStorage + -- FIXME: poll-timeout-hardcode? + let mboxes = readTVarIO inMessageMergeQueue + <&> fmap (,2) . HM.keys . HM.filter ( not . HS.null ) + + polling (Polling 2 2) mboxes $ \r -> void $ runMaybeT do + debug $ yellow "mailbox: merge-poll" <+> pretty r + + -- NOTE: reliability + -- в случае отказа сторейджа все эти сообщения будут потеряны + -- однако, ввиду дублирования -- они рано или поздно будут + -- восстановлены с других реплик, если таковые имеются + newTx <- atomically do + n <- readTVar inMessageMergeQueue + <&> fromMaybe mempty . HM.lookup r + modifyTVar inMessageMergeQueue (HM.delete r) + pure n + + v <- getRef sto r <&> fmap HashRef + txs <- maybe1 v (pure mempty) (readLog (liftIO . getBlock sto) ) + + let mergedTx = HS.fromList txs <> newTx & HS.toList + + -- FIXME: size-hardcode-again + let pt = toPTree (MaxSize 6000) (MaxNum 256) mergedTx + nref <- makeMerkle 0 pt $ \(_,_,bss) -> void $ liftIO $ putBlock sto bss + + updateRef sto r nref + debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref mailboxStateEvolve :: forall e s m . ( MonadIO m , MonadUnliftIO m diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 630ec129..d84873e8 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -902,7 +902,7 @@ runPeer opts = Exception.handle (\e -> myException e rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter - mailboxWorker <- createMailboxProtoWorker @e penv denv (AnyStorage s) + mailboxWorker <- createMailboxProtoWorker penv denv (AnyStorage s) let onNoBlock (p, h) = do already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 09cf18d9..d0ecbc5c 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -1004,8 +1004,6 @@ logMergeProcess penv env q = withPeerM penv do nref <- makeMerkle 0 pt $ \(_,_,bss) -> do void $ putBlock sto bss - -- TODO: ASAP-emit-refchan-updated-notify - -- $workflow: wip updateRef sto chanKey nref notifyOnRefChanUpdated env chanKey nref diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs index 57970e3e..da92eda5 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs @@ -5,11 +5,17 @@ import HBS2.Peer.Proto.Mailbox.Types import Data.Word import Codec.Serialise +import Data.Hashable data MailboxEntry = Existed HashRef | Deleted HashRef deriving stock (Eq,Ord,Show,Generic) +instance Hashable MailboxEntry where + hashWithSalt salt = \case + Existed r -> hashWithSalt salt (0x177c1a3ad45b678e :: Word64, r) + Deleted r -> hashWithSalt salt (0xac3196b4809ea027 :: Word64, r) + data RoutedEntry = RoutedEntry HashRef deriving stock (Eq,Ord,Show,Generic) diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Ref.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Ref.hs index 1332c9e0..ceb450bf 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Ref.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Ref.hs @@ -36,4 +36,3 @@ instance Pretty (AsBase58 (PubKey 'Sign s)) => Pretty (MailboxRefKey s) where pretty (MailboxRefKey k) = pretty (AsBase58 k) -