wip, log merging

This commit is contained in:
Dmitry Zuikov 2023-07-19 09:49:47 +03:00
parent 126994720f
commit 84c74c3b4f
2 changed files with 59 additions and 23 deletions

View File

@ -104,7 +104,7 @@ class ( MonadIO m
response :: p -> m () response :: p -> m ()
class Request e p (m :: Type -> Type) | p -> e where class HasProtocol e p => Request e p (m :: Type -> Type) | p -> e where
request :: Peer e -> p -> m () request :: Peer e -> p -> m ()
data ReqLimPeriod = NoLimit data ReqLimPeriod = NoLimit

View File

@ -49,13 +49,13 @@ import Data.HashSet qualified as HashSet
import Data.List qualified as List import Data.List qualified as List
import Data.Maybe import Data.Maybe
import Lens.Micro.Platform import Lens.Micro.Platform
import Data.Heap qualified as Heap -- import Data.Heap qualified as Heap
import Data.Heap (Heap,Entry(..)) import Data.Heap ()
import Codec.Serialise import Codec.Serialise
import UnliftIO import UnliftIO
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
import Streaming qualified as S import Streaming()
{- HLINT ignore "Use newtype instead of data" -} {- HLINT ignore "Use newtype instead of data" -}
@ -63,11 +63,13 @@ data DataNotReady = DataNotReady deriving (Show)
instance Exception DataNotReady instance Exception DataNotReady
type OnDownloadComplete = HashRef -> IO ()
data RefChanWorkerEnv e = data RefChanWorkerEnv e =
RefChanWorkerEnv RefChanWorkerEnv
{ _refChanWorkerEnvDEnv :: DownloadEnv e { _refChanWorkerEnvDEnv :: DownloadEnv e
, _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e)
, _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete)))
, _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ())
, _refChanWorkerEnvWriteQ :: TQueue HashRef , _refChanWorkerEnvWriteQ :: TQueue HashRef
} }
@ -102,14 +104,18 @@ refChanNotifyOnUpdated env chan = do
refChanAddDownload :: forall e m . ( m ~ PeerM e IO refChanAddDownload :: forall e m . ( m ~ PeerM e IO
, MyPeer e , MyPeer e
) )
=> RefChanWorkerEnv e -> RefChanId e -> HashRef -> m () => RefChanWorkerEnv e
refChanAddDownload env chan r = do -> RefChanId e
-> HashRef
-> OnDownloadComplete
-> m ()
refChanAddDownload env chan r onComlete = do
penv <- ask penv <- ask
t <- getTimeCoarse t <- getTimeCoarse
withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env) withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env)
$ processBlock @e (fromHashRef r) $ processBlock @e (fromHashRef r)
atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,t)) atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete)))
-- FIXME: slow-deep-scan-exception-seems-not-working -- FIXME: slow-deep-scan-exception-seems-not-working
checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool
@ -131,7 +137,7 @@ refChanWorker :: forall e s m . ( MonadIO m
, MyPeer e , MyPeer e
, HasStorage m , HasStorage m
, Request e (RefChanHead e) m , Request e (RefChanHead e) m
, HasProtocol e (RefChanHead e) , Request e (RefChanRequest e) m
, Sessions e (KnownPeer e) m , Sessions e (KnownPeer e) m
, Signatures s , Signatures s
, s ~ Encryption e , s ~ Encryption e
@ -151,26 +157,35 @@ refChanWorker env brains = do
penv <- ask penv <- ask
mergeQ <- newTQueueIO
-- FIXME: resume-on-exception -- FIXME: resume-on-exception
hw <- async (refChanHeadMon penv) hw <- async (refChanHeadMon penv)
downloads <- async monitorDownloads downloads <- async monitorHeadDownloads
polls <- async refChanHeadPoll polls <- async refChanPoll
wtrans <- async refChanWriter wtrans <- async refChanWriter
cleanup1 <- async cleanupRounds cleanup1 <- async cleanupRounds
merge <- async (logMergeProcess env mergeQ)
subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val
refChanAddDownload env chan val $ \href -> do
debug $ "BLOCK DOWNLOADED" <+> pretty href
atomically $ writeTQueue mergeQ (chan, href)
atomically $ writeTQueue mergeQ (chan, val)
forever do forever do
pause @'Seconds 10 pause @'Seconds 10
debug "I'm refchan worker" debug "I'm refchan worker"
mapM_ waitCatch [hw,downloads,polls,wtrans,cleanup1] mapM_ waitCatch [hw,downloads,polls,wtrans,merge,cleanup1]
where where
@ -242,26 +257,28 @@ refChanWorker env brains = do
debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref
refChanHeadPoll = do refChanPoll = do
let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) ) let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) )
polling (Polling 5 5) listRefs $ \ref -> do polling (Polling 5 5) listRefs $ \ref -> do
debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref) debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref)
broadCastMessage (RefChanGetHead @e ref) broadCastMessage (RefChanGetHead @e ref)
broadCastMessage (RefChanRequest @e ref)
monitorDownloads = forever do monitorHeadDownloads = forever do
pause @'Seconds 2 pause @'Seconds 1
all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList
now <- getTimeCoarse now <- getTimeCoarse
-- FIXME: change-to-polling-functions -- FIXME: change-to-polling-functions
-- FIXME: consider-timeouts-or-leak-is-possible -- FIXME: consider-timeouts-or-leak-is-possible
rest <- forM all $ \(r,item@(chan,t)) -> do rest <- forM all $ \(r,item@(chan,(t,onComplete))) -> do
here <- checkDownloaded r here <- checkDownloaded r
if here then do if here then do
refChanOnHeadFn env chan (RefChanHeadBlockTran r) liftIO $ onComplete r
-- refChanOnHeadFn env chan (RefChanHeadBlockTran r)
pure mempty pure mempty
else do else do
-- FIXME: fix-timeout-hardcode -- FIXME: fix-timeout-hardcode
@ -279,7 +296,7 @@ refChanWorker env brains = do
here <- checkDownloaded hr here <- checkDownloaded hr
if not here then do if not here then do
refChanAddDownload env chan hr refChanAddDownload env chan hr (withPeerM pe . refChanOnHeadFn env chan . RefChanHeadBlockTran)
trace $ "BLOCK IS NOT HERE" <+> pretty hr trace $ "BLOCK IS NOT HERE" <+> pretty hr
else do else do
sto <- getStorage sto <- getStorage
@ -343,10 +360,29 @@ refChanWorker env brains = do
pure () pure ()
-- если всё скачано --- то обрабатываем. logMergeProcess :: forall e s m . ( MonadUnliftIO m
-- если не скачано -- то говорим качать и ждём. как ждём? , MyPeer e
-- помещаем в фигню, которая download запускает, и время от времени ждёт, , ForRefChans e
-- пока скачается. как скачается -- убирает из своего локального стейта, , s ~ Encryption e
-- и пихает транзу обратно в эту очередь, допустим. )
=> RefChanWorkerEnv e
-> TQueue (RefChanId e, HashRef)
-> m ()
logMergeProcess _ q = do
forever do
-- FIXME: fix-hardcoded-timeout
pause @'Seconds 1
_ <- atomically $ peekTQueue q
logs <- liftIO $ atomically $ flushTQueue q
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList
-- FIXME: in-parallel
mapM_ logMergeChan byChan
where
logMergeChan (chan, logs) = do
debug $ "ABOUT TO MERGE LOGS" <+> pretty (AsBase58 chan) <+> pretty (length logs)