diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index aa4d622f..c691f356 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -104,7 +104,7 @@ class ( MonadIO m response :: p -> m () -class Request e p (m :: Type -> Type) | p -> e where +class HasProtocol e p => Request e p (m :: Type -> Type) | p -> e where request :: Peer e -> p -> m () data ReqLimPeriod = NoLimit diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 17504cdb..8b13c976 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -49,13 +49,13 @@ import Data.HashSet qualified as HashSet import Data.List qualified as List import Data.Maybe import Lens.Micro.Platform -import Data.Heap qualified as Heap -import Data.Heap (Heap,Entry(..)) +-- import Data.Heap qualified as Heap +import Data.Heap () import Codec.Serialise import UnliftIO import Streaming.Prelude qualified as S -import Streaming qualified as S +import Streaming() {- HLINT ignore "Use newtype instead of data" -} @@ -63,11 +63,13 @@ data DataNotReady = DataNotReady deriving (Show) instance Exception DataNotReady +type OnDownloadComplete = HashRef -> IO () + data RefChanWorkerEnv e = RefChanWorkerEnv { _refChanWorkerEnvDEnv :: DownloadEnv e , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) - , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) + , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete))) , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) , _refChanWorkerEnvWriteQ :: TQueue HashRef } @@ -102,14 +104,18 @@ refChanNotifyOnUpdated env chan = do refChanAddDownload :: forall e m . ( m ~ PeerM e IO , MyPeer e ) - => RefChanWorkerEnv e -> RefChanId e -> HashRef -> m () -refChanAddDownload env chan r = do + => RefChanWorkerEnv e + -> RefChanId e + -> HashRef + -> OnDownloadComplete + -> m () +refChanAddDownload env chan r onComlete = do penv <- ask t <- getTimeCoarse withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env) $ processBlock @e (fromHashRef r) - atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,t)) + atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete))) -- FIXME: slow-deep-scan-exception-seems-not-working checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool @@ -131,7 +137,7 @@ refChanWorker :: forall e s m . ( MonadIO m , MyPeer e , HasStorage m , Request e (RefChanHead e) m - , HasProtocol e (RefChanHead e) + , Request e (RefChanRequest e) m , Sessions e (KnownPeer e) m , Signatures s , s ~ Encryption e @@ -151,26 +157,35 @@ refChanWorker env brains = do penv <- ask + mergeQ <- newTQueueIO -- FIXME: resume-on-exception hw <- async (refChanHeadMon penv) - downloads <- async monitorDownloads + downloads <- async monitorHeadDownloads - polls <- async refChanHeadPoll + polls <- async refChanPoll wtrans <- async refChanWriter cleanup1 <- async cleanupRounds + merge <- async (logMergeProcess env mergeQ) + subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val + refChanAddDownload env chan val $ \href -> do + debug $ "BLOCK DOWNLOADED" <+> pretty href + atomically $ writeTQueue mergeQ (chan, href) + + atomically $ writeTQueue mergeQ (chan, val) + forever do pause @'Seconds 10 debug "I'm refchan worker" - mapM_ waitCatch [hw,downloads,polls,wtrans,cleanup1] + mapM_ waitCatch [hw,downloads,polls,wtrans,merge,cleanup1] where @@ -242,26 +257,28 @@ refChanWorker env brains = do debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref - refChanHeadPoll = do + refChanPoll = do let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) ) polling (Polling 5 5) listRefs $ \ref -> do debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref) broadCastMessage (RefChanGetHead @e ref) + broadCastMessage (RefChanRequest @e ref) - monitorDownloads = forever do - pause @'Seconds 2 + monitorHeadDownloads = forever do + pause @'Seconds 1 all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList now <- getTimeCoarse -- FIXME: change-to-polling-functions -- FIXME: consider-timeouts-or-leak-is-possible - rest <- forM all $ \(r,item@(chan,t)) -> do + rest <- forM all $ \(r,item@(chan,(t,onComplete))) -> do here <- checkDownloaded r if here then do - refChanOnHeadFn env chan (RefChanHeadBlockTran r) + liftIO $ onComplete r + -- refChanOnHeadFn env chan (RefChanHeadBlockTran r) pure mempty else do -- FIXME: fix-timeout-hardcode @@ -279,7 +296,7 @@ refChanWorker env brains = do here <- checkDownloaded hr if not here then do - refChanAddDownload env chan hr + refChanAddDownload env chan hr (withPeerM pe . refChanOnHeadFn env chan . RefChanHeadBlockTran) trace $ "BLOCK IS NOT HERE" <+> pretty hr else do sto <- getStorage @@ -343,10 +360,29 @@ refChanWorker env brains = do pure () - -- если всё скачано --- то обрабатываем. - -- если не скачано -- то говорим качать и ждём. как ждём? - -- помещаем в фигню, которая download запускает, и время от времени ждёт, - -- пока скачается. как скачается -- убирает из своего локального стейта, - -- и пихает транзу обратно в эту очередь, допустим. +logMergeProcess :: forall e s m . ( MonadUnliftIO m + , MyPeer e + , ForRefChans e + , s ~ Encryption e + ) + => RefChanWorkerEnv e + -> TQueue (RefChanId e, HashRef) + -> m () + +logMergeProcess _ q = do + forever do + -- FIXME: fix-hardcoded-timeout + pause @'Seconds 1 + _ <- atomically $ peekTQueue q + logs <- liftIO $ atomically $ flushTQueue q + let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList + + -- FIXME: in-parallel + mapM_ logMergeChan byChan + where + + logMergeChan (chan, logs) = do + debug $ "ABOUT TO MERGE LOGS" <+> pretty (AsBase58 chan) <+> pretty (length logs) +