mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
4ed3716d9d
commit
e6804415a8
|
@ -50,8 +50,10 @@ import Data.HashMap.Strict (HashMap)
|
||||||
import Data.HashMap.Strict qualified as HM
|
import Data.HashMap.Strict qualified as HM
|
||||||
import Data.HashSet (HashSet)
|
import Data.HashSet (HashSet)
|
||||||
import Data.HashSet qualified as HS
|
import Data.HashSet qualified as HS
|
||||||
|
import Data.List qualified as L
|
||||||
import Data.Maybe
|
import Data.Maybe
|
||||||
import Data.Word
|
import Data.Word
|
||||||
|
import Data.Hashable
|
||||||
import Codec.Serialise
|
import Codec.Serialise
|
||||||
import Lens.Micro.Platform
|
import Lens.Micro.Platform
|
||||||
import Text.InterpolatedString.Perl6 (qc)
|
import Text.InterpolatedString.Perl6 (qc)
|
||||||
|
@ -83,8 +85,16 @@ data PolicyDownload s =
|
||||||
PolicyDownload
|
PolicyDownload
|
||||||
{ policyDownloadWhen :: Word64
|
{ policyDownloadWhen :: Word64
|
||||||
, policyDownloadWhat :: SetPolicyPayload s
|
, policyDownloadWhat :: SetPolicyPayload s
|
||||||
|
, policyDownloadBox :: HashRef
|
||||||
}
|
}
|
||||||
deriving stock Generic
|
deriving stock (Generic)
|
||||||
|
|
||||||
|
instance ForMailbox s => Serialise (PolicyDownload s)
|
||||||
|
|
||||||
|
deriving instance ForMailbox s => Eq (PolicyDownload s)
|
||||||
|
|
||||||
|
instance ForMailbox s => Hashable (PolicyDownload s) where
|
||||||
|
hashWithSalt s p = hashWithSalt s (serialise p)
|
||||||
|
|
||||||
data MailboxDowload =
|
data MailboxDowload =
|
||||||
MailboxDowload
|
MailboxDowload
|
||||||
|
@ -267,45 +277,36 @@ instance ( s ~ Encryption e, e ~ L4Proto
|
||||||
<+> pretty (AsBase58 who)
|
<+> pretty (AsBase58 who)
|
||||||
|
|
||||||
|
|
||||||
|
let downloadStatus v = do
|
||||||
|
maybe1 mbsMailboxHash (okay ()) $ \h -> do
|
||||||
|
startDownloadStuff me h
|
||||||
|
atomically $ modifyTVar inMailboxDownloadQ (HM.insert h (MailboxDowload now v))
|
||||||
|
okay ()
|
||||||
|
|
||||||
|
case mbsMailboxPolicy of
|
||||||
|
Nothing -> downloadStatus Nothing
|
||||||
|
Just newPolicy -> do
|
||||||
|
|
||||||
-- TODO: handle-invalid-policy-error
|
-- TODO: handle-invalid-policy-error
|
||||||
-- not "okay" actually
|
-- not "okay" actually
|
||||||
(rcptKey, pNew) <- ContT $ maybe1 (mbsMailboxPolicy >>= unboxSignedBox0)
|
|
||||||
|
(rcptKey, pNew) <- ContT $ maybe1 (unboxSignedBox0 newPolicy)
|
||||||
(bogusPolicyMessage >> okay ())
|
(bogusPolicyMessage >> okay ())
|
||||||
|
|
||||||
when (coerce rcptKey /= ref) $ lift bogusPolicyMessage >> stop (Right ())
|
when (coerce rcptKey /= ref) $ lift bogusPolicyMessage >> stop (Right ())
|
||||||
|
|
||||||
when (sppPolicyVersion pNew > p0) do
|
when (sppPolicyVersion pNew > p0) do
|
||||||
startDownloadStuff me (sppPolicyRef pNew)
|
startDownloadStuff me (sppPolicyRef pNew)
|
||||||
atomically $ modifyTVar inPolicyDownloadQ (HM.insert (sppPolicyRef pNew) (PolicyDownload now pNew))
|
|
||||||
|
mph <- putBlock mpwStorage (serialise newPolicy)
|
||||||
|
|
||||||
|
for_ mph $ \ph -> do
|
||||||
|
let insActually = HM.insert (sppPolicyRef pNew) (PolicyDownload now pNew (HashRef ph))
|
||||||
|
atomically $ modifyTVar inPolicyDownloadQ insActually
|
||||||
|
|
||||||
let v = Just $ max p0 (sppPolicyVersion pNew)
|
let v = Just $ max p0 (sppPolicyVersion pNew)
|
||||||
|
|
||||||
maybe1 mbsMailboxHash (okay ()) $ \h -> do
|
downloadStatus v
|
||||||
startDownloadStuff me h
|
|
||||||
atomically $ modifyTVar inMailboxDownloadQ (HM.insert h (MailboxDowload now v))
|
|
||||||
okay ()
|
|
||||||
|
|
||||||
-- если версия p > версии p0 -- ставим скачиваться, по скачиванию -- обновляем
|
|
||||||
-- тут есть какой-то процесс, который должен поллить скачивания, не забываем,
|
|
||||||
-- что это довольно затратно (проверка всех блоков)
|
|
||||||
-- по идее и сообщения-то должны процессить уже с обновленной policy
|
|
||||||
-- но если этого ждать, то засинкаемся черти когда, однако же
|
|
||||||
-- надо их начинать качать, как можно раньше. поэтому что?
|
|
||||||
-- ставим качать policy
|
|
||||||
-- для каждого сообщения -- ставим качать его
|
|
||||||
-- наверное, запоминаем версию policy с которой можно процессировать
|
|
||||||
-- если на момент скачивания сообщения -- policy не достигнуто -- просто его запоминаем
|
|
||||||
-- если их дофига, а момент так и не наступил --- тогда что?
|
|
||||||
--
|
|
||||||
-- наверное, запускаем циклический процесс по обновлению **этого** статуса.
|
|
||||||
-- сначала качаем policy.
|
|
||||||
--
|
|
||||||
-- как скачали -- обновляем, если ок
|
|
||||||
--
|
|
||||||
-- если не ок -- то но обновляем? а что тогда
|
|
||||||
--
|
|
||||||
|
|
||||||
okay ()
|
|
||||||
|
|
||||||
mailboxGetStatus MailboxProtoWorker{..} ref = do
|
mailboxGetStatus MailboxProtoWorker{..} ref = do
|
||||||
-- TODO: support-policy-ASAP
|
-- TODO: support-policy-ASAP
|
||||||
|
@ -370,6 +371,18 @@ loadPolicyPayloadFor dbe sto who = do
|
||||||
pure (ha, what)
|
pure (ha, what)
|
||||||
|
|
||||||
|
|
||||||
|
loadPolicyPayloadUnboxed :: forall s m . (ForMailbox s, MonadIO m)
|
||||||
|
=> DBPipeEnv
|
||||||
|
-> AnyStorage
|
||||||
|
-> MailboxRefKey s
|
||||||
|
-> m (Maybe (SetPolicyPayload s))
|
||||||
|
loadPolicyPayloadUnboxed dbe sto mbox = do
|
||||||
|
loadPolicyPayloadFor dbe sto mbox
|
||||||
|
<&> fmap snd
|
||||||
|
<&> fmap unboxSignedBox0
|
||||||
|
<&> join
|
||||||
|
<&> fmap snd
|
||||||
|
|
||||||
getMailboxType_ :: (ForMailbox s, MonadIO m) => DBPipeEnv -> MailboxRefKey s -> m (Maybe MailboxType)
|
getMailboxType_ :: (ForMailbox s, MonadIO m) => DBPipeEnv -> MailboxRefKey s -> m (Maybe MailboxType)
|
||||||
getMailboxType_ d r = do
|
getMailboxType_ d r = do
|
||||||
let sql = [qc|select type from mailbox where recipient = ? limit 1|]
|
let sql = [qc|select type from mailbox where recipient = ? limit 1|]
|
||||||
|
@ -436,7 +449,7 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
|
||||||
|
|
||||||
mFetchQ <- ContT $ withAsync (mailboxFetchQ dbe)
|
mFetchQ <- ContT $ withAsync (mailboxFetchQ dbe)
|
||||||
|
|
||||||
pDownQ <- ContT $ withAsync policyDownloadQ
|
pDownQ <- ContT $ withAsync (policyDownloadQ dbe)
|
||||||
|
|
||||||
sDownQ <- ContT $ withAsync stateDownloadQ
|
sDownQ <- ContT $ withAsync stateDownloadQ
|
||||||
|
|
||||||
|
@ -535,10 +548,30 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
|
||||||
updateRef sto r nref
|
updateRef sto r nref
|
||||||
debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref
|
debug $ yellow "mailbox updated" <+> pretty r <+> pretty nref
|
||||||
|
|
||||||
policyDownloadQ = do
|
policyDownloadQ dbe = do
|
||||||
forever do
|
|
||||||
pause @'Seconds 10
|
-- FIXME: too-often-checks-affect-performance
|
||||||
debug $ red "mailbox: policyDownloadQ"
|
-- $class: performance
|
||||||
|
let policies = readTVarIO inPolicyDownloadQ
|
||||||
|
<&> HM.toList
|
||||||
|
<&> fmap (,10)
|
||||||
|
|
||||||
|
polling (Polling 30 30) policies $ \(pk,PolicyDownload{..}) -> do
|
||||||
|
missed <- findMissedBlocks mpwStorage pk <&> L.null
|
||||||
|
unless missed do
|
||||||
|
let mbox = MailboxRefKey (sppMailboxKey policyDownloadWhat)
|
||||||
|
|
||||||
|
current <- loadPolicyPayloadUnboxed @s dbe mpwStorage mbox
|
||||||
|
<&> fmap sppPolicyVersion
|
||||||
|
<&> fromMaybe 0
|
||||||
|
|
||||||
|
let downloaded = sppPolicyVersion policyDownloadWhat
|
||||||
|
|
||||||
|
when (downloaded > current) do
|
||||||
|
-- set policy
|
||||||
|
pure ()
|
||||||
|
|
||||||
|
atomically $ modifyTVar inPolicyDownloadQ (HM.delete pk)
|
||||||
|
|
||||||
stateDownloadQ = do
|
stateDownloadQ = do
|
||||||
forever do
|
forever do
|
||||||
|
|
|
@ -41,6 +41,9 @@ data SetPolicyPayload s =
|
||||||
}
|
}
|
||||||
deriving stock (Generic)
|
deriving stock (Generic)
|
||||||
|
|
||||||
|
-- for Hashable
|
||||||
|
deriving instance ForMailbox s => Eq (SetPolicyPayload s)
|
||||||
|
|
||||||
data MailBoxStatusPayload s =
|
data MailBoxStatusPayload s =
|
||||||
MailBoxStatusPayload
|
MailBoxStatusPayload
|
||||||
{ mbsMailboxPayloadNonce :: Word64
|
{ mbsMailboxPayloadNonce :: Word64
|
||||||
|
|
Loading…
Reference in New Issue