mailbox db boilerplate

This commit is contained in:
voidlizard 2024-10-09 08:12:52 +03:00
parent c31c2c04c6
commit 04c26a0f5a
4 changed files with 107 additions and 16 deletions

View File

@ -3,9 +3,12 @@ module MailboxProtoWorker ( mailboxProtoWorker
, createMailboxProtoWorker , createMailboxProtoWorker
, MailboxProtoWorker , MailboxProtoWorker
, IsMailboxProtoAdapter , IsMailboxProtoAdapter
, MailboxProtoException(..)
, hbs2MailboxDirOpt
) where ) where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.OrDie
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Net.Proto import HBS2.Net.Proto
@ -17,16 +20,31 @@ import HBS2.Peer.Proto
import HBS2.Peer.Proto.Mailbox import HBS2.Peer.Proto.Mailbox
import HBS2.Net.Auth.Credentials import HBS2.Net.Auth.Credentials
import HBS2.System.Dir
import HBS2.Misc.PrettyStuff import HBS2.Misc.PrettyStuff
import Brains import Brains
import PeerConfig import PeerConfig
import PeerTypes import PeerTypes
import Control.Monad import DBPipe.SQLite
import Control.Monad.Trans.Cont
import UnliftIO import UnliftIO
-- import Control.Concurrent.STM.TBQueue -- import Control.Concurrent.STM.TBQueue
import Lens.Micro.Platform import Lens.Micro.Platform
import Text.InterpolatedString.Perl6 (qc)
data MailboxProtoException =
MailboxProtoWorkerTerminatedException
| MailboxProtoCantAccessMailboxes FilePath
| MailboxProtoMailboxDirNotSet
deriving stock (Show,Typeable)
instance Exception MailboxProtoException
hbs2MailboxDirOpt :: String
hbs2MailboxDirOpt = "hbs2:mailbox:dir"
{- HLINT ignore "Functor law" -} {- HLINT ignore "Functor law" -}
@ -35,6 +53,7 @@ data MailboxProtoWorker (s :: CryptoScheme) e =
{ mpwStorage :: AnyStorage { mpwStorage :: AnyStorage
, inMessageQueue :: TBQueue (Message s, MessageContent s) , inMessageQueue :: TBQueue (Message s, MessageContent s)
, inMessageQueueDropped :: TVar Int , inMessageQueueDropped :: TVar Int
, mailboxDB :: TVar (Maybe DBPipeEnv)
} }
instance (s ~ HBS2Basic) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where instance (s ~ HBS2Basic) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where
@ -48,13 +67,16 @@ instance (s ~ HBS2Basic) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) whe
else do else do
writeTBQueue inMessageQueue (m,c) writeTBQueue inMessageQueue (m,c)
createMailboxProtoWorker :: forall e m . MonadIO m => AnyStorage -> m (MailboxProtoWorker (Encryption e) e) createMailboxProtoWorker :: forall e m . MonadIO m
=> AnyStorage
-> m (MailboxProtoWorker (Encryption e) e)
createMailboxProtoWorker sto = do createMailboxProtoWorker sto = do
-- FIXME: queue-size-hardcode -- FIXME: queue-size-hardcode
-- $class: hardcode -- $class: hardcode
inQ <- newTBQueueIO 1000 inQ <- newTBQueueIO 1000
inDroppped <- newTVarIO 0 inDroppped <- newTVarIO 0
pure $ MailboxProtoWorker sto inQ inDroppped dbe <- newTVarIO Nothing
pure $ MailboxProtoWorker sto inQ inDroppped dbe
mailboxProtoWorker :: forall e s m . ( MonadIO m mailboxProtoWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m , MonadUnliftIO m
@ -66,20 +88,78 @@ mailboxProtoWorker :: forall e s m . ( MonadIO m
, s ~ Encryption e , s ~ Encryption e
, IsRefPubKey s , IsRefPubKey s
) )
=> MailboxProtoWorker s e => m [Syntax C]
-> MailboxProtoWorker s e
-> m () -> m ()
mailboxProtoWorker me = do mailboxProtoWorker readConf me = do
pause @'Seconds 10
flip runContT pure do
dbe <- lift $ mailboxStateEvolve readConf me
pipe <- ContT $ withAsync (runPipe dbe)
inq <- ContT $ withAsync mailboxInQ
bs <- ContT $ withAsync do
forever do forever do
pause @'Seconds 10 pause @'Seconds 10
debug $ "I'm" <+> yellow "mailboxProtoWorker" debug $ "I'm" <+> yellow "mailboxProtoWorker"
-- let listRefs = listPolledRefs @e brains (Just "lwwref") void $ waitAnyCancel [bs,pipe]
-- <&> fmap (\(a,_,b) -> (a,b))
-- <&> fmap (over _2 ( (*60) . fromIntegral) )
-- polling (Polling 5 5) listRefs $ \ref -> do `catch` \( e :: MailboxProtoException ) -> do
-- debug $ yellow "POLLING LWWREF" <+> pretty (AsBase58 ref) err $ "mailbox protocol worker terminated" <+> viaShow e
-- gossip (LWWRefProto1 @e (LWWProtoGet (LWWRefKey ref)))
`finally` do
warn $ yellow "mailbox protocol worker exited"
where
mailboxInQ = do
forever do
pause @'Seconds 10
debug "mailbox check inQ"
mailboxStateEvolve :: forall e s m . ( MonadIO m
, MonadUnliftIO m
, HasStorage m
, s ~ Encryption e
)
=> m [Syntax C]
-> MailboxProtoWorker s e -> m DBPipeEnv
mailboxStateEvolve readConf MailboxProtoWorker{..} = do
conf <- readConf
debug $ red "mailboxStateEvolve" <> line <> pretty conf
mailboxDir <- lastMay [ dir
| ListVal [StringLike o, StringLike dir] <- conf
, o == hbs2MailboxDirOpt
]
& orThrow MailboxProtoMailboxDirNotSet
r <- try @_ @SomeException (mkdir mailboxDir)
either (const $ throwIO (MailboxProtoCantAccessMailboxes mailboxDir)) dontHandle r
dbe <- newDBPipeEnv dbPipeOptsDef (mailboxDir </> "state.db")
atomically $ writeTVar mailboxDB (Just dbe)
withDB dbe do
ddl [qc|create table if not exists
mailbox ( recipient text not null
, type text not null
, primary key (recipient)
)
|]
pure dbe

