From e6804415a81d9de38dfab5fa1fdece871f3a40c0 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 13 Oct 2024 08:46:23 +0300 Subject: [PATCH] wip --- hbs2-peer/app/MailboxProtoWorker.hs | 111 +++++++++++++++-------- hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs | 3 + 2 files changed, 75 insertions(+), 39 deletions(-) diff --git a/hbs2-peer/app/MailboxProtoWorker.hs b/hbs2-peer/app/MailboxProtoWorker.hs index 62b1eb99..ccacc7ad 100644 --- a/hbs2-peer/app/MailboxProtoWorker.hs +++ b/hbs2-peer/app/MailboxProtoWorker.hs @@ -50,8 +50,10 @@ import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HM import Data.HashSet (HashSet) import Data.HashSet qualified as HS +import Data.List qualified as L import Data.Maybe import Data.Word +import Data.Hashable import Codec.Serialise import Lens.Micro.Platform import Text.InterpolatedString.Perl6 (qc) @@ -83,8 +85,16 @@ data PolicyDownload s = PolicyDownload { policyDownloadWhen :: Word64 , policyDownloadWhat :: SetPolicyPayload s + , policyDownloadBox :: HashRef } - deriving stock Generic + deriving stock (Generic) + +instance ForMailbox s => Serialise (PolicyDownload s) + +deriving instance ForMailbox s => Eq (PolicyDownload s) + +instance ForMailbox s => Hashable (PolicyDownload s) where + hashWithSalt s p = hashWithSalt s (serialise p) data MailboxDowload = MailboxDowload @@ -267,45 +277,36 @@ instance ( s ~ Encryption e, e ~ L4Proto <+> pretty (AsBase58 who) - -- TODO: handle-invalid-policy-error - -- not "okay" actually - (rcptKey, pNew) <- ContT $ maybe1 (mbsMailboxPolicy >>= unboxSignedBox0) - (bogusPolicyMessage >> okay ()) + let downloadStatus v = do + maybe1 mbsMailboxHash (okay ()) $ \h -> do + startDownloadStuff me h + atomically $ modifyTVar inMailboxDownloadQ (HM.insert h (MailboxDowload now v)) + okay () - when (coerce rcptKey /= ref) $ lift bogusPolicyMessage >> stop (Right ()) + case mbsMailboxPolicy of + Nothing -> downloadStatus Nothing + Just newPolicy -> do - when (sppPolicyVersion pNew > p0) do - startDownloadStuff me (sppPolicyRef pNew) - atomically $ modifyTVar inPolicyDownloadQ (HM.insert (sppPolicyRef pNew) (PolicyDownload now pNew)) + -- TODO: handle-invalid-policy-error + -- not "okay" actually - let v = Just $ max p0 (sppPolicyVersion pNew) + (rcptKey, pNew) <- ContT $ maybe1 (unboxSignedBox0 newPolicy) + (bogusPolicyMessage >> okay ()) - maybe1 mbsMailboxHash (okay ()) $ \h -> do - startDownloadStuff me h - atomically $ modifyTVar inMailboxDownloadQ (HM.insert h (MailboxDowload now v)) - okay () + when (coerce rcptKey /= ref) $ lift bogusPolicyMessage >> stop (Right ()) - -- если версия p > версии p0 -- ставим скачиваться, по скачиванию -- обновляем - -- тут есть какой-то процесс, который должен поллить скачивания, не забываем, - -- что это довольно затратно (проверка всех блоков) - -- по идее и сообщения-то должны процессить уже с обновленной policy - -- но если этого ждать, то засинкаемся черти когда, однако же - -- надо их начинать качать, как можно раньше. поэтому что? - -- ставим качать policy - -- для каждого сообщения -- ставим качать его - -- наверное, запоминаем версию policy с которой можно процессировать - -- если на момент скачивания сообщения -- policy не достигнуто -- просто его запоминаем - -- если их дофига, а момент так и не наступил --- тогда что? - -- - -- наверное, запускаем циклический процесс по обновлению **этого** статуса. - -- сначала качаем policy. - -- - -- как скачали -- обновляем, если ок - -- - -- если не ок -- то но обновляем? а что тогда - -- + when (sppPolicyVersion pNew > p0) do + startDownloadStuff me (sppPolicyRef pNew) - okay () + mph <- putBlock mpwStorage (serialise newPolicy) + + for_ mph $ \ph -> do + let insActually = HM.insert (sppPolicyRef pNew) (PolicyDownload now pNew (HashRef ph)) + atomically $ modifyTVar inPolicyDownloadQ insActually + + let v = Just $ max p0 (sppPolicyVersion pNew) + + downloadStatus v mailboxGetStatus MailboxProtoWorker{..} ref = do -- TODO: support-policy-ASAP @@ -370,6 +371,18 @@ loadPolicyPayloadFor dbe sto who = do pure (ha, what) +loadPolicyPayloadUnboxed :: forall s m . (ForMailbox s, MonadIO m) + => DBPipeEnv + -> AnyStorage + -> MailboxRefKey s + -> m (Maybe (SetPolicyPayload s)) +loadPolicyPayloadUnboxed dbe sto mbox = do + loadPolicyPayloadFor dbe sto mbox + <&> fmap snd + <&> fmap unboxSignedBox0 + <&> join + <&> fmap snd + getMailboxType_ :: (ForMailbox s, MonadIO m) => DBPipeEnv -> MailboxRefKey s -> m (Maybe MailboxType) getMailboxType_ d r = do let sql = [qc|select type from mailbox where recipient = ? limit 1|] @@ -436,7 +449,7 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do mFetchQ <- ContT $ withAsync (mailboxFetchQ dbe) - pDownQ <- ContT $ withAsync policyDownloadQ + pDownQ <- ContT $ withAsync (policyDownloadQ dbe) sDownQ <- ContT $ withAsync stateDownloadQ @@ -535,10 +548,30 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do updateRef sto r nref debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref - policyDownloadQ = do - forever do - pause @'Seconds 10 - debug $ red "mailbox: policyDownloadQ" + policyDownloadQ dbe = do + + -- FIXME: too-often-checks-affect-performance + -- $class: performance + let policies = readTVarIO inPolicyDownloadQ + <&> HM.toList + <&> fmap (,10) + + polling (Polling 30 30) policies $ \(pk,PolicyDownload{..}) -> do + missed <- findMissedBlocks mpwStorage pk <&> L.null + unless missed do + let mbox = MailboxRefKey (sppMailboxKey policyDownloadWhat) + + current <- loadPolicyPayloadUnboxed @s dbe mpwStorage mbox + <&> fmap sppPolicyVersion + <&> fromMaybe 0 + + let downloaded = sppPolicyVersion policyDownloadWhat + + when (downloaded > current) do + -- set policy + pure () + + atomically $ modifyTVar inPolicyDownloadQ (HM.delete pk) stateDownloadQ = do forever do diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs index ed5719c0..95575ba7 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/Mailbox.hs @@ -41,6 +41,9 @@ data SetPolicyPayload s = } deriving stock (Generic) +-- for Hashable +deriving instance ForMailbox s => Eq (SetPolicyPayload s) + data MailBoxStatusPayload s = MailBoxStatusPayload { mbsMailboxPayloadNonce :: Word64