This commit is contained in:
voidlizard 2024-10-09 10:31:15 +03:00
parent f2b8bc1517
commit 6ef3286675
2 changed files with 17 additions and 7 deletions

View File

@ -31,6 +31,7 @@ import DBPipe.SQLite
import Control.Monad.Trans.Cont
import UnliftIO
import Control.Concurrent.STM qualified as STM
-- import Control.Concurrent.STM.TBQueue
import Lens.Micro.Platform
import Text.InterpolatedString.Perl6 (qc)
@ -53,6 +54,7 @@ data MailboxProtoWorker (s :: CryptoScheme) e =
{ mpwStorage :: AnyStorage
, inMessageQueue :: TBQueue (Message s, MessageContent s)
, inMessageQueueDropped :: TVar Int
, inMessageDeclined :: TVar Int
, mailboxDB :: TVar (Maybe DBPipeEnv)
}
@ -88,6 +90,11 @@ instance (s ~ HBS2Basic) => IsMailboxService s (MailboxProtoWorker s e) where
Right{} -> pure $ Right ()
Left{} -> pure $ Left (MailboxCreateFailed "database operation")
checkMailbox_ :: MonadIO m => DBPipeEnv -> Recipient s -> m Bool
checkMailbox_ d r = do
pure False
createMailboxProtoWorker :: forall e m . MonadIO m
=> AnyStorage
-> m (MailboxProtoWorker (Encryption e) e)
@ -96,8 +103,9 @@ createMailboxProtoWorker sto = do
-- $class: hardcode
inQ <- newTBQueueIO 1000
inDroppped <- newTVarIO 0
decl <- newTVarIO 0
dbe <- newTVarIO Nothing
pure $ MailboxProtoWorker sto inQ inDroppped dbe
pure $ MailboxProtoWorker sto inQ inDroppped decl dbe
mailboxProtoWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m
@ -113,7 +121,7 @@ mailboxProtoWorker :: forall e s m . ( MonadIO m
-> MailboxProtoWorker s e
-> m ()
mailboxProtoWorker readConf me = do
mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
pause @'Seconds 10
@ -123,7 +131,7 @@ mailboxProtoWorker readConf me = do
pipe <- ContT $ withAsync (runPipe dbe)
inq <- ContT $ withAsync mailboxInQ
inq <- ContT $ withAsync (mailboxInQ dbe me)
bs <- ContT $ withAsync do
@ -131,7 +139,7 @@ mailboxProtoWorker readConf me = do
pause @'Seconds 10
debug $ "I'm" <+> yellow "mailboxProtoWorker"
void $ waitAnyCancel [bs,pipe]
void $ waitAnyCancel [bs,pipe,inq]
`catch` \( e :: MailboxProtoException ) -> do
err $ "mailbox protocol worker terminated" <+> viaShow e
@ -140,11 +148,12 @@ mailboxProtoWorker readConf me = do
warn $ yellow "mailbox protocol worker exited"
where
mailboxInQ = do
mailboxInQ dbe MailboxProtoWorker{..} = do
forever do
pause @'Seconds 10
debug "mailbox check inQ"
mess <- atomically $ STM.flushTBQueue inMessageQueue
for_ mess $ \(m,s) -> do
debug "received message"
mailboxStateEvolve :: forall e s m . ( MonadIO m
, MonadUnliftIO m

View File

@ -3,6 +3,7 @@
module HBS2.Peer.Proto.Mailbox
( module HBS2.Peer.Proto.Mailbox
, module HBS2.Peer.Proto.Mailbox.Message
, module HBS2.Peer.Proto.Mailbox.Types
) where
import HBS2.Prelude.Plated