View File

@ -11,6 +11,7 @@ import HBS2.Actors.Peer
import HBS2.Base58 import HBS2.Base58
import HBS2.Merkle import HBS2.Merkle
import HBS2.Defaults import HBS2.Defaults
import HBS2.System.Dir (takeDirectory,(</>))
import HBS2.Events import HBS2.Events
import HBS2.Hash import HBS2.Hash
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
@ -84,6 +85,8 @@ import HBS2.Peer.Proto.LWWRef.Internal
import RPC2(RPC2Context(..)) import RPC2(RPC2Context(..))
import Data.Config.Suckless.Script hiding (optional)
import Codec.Serialise as Serialise import Codec.Serialise as Serialise
import Control.Concurrent (myThreadId) import Control.Concurrent (myThreadId)
import Control.Concurrent.STM import Control.Concurrent.STM
@ -96,6 +99,7 @@ import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS import Data.ByteString qualified as BS
import Data.Cache qualified as Cache import Data.Cache qualified as Cache
import Data.Coerce
import Data.Fixed import Data.Fixed
import Data.List qualified as L import Data.List qualified as L
import Data.Map (Map) import Data.Map (Map)
@ -1106,8 +1110,14 @@ runPeer opts = Exception.handle (\e -> myException e
peerThread "lwwRefWorker" (lwwRefWorker @e conf (SomeBrains brains)) peerThread "lwwRefWorker" (lwwRefWorker @e conf (SomeBrains brains))
-- setup mailboxes stuff
mbw <- createMailboxProtoWorker @e (AnyStorage s) mbw <- createMailboxProtoWorker @e (AnyStorage s)
peerThread "mailboxProtoWorker" (mailboxProtoWorker mbw) let defConf = coerce conf
let mboxConf = maybe1 pref defConf $ \p -> do
let mboxDir = takeDirectory (coerce p) </> "hbs2-mailbox"
mkList [mkSym hbs2MailboxDirOpt, mkStr mboxDir] : coerce defConf
peerThread "mailboxProtoWorker" (mailboxProtoWorker (pure mboxConf) mbw)
liftIO $ withPeerM penv do liftIO $ withPeerM penv do
runProto @e runProto @e

View File

@ -18,7 +18,7 @@ common warnings
common common-deps common common-deps
build-depends: build-depends:
base, hbs2-core, hbs2-storage-simple base, hbs2-core, hbs2-storage-simple, db-pipe
, aeson , aeson
, async , async
, bytestring , bytestring

View File

@ -73,6 +73,7 @@ instance ForMailbox s => Serialise (MailBoxProtoMessage s e)
instance ForMailbox s => Serialise (MailBoxProto s e) instance ForMailbox s => Serialise (MailBoxProto s e)
class IsMailboxProtoAdapter s a where class IsMailboxProtoAdapter s a where
mailboxGetStorage :: forall m . MonadIO m => a -> m AnyStorage mailboxGetStorage :: forall m . MonadIO m => a -> m AnyStorage
mailboxAcceptMessage :: forall m . (ForMailbox s, MonadIO m) mailboxAcceptMessage :: forall m . (ForMailbox s, MonadIO m)