From 6ef32866757a371b3ae467f4fd9100e51e62931d Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 9 Oct 2024 10:31:15 +0300 Subject: [PATCH] wip --- hbs2-peer/app/MailboxProtoWorker.hs | 23 ++++++++++++++++------- hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs | 1 + 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 9ece0952..8288dc61 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -31,6 +31,7 @@ import DBPipe.SQLite import Control.Monad.Trans.Cont import UnliftIO +import Control.Concurrent.STM qualified as STM -- import Control.Concurrent.STM.TBQueue import Lens.Micro.Platform import Text.InterpolatedString.Perl6 (qc) @@ -53,6 +54,7 @@ data MailboxProtoWorker (s :: CryptoScheme) e = { mpwStorage :: AnyStorage , inMessageQueue :: TBQueue (Message s, MessageContent s) , inMessageQueueDropped :: TVar Int + , inMessageDeclined :: TVar Int , mailboxDB :: TVar (Maybe DBPipeEnv) } @@ -88,6 +90,11 @@ instance (s ~ HBS2Basic) => IsMailboxService s (MailboxProtoWorker s e) where Right{} -> pure $ Right () Left{} -> pure $ Left (MailboxCreateFailed "database operation") + +checkMailbox_ :: MonadIO m => DBPipeEnv -> Recipient s -> m Bool +checkMailbox_ d r = do + pure False + createMailboxProtoWorker :: forall e m . MonadIO m => AnyStorage -> m (MailboxProtoWorker (Encryption e) e) @@ -96,8 +103,9 @@ createMailboxProtoWorker sto = do -- $class: hardcode inQ <- newTBQueueIO 1000 inDroppped <- newTVarIO 0 + decl <- newTVarIO 0 dbe <- newTVarIO Nothing - pure $ MailboxProtoWorker sto inQ inDroppped dbe + pure $ MailboxProtoWorker sto inQ inDroppped decl dbe mailboxProtoWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m @@ -113,7 +121,7 @@ mailboxProtoWorker :: forall e s m . ( MonadIO m -> MailboxProtoWorker s e -> m () -mailboxProtoWorker readConf me = do +mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do pause @'Seconds 10 @@ -123,7 +131,7 @@ mailboxProtoWorker readConf me = do pipe <- ContT $ withAsync (runPipe dbe) - inq <- ContT $ withAsync mailboxInQ + inq <- ContT $ withAsync (mailboxInQ dbe me) bs <- ContT $ withAsync do @@ -131,7 +139,7 @@ mailboxProtoWorker readConf me = do pause @'Seconds 10 debug $ "I'm" <+> yellow "mailboxProtoWorker" - void $ waitAnyCancel [bs,pipe] + void $ waitAnyCancel [bs,pipe,inq] `catch` \( e :: MailboxProtoException ) -> do err $ "mailbox protocol worker terminated" <+> viaShow e @@ -140,11 +148,12 @@ mailboxProtoWorker readConf me = do warn $ yellow "mailbox protocol worker exited" where - mailboxInQ = do + mailboxInQ dbe MailboxProtoWorker{..} = do forever do pause @'Seconds 10 - debug "mailbox check inQ" - + mess <- atomically $ STM.flushTBQueue inMessageQueue + for_ mess $ \(m,s) -> do + debug "received message" mailboxStateEvolve :: forall e s m . ( MonadIO m , MonadUnliftIO m diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs index 2836cf27..a4d92c83 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs @@ -3,6 +3,7 @@ module HBS2.Peer.Proto.Mailbox ( module HBS2.Peer.Proto.Mailbox , module HBS2.Peer.Proto.Mailbox.Message + , module HBS2.Peer.Proto.Mailbox.Types ) where import HBS2.Prelude.Plated