From 04c26a0f5a8155a6452e181a51a028d66c32e8e1 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 9 Oct 2024 08:12:52 +0300 Subject: [PATCH] mailbox db boilerplate --- hbs2-peer/app/MailboxProtoWorker.hs | 108 ++++++++++++++++++++--- hbs2-peer/app/PeerMain.hs | 12 ++- hbs2-peer/hbs2-peer.cabal | 2 +- hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs | 1 + 4 files changed, 107 insertions(+), 16 deletions(-) diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 5e13ab15..f42aebe8 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -3,9 +3,12 @@ module MailboxProtoWorker ( mailboxProtoWorker , createMailboxProtoWorker , MailboxProtoWorker , IsMailboxProtoAdapter + , MailboxProtoException(..) + , hbs2MailboxDirOpt ) where import HBS2.Prelude.Plated +import HBS2.OrDie import HBS2.Actors.Peer import HBS2.Data.Types.Refs import HBS2.Net.Proto @@ -17,16 +20,31 @@ import HBS2.Peer.Proto import HBS2.Peer.Proto.Mailbox import HBS2.Net.Auth.Credentials +import HBS2.System.Dir import HBS2.Misc.PrettyStuff import Brains import PeerConfig import PeerTypes -import Control.Monad +import DBPipe.SQLite + +import Control.Monad.Trans.Cont import UnliftIO -- import Control.Concurrent.STM.TBQueue import Lens.Micro.Platform +import Text.InterpolatedString.Perl6 (qc) + +data MailboxProtoException = + MailboxProtoWorkerTerminatedException + | MailboxProtoCantAccessMailboxes FilePath + | MailboxProtoMailboxDirNotSet + deriving stock (Show,Typeable) + +instance Exception MailboxProtoException + +hbs2MailboxDirOpt :: String +hbs2MailboxDirOpt = "hbs2:mailbox:dir" {- HLINT ignore "Functor law" -} @@ -35,6 +53,7 @@ data MailboxProtoWorker (s :: CryptoScheme) e = { mpwStorage :: AnyStorage , inMessageQueue :: TBQueue (Message s, MessageContent s) , inMessageQueueDropped :: TVar Int + , mailboxDB :: TVar (Maybe DBPipeEnv) } instance (s ~ HBS2Basic) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where @@ -48,13 +67,16 @@ instance (s ~ HBS2Basic) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) whe else do writeTBQueue inMessageQueue (m,c) -createMailboxProtoWorker :: forall e m . MonadIO m => AnyStorage -> m (MailboxProtoWorker (Encryption e) e) +createMailboxProtoWorker :: forall e m . MonadIO m + => AnyStorage + -> m (MailboxProtoWorker (Encryption e) e) createMailboxProtoWorker sto = do -- FIXME: queue-size-hardcode -- $class: hardcode inQ <- newTBQueueIO 1000 inDroppped <- newTVarIO 0 - pure $ MailboxProtoWorker sto inQ inDroppped + dbe <- newTVarIO Nothing + pure $ MailboxProtoWorker sto inQ inDroppped dbe mailboxProtoWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m @@ -66,20 +88,78 @@ mailboxProtoWorker :: forall e s m . ( MonadIO m , s ~ Encryption e , IsRefPubKey s ) - => MailboxProtoWorker s e + => m [Syntax C] + -> MailboxProtoWorker s e -> m () -mailboxProtoWorker me = do - forever do - pause @'Seconds 10 - debug $ "I'm" <+> yellow "mailboxProtoWorker" +mailboxProtoWorker readConf me = do --- let listRefs = listPolledRefs @e brains (Just "lwwref") --- <&> fmap (\(a,_,b) -> (a,b)) --- <&> fmap (over _2 ( (*60) . fromIntegral) ) + pause @'Seconds 10 --- polling (Polling 5 5) listRefs $ \ref -> do --- debug $ yellow "POLLING LWWREF" <+> pretty (AsBase58 ref) --- gossip (LWWRefProto1 @e (LWWProtoGet (LWWRefKey ref))) + flip runContT pure do + + dbe <- lift $ mailboxStateEvolve readConf me + + pipe <- ContT $ withAsync (runPipe dbe) + + inq <- ContT $ withAsync mailboxInQ + + bs <- ContT $ withAsync do + + forever do + pause @'Seconds 10 + debug $ "I'm" <+> yellow "mailboxProtoWorker" + + void $ waitAnyCancel [bs,pipe] + + `catch` \( e :: MailboxProtoException ) -> do + err $ "mailbox protocol worker terminated" <+> viaShow e + + `finally` do + warn $ yellow "mailbox protocol worker exited" + + where + mailboxInQ = do + forever do + pause @'Seconds 10 + debug "mailbox check inQ" +mailboxStateEvolve :: forall e s m . ( MonadIO m + , MonadUnliftIO m + , HasStorage m + , s ~ Encryption e + ) + => m [Syntax C] + -> MailboxProtoWorker s e -> m DBPipeEnv + +mailboxStateEvolve readConf MailboxProtoWorker{..} = do + + conf <- readConf + + debug $ red "mailboxStateEvolve" <> line <> pretty conf + + mailboxDir <- lastMay [ dir + | ListVal [StringLike o, StringLike dir] <- conf + , o == hbs2MailboxDirOpt + ] + & orThrow MailboxProtoMailboxDirNotSet + + r <- try @_ @SomeException (mkdir mailboxDir) + + either (const $ throwIO (MailboxProtoCantAccessMailboxes mailboxDir)) dontHandle r + + dbe <- newDBPipeEnv dbPipeOptsDef (mailboxDir "state.db") + + atomically $ writeTVar mailboxDB (Just dbe) + + withDB dbe do + ddl [qc|create table if not exists + mailbox ( recipient text not null + , type text not null + , primary key (recipient) + ) + |] + + pure dbe + diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index ef69b51c..4e874521 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -11,6 +11,7 @@ import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Merkle import HBS2.Defaults +import HBS2.System.Dir (takeDirectory,()) import HBS2.Events import HBS2.Hash import HBS2.Data.Types.Refs @@ -84,6 +85,8 @@ import HBS2.Peer.Proto.LWWRef.Internal import RPC2(RPC2Context(..)) +import Data.Config.Suckless.Script hiding (optional) + import Codec.Serialise as Serialise import Control.Concurrent (myThreadId) import Control.Concurrent.STM @@ -96,6 +99,7 @@ import Data.Aeson qualified as Aeson import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.Cache qualified as Cache +import Data.Coerce import Data.Fixed import Data.List qualified as L import Data.Map (Map) @@ -1106,8 +1110,14 @@ runPeer opts = Exception.handle (\e -> myException e peerThread "lwwRefWorker" (lwwRefWorker @e conf (SomeBrains brains)) + -- setup mailboxes stuff mbw <- createMailboxProtoWorker @e (AnyStorage s) - peerThread "mailboxProtoWorker" (mailboxProtoWorker mbw) + let defConf = coerce conf + let mboxConf = maybe1 pref defConf $ \p -> do + let mboxDir = takeDirectory (coerce p) "hbs2-mailbox" + mkList [mkSym hbs2MailboxDirOpt, mkStr mboxDir] : coerce defConf + + peerThread "mailboxProtoWorker" (mailboxProtoWorker (pure mboxConf) mbw) liftIO $ withPeerM penv do runProto @e diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 089b7e1d..fbb9bd04 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -18,7 +18,7 @@ common warnings common common-deps build-depends: - base, hbs2-core, hbs2-storage-simple + base, hbs2-core, hbs2-storage-simple, db-pipe , aeson , async , bytestring diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs index 241ad00d..c34c560d 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs @@ -73,6 +73,7 @@ instance ForMailbox s => Serialise (MailBoxProtoMessage s e) instance ForMailbox s => Serialise (MailBoxProto s e) class IsMailboxProtoAdapter s a where + mailboxGetStorage :: forall m . MonadIO m => a -> m AnyStorage mailboxAcceptMessage :: forall m . (ForMailbox s, MonadIO m)