diff --git a/hbs2-peer/app/CLI/Mailbox.hs b/hbs2-peer/app/CLI/Mailbox.hs index 775092bc..0c150387 100644 --- a/hbs2-peer/app/CLI/Mailbox.hs +++ b/hbs2-peer/app/CLI/Mailbox.hs @@ -253,7 +253,7 @@ runMailboxCLI rpc s = do Deleted _ mh -> do atomically $ modifyTVar d (HS.insert mh) - Existed _ mh -> do + Exists _ mh -> do atomically $ modifyTVar r (HS.insert mh) deleted <- readTVarIO d diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 35411d26..9d03bbea 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -508,7 +508,7 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do -- TODO: add-policy-reference let proof = ProofOfExist mzero - h' <- enqueueBlock sto (serialise (Existed proof ha)) + h' <- enqueueBlock sto (serialise (Exists proof ha)) for_ h' $ \h -> do atomically do @@ -558,6 +558,11 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do updateRef sto r nref debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref + for_ newTx $ \t -> do + -- FIXME: use-bloom-filter-or-something + -- $class: leak + putBlock sto (serialise (MergedEntry r t)) + policyDownloadQ dbe = do -- FIXME: too-often-checks-affect-performance @@ -602,22 +607,89 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do polling (Polling 30 30) mail $ \(pk, down@MailboxDownload{..}) -> do done <- findMissedBlocks mpwStorage mailboxStatusRef <&> L.null + fails <- newTVarIO 0 + when (done && not mailboxDownDone) do atomically $ modifyTVar inMailboxDownloadQ (HM.insert pk (down { mailboxDownDone = True })) debug $ "mailbox state downloaded" <+> pretty pk when done do debug $ "mailbox/debug: drop state" <+> pretty pk <+> pretty mailboxStatusRef - atomically $ modifyTVar inMailboxDownloadQ (HM.delete pk) -- FIXME: assume-huge-mailboxes walkMerkle @[HashRef] (coerce mailboxStatusRef) (getBlock mpwStorage) $ \case - Left what -> err $ red "mailbox: missed block for tree" <+> pretty mailboxStatusRef - Right hs -> void $ runMaybeT do - for_ hs $ \h -> do + Left what -> do + err $ red "mailbox: missed block for tree" <+> pretty mailboxStatusRef <+> pretty what + atomically $ modifyTVar fails succ + + Right hs -> do + for_ hs $ \h -> void $ runMaybeT do debug $ red ">>>" <+> "MERGE MAILBOX ENTRY" <+> pretty h + -- FIXME: invent-better-filter + -- $class: leak + let mergedEntry = serialise (MergedEntry mailboxRef h) + let mergedH = mergedEntry & hashObject + + already <- getBlock mpwStorage mergedH + + when (isJust already) do + debug $ red "!!!" <+> "skip already merged tx" <+> pretty h + mzero + + entry' <- getBlock mpwStorage (coerce h) + + when (isNothing entry') do + startDownloadStuff me h + atomically $ modifyTVar fails succ + mzero + + entry <- toMPlus entry' + <&> deserialiseOrFail @MailboxEntry + >>= toMPlus + + case entry of + Deleted{} -> do + atomically $ modifyTVar inMessageMergeQueue (HM.insert mailboxRef (HS.singleton h)) + -- write-already-merged + + Exists _ w -> do + msg' <- getBlock mpwStorage (coerce w) + + case msg' of + Nothing -> do + startDownloadStuff me h + atomically $ modifyTVar fails succ + mzero + + Just msg -> do + let mess = deserialiseOrFail @(Message s) msg + + case mess of + Left{} -> do + warn $ "malformed message" <+> pretty w + void $ putBlock mpwStorage mergedEntry + + Right normal -> do + let checked = unboxSignedBox0 (messageContent normal) + + case checked of + Nothing -> do + warn $ "invalid signature for message" <+> pretty w + void $ putBlock mpwStorage mergedEntry + + Just (_, content) -> do + -- FIXME: what-if-message-queue-full? + mailboxAcceptMessage me normal content + pure () + + failNum <- readTVarIO fails + + when (failNum == 0) do + debug $ "mailbox state process succeed" <+> pretty mailboxStatusRef + atomically $ modifyTVar inMailboxDownloadQ (HM.delete pk) + mailboxFetchQ dbe = forever do toFetch <- atomically $ do q <- readTVar mpwFetchQ diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs index 95575ba7..62b69e97 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs @@ -33,6 +33,12 @@ import Data.Maybe import Data.Word import Lens.Micro.Platform + +data MergedEntry s = MergedEntry (MailboxRefKey s) HashRef + deriving stock (Generic) + +instance ForMailbox s => Serialise (MergedEntry s) + data SetPolicyPayload s = SetPolicyPayload { sppMailboxKey :: MailboxKey s diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs index 15149126..2036a4f5 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox/Entry.hs @@ -33,18 +33,19 @@ instance Semigroup ProofOfExist where (<>) (ProofOfExist a1) (ProofOfExist a2) = ProofOfExist (a1 <|> a2) data MailboxEntry = - Existed ProofOfExist HashRef + Exists ProofOfExist HashRef | Deleted ProofOfDelete HashRef -- ^ proof-of-message-to-validate deriving stock (Eq,Ord,Show,Generic) instance Hashable MailboxEntry where hashWithSalt salt = \case - Existed p r -> hashWithSalt salt (0x177c1a3ad45b678e :: Word64, serialise (p,r)) + Exists p r -> hashWithSalt salt (0x177c1a3ad45b678e :: Word64, serialise (p,r)) Deleted p r -> hashWithSalt salt (0xac3196b4809ea027 :: Word64, serialise (p,r)) data RoutedEntry = RoutedEntry HashRef deriving stock (Eq,Ord,Show,Generic) + instance Serialise MailboxEntry instance Serialise RoutedEntry instance Serialise ProofOfDelete