wip, messages to storage

This commit is contained in:
voidlizard 2024-10-11 07:07:54 +03:00
parent d69ea63319
commit 49bdbb1a0f
5 changed files with 89 additions and 18 deletions

View File

@ -12,22 +12,27 @@ import HBS2.Prelude.Plated
import HBS2.OrDie
import HBS2.Actors.Peer
import HBS2.Data.Types.Refs
import HBS2.Data.Detect
import HBS2.Net.Proto
import HBS2.Base58
import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.Merkle
import HBS2.Hash
import HBS2.Peer.Proto
import HBS2.Peer.Proto.Mailbox
import HBS2.Peer.Proto.Mailbox.Entry
import HBS2.Net.Messaging.Unix
import HBS2.Net.Auth.Credentials
import HBS2.Polling
import HBS2.System.Dir
import HBS2.Misc.PrettyStuff
import Brains
import PeerConfig
import PeerTypes
import BlockDownload()
import DBPipe.SQLite
@ -37,6 +42,10 @@ import Data.Maybe
import UnliftIO
import Control.Concurrent.STM qualified as STM
-- import Control.Concurrent.STM.TBQueue
import Data.HashMap.Strict qualified as HM
import Data.HashMap.Strict (HashMap)
import Data.HashSet qualified as HS
import Data.HashSet (HashSet)
import Lens.Micro.Platform
import Text.InterpolatedString.Perl6 (qc)
@ -59,6 +68,7 @@ data MailboxProtoWorker (s :: CryptoScheme) e =
, mpwDownloadEnv :: DownloadEnv e
, mpwStorage :: AnyStorage
, inMessageQueue :: TBQueue (Message s, MessageContent s)
, inMessageMergeQueue :: TVar (HashMap (MailboxRefKey s) (HashSet HashRef))
, inMessageQueueInNum :: TVar Int
, inMessageQueueOutNum :: TVar Int
, inMessageQueueDropped :: TVar Int
@ -118,21 +128,22 @@ getMailboxType_ d r = do
<&> fmap (fromStringMay @MailboxType . fromOnly)
<&> headMay . catMaybes
createMailboxProtoWorker :: forall e m . MonadIO m
createMailboxProtoWorker :: forall s e m . (MonadIO m, s ~ Encryption e, ForMailbox s)
=> PeerEnv e
-> DownloadEnv e
-> AnyStorage
-> m (MailboxProtoWorker (Encryption e) e)
-> m (MailboxProtoWorker s e)
createMailboxProtoWorker pe de sto = do
-- FIXME: queue-size-hardcode
-- $class: hardcode
inQ <- newTBQueueIO 1000
mergeQ <- newTVarIO mempty
inDroppped <- newTVarIO 0
inNum <- newTVarIO 0
outNum <- newTVarIO 0
decl <- newTVarIO 0
dbe <- newTVarIO Nothing
pure $ MailboxProtoWorker pe de sto inQ inNum outNum inDroppped decl dbe
pure $ MailboxProtoWorker pe de sto inQ mergeQ inNum outNum inDroppped decl dbe
mailboxProtoWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m
@ -144,6 +155,7 @@ mailboxProtoWorker :: forall e s m . ( MonadIO m
, s ~ Encryption e
, IsRefPubKey s
, ForMailbox s
, m ~ PeerM e IO
)
=> m [Syntax C]
-> MailboxProtoWorker s e
@ -161,13 +173,15 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
inq <- ContT $ withAsync (mailboxInQ dbe)
mergeQ <- ContT $ withAsync mailboxMergeQ
bs <- ContT $ withAsync do
forever do
pause @'Seconds 10
debug $ "I'm" <+> yellow "mailboxProtoWorker"
void $ waitAnyCancel [bs,pipe,inq]
void $ waitAnyCancel [bs,pipe,inq,mergeQ]
`catch` \( e :: MailboxProtoException ) -> do
err $ red "mailbox protocol worker terminated" <+> viaShow e
@ -178,17 +192,12 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
where
mailboxInQ dbe = do
let sto = mpwStorage
forever do
pause @'Seconds 10
mess <- atomically $ STM.flushTBQueue inMessageQueue
for_ mess $ \(m,s) -> do
atomically $ modifyTVar inMessageQueueInNum pred
-- FIXME: remove
let ha = hashObject @HbSync (serialise m)
-- сохраняем или нет?
-- по госсипу уже послали. сохранять надо, только если
-- у нас есть ящик
debug $ yellow "received message" <+> pretty (AsBase58 (HashRef ha))
-- TODO: process-with-policy
@ -196,12 +205,71 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
mbox <- getMailboxType_ @s dbe rcpt
>>= toMPlus
case mbox of
MailboxHub -> debug $ blue "HUB" <+> pretty (AsBase58 rcpt) <+> "WRITE MESSAGE"
MailboxRelay -> debug $ blue "RELAY"<+> pretty (AsBase58 rcpt) <+> "WRITE MESSAGE"
-- TODO: ASAP-block-accounting
ha' <- putBlock sto (serialise m) <&> fmap HashRef
ha <- case ha' of
Just x -> pure x
Nothing -> do
err $ red "storage error, can't store message"
mzero
let ref = MailboxRefKey @s rcpt
debug $ yellow "mailbox: message stored" <+> pretty ref <+> pretty ha
h' <- enqueueBlock sto (serialise (Existed ha))
for_ h' $ \h -> do
atomically do
modifyTVar inMessageMergeQueue (HM.insertWith (<>) ref (HS.singleton (HashRef h)))
-- TODO: check-attachment-policy-for-mailbox
-- TODO: ASAP-block-accounting-for-attachment
for_ (messageParts s) $ \part -> do
liftIO $ withPeerM mpwPeerEnv $ withDownload mpwDownloadEnv
$ addDownload @e Nothing (fromHashRef part)
pure ()
-- read current mailbox
-- merge messages into
-- write current mailbox
-- put attachments to download
mailboxMergeQ = do
let sto = mpwStorage
-- FIXME: poll-timeout-hardcode?
let mboxes = readTVarIO inMessageMergeQueue
<&> fmap (,2) . HM.keys . HM.filter ( not . HS.null )
polling (Polling 2 2) mboxes $ \r -> void $ runMaybeT do
debug $ yellow "mailbox: merge-poll" <+> pretty r
-- NOTE: reliability
-- в случае отказа сторейджа все эти сообщения будут потеряны
-- однако, ввиду дублирования -- они рано или поздно будут
-- восстановлены с других реплик, если таковые имеются
newTx <- atomically do
n <- readTVar inMessageMergeQueue
<&> fromMaybe mempty . HM.lookup r
modifyTVar inMessageMergeQueue (HM.delete r)
pure n
v <- getRef sto r <&> fmap HashRef
txs <- maybe1 v (pure mempty) (readLog (liftIO . getBlock sto) )
let mergedTx = HS.fromList txs <> newTx & HS.toList
-- FIXME: size-hardcode-again
let pt = toPTree (MaxSize 6000) (MaxNum 256) mergedTx
nref <- makeMerkle 0 pt $ \(_,_,bss) -> void $ liftIO $ putBlock sto bss
updateRef sto r nref
debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref
mailboxStateEvolve :: forall e s m . ( MonadIO m
, MonadUnliftIO m
, HasStorage m

View File

@ -902,7 +902,7 @@ runPeer opts = Exception.handle (\e -> myException e
rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter
mailboxWorker <- createMailboxProtoWorker @e penv denv (AnyStorage s)
mailboxWorker <- createMailboxProtoWorker penv denv (AnyStorage s)
let onNoBlock (p, h) = do
already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust

View File

@ -1004,8 +1004,6 @@ logMergeProcess penv env q = withPeerM penv do
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
void $ putBlock sto bss
-- TODO: ASAP-emit-refchan-updated-notify
-- $workflow: wip
updateRef sto chanKey nref
notifyOnRefChanUpdated env chanKey nref

View File

@ -5,11 +5,17 @@ import HBS2.Peer.Proto.Mailbox.Types
import Data.Word
import Codec.Serialise
import Data.Hashable
data MailboxEntry =
Existed HashRef | Deleted HashRef
deriving stock (Eq,Ord,Show,Generic)
instance Hashable MailboxEntry where
hashWithSalt salt = \case
Existed r -> hashWithSalt salt (0x177c1a3ad45b678e :: Word64, r)
Deleted r -> hashWithSalt salt (0xac3196b4809ea027 :: Word64, r)
data RoutedEntry = RoutedEntry HashRef
deriving stock (Eq,Ord,Show,Generic)

View File

@ -36,4 +36,3 @@ instance Pretty (AsBase58 (PubKey 'Sign s)) => Pretty (MailboxRefKey s) where
pretty (MailboxRefKey k) = pretty (AsBase58 k)