refactor refchans, async management

This commit is contained in:
Dmitry Zuikov 2023-11-14 07:52:47 +03:00
parent fc52fabbf9
commit 63caa3b5b7
1 changed files with 44 additions and 30 deletions

View File

@ -38,11 +38,12 @@ import HBS2.Storage.Operations.Missed
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import PeerTypes import PeerTypes hiding (downloads)
import PeerConfig import PeerConfig
import BlockDownload import BlockDownload
import Brains import Brains
import Control.Monad.Trans.Cont
import Codec.Serialise import Codec.Serialise
import Control.Concurrent.STM (flushTQueue) import Control.Concurrent.STM (flushTQueue)
import Control.Exception () import Control.Exception ()
@ -539,51 +540,63 @@ refChanWorker env brains = do
mergeQ <- newTQueueIO mergeQ <- newTQueueIO
-- FIXME: resume-on-exception -- FIXME: resume-on-exception
hw <- async (refChanHeadMon penv)
-- FIXME: insist-more-during-download -- FIXME: insist-more-during-download
-- что-то частая ситуация, когда блоки -- что-то частая ситуация, когда блоки
-- с трудом докачиваются. надо бы -- с трудом докачиваются. надо бы
-- разобраться. возможно переделать -- разобраться. возможно переделать
-- механизм скачивания блоков -- механизм скачивания блоков
downloads <- async monitorHeadDownloads --
polls <- async refChanPoll -- всё это нужно вместе. соответственно,
-- упало одно - отменяем всё и простреливаем
-- наверх.
-- соответственно - bracket на каждый поток
wtrans <- async refChanWriter flip runContT (either throwIO (const none) .snd) do
cleanup1 <- async cleanupRounds hw <- ContT $ withAsync (refChanHeadMon penv)
merge <- async (logMergeProcess env mergeQ) downloads <- ContT $ withAsync (monitorHeadDownloads penv)
sto <- getStorage polls <- ContT $ withAsync (refChanPoll penv)
liftIO $ refChanWorkerInitValidators env wtrans <- ContT $ withAsync (refChanWriter penv)
liftIO $ refChanWorkerInitNotifiers env
subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do cleanup1 <- ContT $ withAsync (liftIO (cleanupRounds penv))
debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val
h <- liftIO $ getRef sto (RefChanLogKey @s chan) merge <- ContT $ withAsync (liftIO $ logMergeProcess penv env mergeQ)
-- игнорируем, если синхронно sto <- lift getStorage
unless ((HashRef <$> h) == Just val) do
refChanAddDownload env chan val $ \href -> do liftIO $ refChanWorkerInitValidators env
debug $ "BLOCK DOWNLOADED" <+> pretty href
atomically $ writeTQueue mergeQ (chan, href)
atomically $ writeTQueue mergeQ (chan, val) liftIO $ refChanWorkerInitNotifiers env
forever do liftIO $ withPeerM penv do
pause @'Seconds 10 subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
debug "I'm refchan worker" debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val
mapM_ waitCatch [hw,downloads,polls,wtrans,merge,cleanup1] h <- liftIO $ getRef sto (RefChanLogKey @s chan)
-- игнорируем, если синхронно
unless ((HashRef <$> h) == Just val) do
refChanAddDownload env chan val $ \href -> do
debug $ "BLOCK DOWNLOADED" <+> pretty href
atomically $ writeTQueue mergeQ (chan, href)
atomically $ writeTQueue mergeQ (chan, val)
bullshit <- ContT $ withAsync $ forever do
pause @'Seconds 10
debug "I'm refchan worker"
waitAnyCatchCancel [hw,downloads,polls,wtrans,merge,cleanup1,bullshit]
where where
cleanupRounds = do cleanupRounds penv = withPeerM penv do
rounds <- newTVarIO HashSet.empty rounds <- newTVarIO HashSet.empty
@ -619,7 +632,7 @@ refChanWorker env brains = do
atomically $ modifyTVar rounds (HashSet.delete x) atomically $ modifyTVar rounds (HashSet.delete x)
debug $ "CLEANUP ROUND" <+> pretty x debug $ "CLEANUP ROUND" <+> pretty x
refChanWriter = do refChanWriter penv = withPeerM penv do
sto <- getStorage sto <- getStorage
forever do forever do
pause @'Seconds 1 pause @'Seconds 1
@ -655,7 +668,7 @@ refChanWorker env brains = do
debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref
refChanPoll = do refChanPoll penv = withPeerM penv do
let listRefs = listPolledRefs @e brains (Just "refchan") let listRefs = listPolledRefs @e brains (Just "refchan")
<&> fmap (\(a,_,b) -> (a,b)) <&> fmap (\(a,_,b) -> (a,b))
@ -666,7 +679,7 @@ refChanWorker env brains = do
broadCastMessage (RefChanGetHead @e ref) broadCastMessage (RefChanGetHead @e ref)
broadCastMessage (RefChanRequest @e ref) broadCastMessage (RefChanRequest @e ref)
monitorHeadDownloads = forever do monitorHeadDownloads penv = withPeerM penv $ forever do
pause @'Seconds 1 pause @'Seconds 1
all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList
@ -780,11 +793,12 @@ logMergeProcess :: forall e s m . ( MonadUnliftIO m
, s ~ Encryption e , s ~ Encryption e
, m ~ PeerM e IO , m ~ PeerM e IO
) )
=> RefChanWorkerEnv e => PeerEnv e
-> RefChanWorkerEnv e
-> TQueue (RefChanId e, HashRef) -> TQueue (RefChanId e, HashRef)
-> m () -> IO ()
logMergeProcess env q = do logMergeProcess penv env q = withPeerM penv do
sto <- getStorage sto <- getStorage