diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index b057d3fc..c4864057 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -38,11 +38,12 @@ import HBS2.Storage.Operations.Missed import HBS2.System.Logger.Simple -import PeerTypes +import PeerTypes hiding (downloads) import PeerConfig import BlockDownload import Brains +import Control.Monad.Trans.Cont import Codec.Serialise import Control.Concurrent.STM (flushTQueue) import Control.Exception () @@ -539,51 +540,63 @@ refChanWorker env brains = do mergeQ <- newTQueueIO -- FIXME: resume-on-exception - hw <- async (refChanHeadMon penv) -- FIXME: insist-more-during-download -- что-то частая ситуация, когда блоки -- с трудом докачиваются. надо бы -- разобраться. возможно переделать -- механизм скачивания блоков - downloads <- async monitorHeadDownloads + -- - polls <- async refChanPoll + -- всё это нужно вместе. соответственно, + -- упало одно - отменяем всё и простреливаем + -- наверх. + -- соответственно - bracket на каждый поток - wtrans <- async refChanWriter + flip runContT (either throwIO (const none) .snd) do - cleanup1 <- async cleanupRounds + hw <- ContT $ withAsync (refChanHeadMon penv) - merge <- async (logMergeProcess env mergeQ) + downloads <- ContT $ withAsync (monitorHeadDownloads penv) - sto <- getStorage + polls <- ContT $ withAsync (refChanPoll penv) - liftIO $ refChanWorkerInitValidators env - liftIO $ refChanWorkerInitNotifiers env + wtrans <- ContT $ withAsync (refChanWriter penv) - subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do - debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val + cleanup1 <- ContT $ withAsync (liftIO (cleanupRounds penv)) - h <- liftIO $ getRef sto (RefChanLogKey @s chan) + merge <- ContT $ withAsync (liftIO $ logMergeProcess penv env mergeQ) - -- игнорируем, если синхронно - unless ((HashRef <$> h) == Just val) do + sto <- lift getStorage - refChanAddDownload env chan val $ \href -> do - debug $ "BLOCK DOWNLOADED" <+> pretty href - atomically $ writeTQueue mergeQ (chan, href) + liftIO $ refChanWorkerInitValidators env - atomically $ writeTQueue mergeQ (chan, val) + liftIO $ refChanWorkerInitNotifiers env - forever do - pause @'Seconds 10 - debug "I'm refchan worker" + liftIO $ withPeerM penv do + subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do + debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val - mapM_ waitCatch [hw,downloads,polls,wtrans,merge,cleanup1] + h <- liftIO $ getRef sto (RefChanLogKey @s chan) + + -- игнорируем, если синхронно + unless ((HashRef <$> h) == Just val) do + + refChanAddDownload env chan val $ \href -> do + debug $ "BLOCK DOWNLOADED" <+> pretty href + atomically $ writeTQueue mergeQ (chan, href) + + atomically $ writeTQueue mergeQ (chan, val) + + bullshit <- ContT $ withAsync $ forever do + pause @'Seconds 10 + debug "I'm refchan worker" + + waitAnyCatchCancel [hw,downloads,polls,wtrans,merge,cleanup1,bullshit] where - cleanupRounds = do + cleanupRounds penv = withPeerM penv do rounds <- newTVarIO HashSet.empty @@ -619,7 +632,7 @@ refChanWorker env brains = do atomically $ modifyTVar rounds (HashSet.delete x) debug $ "CLEANUP ROUND" <+> pretty x - refChanWriter = do + refChanWriter penv = withPeerM penv do sto <- getStorage forever do pause @'Seconds 1 @@ -655,7 +668,7 @@ refChanWorker env brains = do debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref - refChanPoll = do + refChanPoll penv = withPeerM penv do let listRefs = listPolledRefs @e brains (Just "refchan") <&> fmap (\(a,_,b) -> (a,b)) @@ -666,7 +679,7 @@ refChanWorker env brains = do broadCastMessage (RefChanGetHead @e ref) broadCastMessage (RefChanRequest @e ref) - monitorHeadDownloads = forever do + monitorHeadDownloads penv = withPeerM penv $ forever do pause @'Seconds 1 all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList @@ -780,11 +793,12 @@ logMergeProcess :: forall e s m . ( MonadUnliftIO m , s ~ Encryption e , m ~ PeerM e IO ) - => RefChanWorkerEnv e + => PeerEnv e + -> RefChanWorkerEnv e -> TQueue (RefChanId e, HashRef) - -> m () + -> IO () -logMergeProcess env q = do +logMergeProcess penv env q = withPeerM penv do sto <- getStorage