This commit is contained in:
voidlizard 2024-10-13 07:51:29 +03:00
parent 523632da9a
commit 4ed3716d9d
5 changed files with 168 additions and 27 deletions

View File

@ -154,6 +154,16 @@ runMailboxCLI rpc s = do
_ -> throwIO $ BadFormException @C nil _ -> throwIO $ BadFormException @C nil
brief "fetch mailbox"
$ entry $ bindMatch "fetch" $ nil_ $ \case
[ SignPubKeyLike m ] -> do
callRpcWaitMay @RpcMailboxFetch t api m
>>= orThrowUser "rpc call timeout"
>>= orThrowPassIO
_ -> throwIO $ BadFormException @C nil
brief "set mailbox policy" $ brief "set mailbox policy" $
desc setPolicyDesc desc setPolicyDesc
-- $ examples setPolicyExamples -- $ examples setPolicyExamples

View File

@ -51,6 +51,7 @@ import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet) import Data.HashSet (HashSet)
import Data.HashSet qualified as HS import Data.HashSet qualified as HS
import Data.Maybe import Data.Maybe
import Data.Word
import Codec.Serialise import Codec.Serialise
import Lens.Micro.Platform import Lens.Micro.Platform
import Text.InterpolatedString.Perl6 (qc) import Text.InterpolatedString.Perl6 (qc)
@ -78,14 +79,31 @@ hbs2MailboxDirOpt = "hbs2:mailbox:dir"
{- HLINT ignore "Functor law" -} {- HLINT ignore "Functor law" -}
data PolicyDownload s =
PolicyDownload
{ policyDownloadWhen :: Word64
, policyDownloadWhat :: SetPolicyPayload s
}
deriving stock Generic
data MailboxDowload =
MailboxDowload
{ mailboxDownWhen :: Word64
, mailboxDownPolicy :: Maybe PolicyVersion
}
deriving stock Generic
data MailboxProtoWorker (s :: CryptoScheme) e = data MailboxProtoWorker (s :: CryptoScheme) e =
MailboxProtoWorker MailboxProtoWorker
{ mpwPeerEnv :: PeerEnv e { mpwPeerEnv :: PeerEnv e
, mpwDownloadEnv :: DownloadEnv e , mpwDownloadEnv :: DownloadEnv e
, mpwStorage :: AnyStorage , mpwStorage :: AnyStorage
, mpwCredentials :: PeerCredentials s , mpwCredentials :: PeerCredentials s
, mpwFetchQ :: TVar (HashSet (MailboxRefKey s))
, inMessageQueue :: TBQueue (Message s, MessageContent s) , inMessageQueue :: TBQueue (Message s, MessageContent s)
, inMessageMergeQueue :: TVar (HashMap (MailboxRefKey s) (HashSet HashRef)) , inMessageMergeQueue :: TVar (HashMap (MailboxRefKey s) (HashSet HashRef))
, inPolicyDownloadQ :: TVar (HashMap HashRef (PolicyDownload s))
, inMailboxDownloadQ :: TVar (HashMap HashRef MailboxDowload)
, inMessageQueueInNum :: TVar Int , inMessageQueueInNum :: TVar Int
, inMessageQueueOutNum :: TVar Int , inMessageQueueOutNum :: TVar Int
, inMessageQueueDropped :: TVar Int , inMessageQueueDropped :: TVar Int
@ -93,6 +111,9 @@ data MailboxProtoWorker (s :: CryptoScheme) e =
, mailboxDB :: TVar (Maybe DBPipeEnv) , mailboxDB :: TVar (Maybe DBPipeEnv)
} }
okay :: Monad m => good -> m (Either bad good)
okay good = pure (Right good)
instance (s ~ HBS2Basic, e ~ L4Proto, s ~ Encryption e) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where instance (s ~ HBS2Basic, e ~ L4Proto, s ~ Encryption e) => IsMailboxProtoAdapter s (MailboxProtoWorker s e) where
mailboxGetCredentials = pure . mpwCredentials mailboxGetCredentials = pure . mpwCredentials
@ -173,6 +194,8 @@ instance ( s ~ Encryption e, e ~ L4Proto
on conflict (mailbox) do update set hash = excluded.hash on conflict (mailbox) do update set hash = excluded.hash
|] (MailboxRefKey @s who, PolicyHash what) |] (MailboxRefKey @s who, PolicyHash what)
-- TODO: ASAP-gossip-new-state
pure what pure what
mailboxDelete MailboxProtoWorker{..} mbox = do mailboxDelete MailboxProtoWorker{..} mbox = do
@ -217,24 +240,50 @@ instance ( s ~ Encryption e, e ~ L4Proto
debug $ red "mailboxListBasic" debug $ red "mailboxListBasic"
r <- withDB dbe do r <- listMailboxes dbe
select_ @_ @(MailboxRefKey s, MailboxType) [qc|select recipient,type from mailbox|]
pure $ Right r pure $ Right r
mailboxAcceptStatus MailboxProtoWorker{..} ref who MailBoxStatusPayload{..} = do mailboxAcceptStatus me@MailboxProtoWorker{..} ref who MailBoxStatusPayload{..} = do
-- TODO: implement-policy-first -- TODO: implement-policy-first
-- итак, мы не можем двигаться, пока не будет реализована policy. -- итак, мы не можем двигаться, пока не будет реализована policy.
flip runContT pure $ callCC \_ -> do flip runContT pure $ callCC \stop -> do
now <- liftIO $ getPOSIXTime <&> round
mdbe <- readTVarIO mailboxDB mdbe <- readTVarIO mailboxDB
dbe <- ContT $ maybe1 mdbe (pure $ Left (MailboxCreateFailed "database not ready")) dbe <- ContT $ maybe1 mdbe (pure $ Left (MailboxCreateFailed "database not ready"))
p0 <- loadPolicyPayloadFor dbe mpwStorage ref <&> fmap snd p0 <- loadPolicyPayloadFor dbe mpwStorage ref
<&> fmap (sppPolicyVersion . snd) . ((unboxSignedBox0 . snd) =<<)
<&> fromMaybe 0
let p = unboxSignedBox0 =<< mbsMailboxPolicy let bogusPolicyMessage =
err $ red "!!! arrived invalid policy signature for"
<+> pretty ref
<+> "from"
<+> pretty (AsBase58 who)
-- TODO: handle-invalid-policy-error
-- not "okay" actually
(rcptKey, pNew) <- ContT $ maybe1 (mbsMailboxPolicy >>= unboxSignedBox0)
(bogusPolicyMessage >> okay ())
when (coerce rcptKey /= ref) $ lift bogusPolicyMessage >> stop (Right ())
when (sppPolicyVersion pNew > p0) do
startDownloadStuff me (sppPolicyRef pNew)
atomically $ modifyTVar inPolicyDownloadQ (HM.insert (sppPolicyRef pNew) (PolicyDownload now pNew))
let v = Just $ max p0 (sppPolicyVersion pNew)
maybe1 mbsMailboxHash (okay ()) $ \h -> do
startDownloadStuff me h
atomically $ modifyTVar inMailboxDownloadQ (HM.insert h (MailboxDowload now v))
okay ()
-- если версия p > версии p0 -- ставим скачиваться, по скачиванию -- обновляем -- если версия p > версии p0 -- ставим скачиваться, по скачиванию -- обновляем
-- тут есть какой-то процесс, который должен поллить скачивания, не забываем, -- тут есть какой-то процесс, который должен поллить скачивания, не забываем,
@ -256,9 +305,7 @@ instance ( s ~ Encryption e, e ~ L4Proto
-- если не ок -- то но обновляем? а что тогда -- если не ок -- то но обновляем? а что тогда
-- --
pure $ Right () okay ()
pure $ Right ()
mailboxGetStatus MailboxProtoWorker{..} ref = do mailboxGetStatus MailboxProtoWorker{..} ref = do
-- TODO: support-policy-ASAP -- TODO: support-policy-ASAP
@ -282,6 +329,27 @@ instance ( s ~ Encryption e, e ~ L4Proto
pure $ Right $ Just $ MailBoxStatusPayload @s now (coerce ref) t v spp pure $ Right $ Just $ MailBoxStatusPayload @s now (coerce ref) t v spp
mailboxFetch MailboxProtoWorker{..} ref = do
debug $ red "mailboxFetch" <+> pretty ref
atomically (modifyTVar mpwFetchQ (HS.insert ref))
okay ()
startDownloadStuff :: forall s e m . (ForMailbox s, s ~ Encryption e, MyPeer e, MonadIO m)
=> MailboxProtoWorker s e
-> HashRef
-> m ()
startDownloadStuff MailboxProtoWorker{..} href = do
liftIO $ withPeerM mpwPeerEnv $ withDownload mpwDownloadEnv
$ addDownload @e Nothing (coerce href)
listMailboxes :: forall s m . (ForMailbox s, MonadIO m)
=> DBPipeEnv
-> m [(MailboxRefKey s, MailboxType)]
listMailboxes dbe = do
withDB dbe do
select_ [qc|select recipient,type from mailbox|]
loadPolicyPayloadFor :: forall s m . (ForMailbox s, MonadIO m) loadPolicyPayloadFor :: forall s m . (ForMailbox s, MonadIO m)
=> DBPipeEnv => DBPipeEnv
-> AnyStorage -> AnyStorage
@ -310,7 +378,10 @@ getMailboxType_ d r = do
<&> fmap (fromStringMay @MailboxType . fromOnly) <&> fmap (fromStringMay @MailboxType . fromOnly)
<&> headMay . catMaybes <&> headMay . catMaybes
createMailboxProtoWorker :: forall s e m . (MonadIO m, s ~ Encryption e, ForMailbox s) createMailboxProtoWorker :: forall s e m . ( MonadIO m
, s ~ Encryption e
, ForMailbox s
)
=> PeerCredentials s => PeerCredentials s
-> PeerEnv e -> PeerEnv e
-> DownloadEnv e -> DownloadEnv e
@ -319,14 +390,17 @@ createMailboxProtoWorker :: forall s e m . (MonadIO m, s ~ Encryption e, ForMail
createMailboxProtoWorker pc pe de sto = do createMailboxProtoWorker pc pe de sto = do
-- FIXME: queue-size-hardcode -- FIXME: queue-size-hardcode
-- $class: hardcode -- $class: hardcode
inQ <- newTBQueueIO 8000 MailboxProtoWorker pe de sto pc
mergeQ <- newTVarIO mempty <$> newTVarIO mempty
inDroppped <- newTVarIO 0 <*> newTBQueueIO 8000
inNum <- newTVarIO 0 <*> newTVarIO mempty
outNum <- newTVarIO 0 <*> newTVarIO mempty
decl <- newTVarIO 0 <*> newTVarIO mempty
dbe <- newTVarIO Nothing <*> newTVarIO 0
pure $ MailboxProtoWorker pe de sto pc inQ mergeQ inNum outNum inDroppped decl dbe <*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO Nothing
mailboxProtoWorker :: forall e s m . ( MonadIO m mailboxProtoWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m , MonadUnliftIO m
@ -352,19 +426,27 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
dbe <- lift $ mailboxStateEvolve readConf me dbe <- lift $ mailboxStateEvolve readConf me
pipe <- ContT $ withAsync (runPipe dbe) dpipe <- ContT $ withAsync (runPipe dbe)
inq <- ContT $ withAsync (mailboxInQ dbe) inq <- ContT $ withAsync (mailboxInQ dbe)
mergeQ <- ContT $ withAsync mailboxMergeQ mergeQ <- ContT $ withAsync mailboxMergeQ
mCheckQ <- ContT $ withAsync (mailboxCheckQ dbe)
mFetchQ <- ContT $ withAsync (mailboxFetchQ dbe)
pDownQ <- ContT $ withAsync policyDownloadQ
sDownQ <- ContT $ withAsync stateDownloadQ
bs <- ContT $ withAsync do bs <- ContT $ withAsync do
forever do forever do
pause @'Seconds 10 pause @'Seconds 10
debug $ "I'm" <+> yellow "mailboxProtoWorker" debug $ "I'm" <+> yellow "mailboxProtoWorker"
void $ waitAnyCancel [bs,pipe,inq,mergeQ] void $ waitAnyCancel [bs,dpipe,inq,mergeQ,pDownQ,sDownQ,mCheckQ,mFetchQ]
`catch` \( e :: MailboxProtoException ) -> do `catch` \( e :: MailboxProtoException ) -> do
err $ red "mailbox protocol worker terminated" <+> viaShow e err $ red "mailbox protocol worker terminated" <+> viaShow e
@ -412,12 +494,7 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
-- TODO: check-attachment-policy-for-mailbox -- TODO: check-attachment-policy-for-mailbox
-- TODO: ASAP-block-accounting-for-attachment -- TODO: ASAP-block-accounting-for-attachment
for_ (messageParts s) $ \part -> do for_ (messageParts s) (startDownloadStuff me)
liftIO $ withPeerM mpwPeerEnv $ withDownload mpwDownloadEnv
$ addDownload @e Nothing (fromHashRef part)
pure ()
-- read current mailbox -- read current mailbox
-- merge messages into -- merge messages into
@ -430,7 +507,7 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
let mboxes = readTVarIO inMessageMergeQueue let mboxes = readTVarIO inMessageMergeQueue
<&> fmap (,2) . HM.keys . HM.filter ( not . HS.null ) <&> fmap (,2) . HM.keys . HM.filter ( not . HS.null )
polling (Polling 2 2) mboxes $ \r -> void $ runMaybeT do polling (Polling 2 5) mboxes $ \r -> void $ runMaybeT do
debug $ yellow "mailbox: merge-poll" <+> pretty r debug $ yellow "mailbox: merge-poll" <+> pretty r
-- NOTE: reliability -- NOTE: reliability
@ -458,6 +535,42 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
updateRef sto r nref updateRef sto r nref
debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref
policyDownloadQ = do
forever do
pause @'Seconds 10
debug $ red "mailbox: policyDownloadQ"
stateDownloadQ = do
forever do
pause @'Seconds 10
debug $ red "mailbox: stateDownloadQ"
mailboxFetchQ dbe = forever do
toFetch <- atomically $ do
q <- readTVar mpwFetchQ
when (HS.null q) STM.retry
writeTVar mpwFetchQ mempty
pure q
for_ toFetch $ \r -> do
t <- getMailboxType_ dbe r
maybe1 t none $ \_ -> do
debug $ yellow "mailbox: SEND FETCH REQUEST FOR" <+> pretty r
now <- liftIO (getPOSIXTime <&> round)
gossip (MailBoxProtoV1 @s @e (CheckMailbox (Just now) (coerce r)))
mailboxCheckQ dbe = do
-- FIXME: mailbox-check-period
-- right now it's 10 seconds for debug purposes
-- remove hardcode to smth reasonable
let mboxes = liftIO (listMailboxes @s dbe <&> fmap (set _2 10) )
polling (Polling 10 10) mboxes $ \r -> do
debug $ yellow "mailbox: SEND FETCH REQUEST FOR" <+> pretty r
now <- liftIO (getPOSIXTime <&> round)
gossip (MailBoxProtoV1 @s @e (CheckMailbox (Just now) (coerce r)))
mailboxStateEvolve :: forall e s m . ( MonadIO m mailboxStateEvolve :: forall e s m . ( MonadIO m
, MonadUnliftIO m , MonadUnliftIO m
, HasStorage m , HasStorage m

View File

@ -61,6 +61,13 @@ instance (ForMailboxRPC m) => HandleMethod m RpcMailboxGetStatus where
debug $ "rpc.RpcMailboxGetStatus" <+> pretty (AsBase58 puk) debug $ "rpc.RpcMailboxGetStatus" <+> pretty (AsBase58 puk)
mailboxGetStatus @HBS2Basic mbs (MailboxRefKey puk) mailboxGetStatus @HBS2Basic mbs (MailboxRefKey puk)
instance (ForMailboxRPC m) => HandleMethod m RpcMailboxFetch where
handleMethod puk = do
AnyMailboxService mbs <- getRpcContext @MailboxAPI @RPC2Context <&> rpcMailboxService
debug $ "rpc.RpcMailboxFetch" <+> pretty (AsBase58 puk)
mailboxFetch @HBS2Basic mbs (MailboxRefKey puk)
instance (ForMailboxRPC m) => HandleMethod m RpcMailboxDelete where instance (ForMailboxRPC m) => HandleMethod m RpcMailboxDelete where
handleMethod puk = do handleMethod puk = do

View File

@ -135,6 +135,11 @@ class ForMailbox s => IsMailboxService s a where
-> MailBoxStatusPayload s -> MailBoxStatusPayload s
-> m (Either MailboxServiceError ()) -> m (Either MailboxServiceError ())
mailboxFetch :: forall m . MonadIO m
=> a
-> MailboxRefKey s
-> m (Either MailboxServiceError ())
data AnyMailboxService s = data AnyMailboxService s =
forall a . (IsMailboxService s a) => AnyMailboxService { mailboxService :: a } forall a . (IsMailboxService s a) => AnyMailboxService { mailboxService :: a }
@ -150,6 +155,7 @@ instance ForMailbox s => IsMailboxService s (AnyMailboxService s) where
mailboxListBasic (AnyMailboxService a) = mailboxListBasic @s a mailboxListBasic (AnyMailboxService a) = mailboxListBasic @s a
mailboxGetStatus (AnyMailboxService a) = mailboxGetStatus @s a mailboxGetStatus (AnyMailboxService a) = mailboxGetStatus @s a
mailboxAcceptStatus (AnyMailboxService a) = mailboxAcceptStatus @s a mailboxAcceptStatus (AnyMailboxService a) = mailboxAcceptStatus @s a
mailboxFetch (AnyMailboxService a) = mailboxFetch @s a
instance IsMailboxProtoAdapter s (AnyMailboxAdapter s) where instance IsMailboxProtoAdapter s (AnyMailboxAdapter s) where
mailboxGetCredentials (AnyMailboxAdapter a) = mailboxGetCredentials @s a mailboxGetCredentials (AnyMailboxAdapter a) = mailboxGetCredentials @s a

View File

@ -19,6 +19,7 @@ data RpcMailboxCreate
data RpcMailboxSetPolicy data RpcMailboxSetPolicy
data RpcMailboxDelete data RpcMailboxDelete
data RpcMailboxGetStatus data RpcMailboxGetStatus
data RpcMailboxFetch
data RpcMailboxList data RpcMailboxList
data RpcMailboxSend data RpcMailboxSend
data RpcMailboxGet data RpcMailboxGet
@ -28,6 +29,7 @@ type MailboxAPI = '[ RpcMailboxPoke
, RpcMailboxSetPolicy , RpcMailboxSetPolicy
, RpcMailboxDelete , RpcMailboxDelete
, RpcMailboxGetStatus , RpcMailboxGetStatus
, RpcMailboxFetch
, RpcMailboxList , RpcMailboxList
, RpcMailboxSend , RpcMailboxSend
, RpcMailboxGet , RpcMailboxGet
@ -57,6 +59,9 @@ type instance Output RpcMailboxDelete = ()
type instance Input RpcMailboxGetStatus = (PubKey 'Sign HBS2Basic) type instance Input RpcMailboxGetStatus = (PubKey 'Sign HBS2Basic)
type instance Output RpcMailboxGetStatus = Either MailboxServiceError (Maybe (MailBoxStatusPayload 'HBS2Basic)) type instance Output RpcMailboxGetStatus = Either MailboxServiceError (Maybe (MailBoxStatusPayload 'HBS2Basic))
type instance Input RpcMailboxFetch = (PubKey 'Sign HBS2Basic)
type instance Output RpcMailboxFetch = Either MailboxServiceError ()
type instance Input RpcMailboxList = () type instance Input RpcMailboxList = ()
type instance Output RpcMailboxList = [(MailboxRefKey 'HBS2Basic, MailboxType)] type instance Output RpcMailboxList = [(MailboxRefKey 'HBS2Basic, MailboxType)]