wip, log merging, debug

This commit is contained in:
Dmitry Zuikov 2023-07-19 10:14:28 +03:00
parent 84c74c3b4f
commit f31846f848
1 changed files with 31 additions and 9 deletions

View File

@ -15,6 +15,7 @@ import HBS2.Prelude.Plated
import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Hash
import HBS2.Clock
import HBS2.Events
import HBS2.Net.Proto.Peer
@ -132,6 +133,17 @@ checkDownloaded hr = do
pure $ maybe False (not . List.null) $ sequence result
readLog :: forall m . ( MonadUnliftIO m )
=> AnyStorage
-> HashRef
-> m [HashRef]
readLog sto (HashRef h) =
S.toList_ $ do
walkMerkle h (liftIO . getBlock sto) $ \hr -> do
case hr of
Left{} -> pure ()
Right (hrr :: [HashRef]) -> S.each hrr
refChanWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m
, MyPeer e
@ -239,12 +251,7 @@ refChanWorker env brains = do
forM_ (HashMap.toList byChan) $ \(c,new) -> do
mbLog <- liftIO $ getRef sto c
hashes <- maybe1 mbLog (pure mempty) $ \hlog -> do
S.toList_ $ do
walkMerkle hlog (liftIO . getBlock sto) $ \hr -> do
case hr of
Left{} -> pure ()
Right (hrr :: [HashRef]) -> S.each hrr
hashes <- maybe1 mbLog (pure mempty) $ readLog sto . HashRef
-- FIXME: might-be-problems-on-large-logs
let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList
@ -363,6 +370,7 @@ refChanWorker env brains = do
logMergeProcess :: forall e s m . ( MonadUnliftIO m
, MyPeer e
, ForRefChans e
, HasStorage m
, s ~ Encryption e
)
=> RefChanWorkerEnv e
@ -370,6 +378,9 @@ logMergeProcess :: forall e s m . ( MonadUnliftIO m
-> m ()
logMergeProcess _ q = do
sto <- getStorage
forever do
-- FIXME: fix-hardcoded-timeout
pause @'Seconds 1
@ -378,11 +389,22 @@ logMergeProcess _ q = do
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList
-- FIXME: in-parallel
mapM_ logMergeChan byChan
mapM_ (logMergeChan sto) byChan
where
logMergeChan (chan, logs) = do
logMergeChan sto (chan, logs) = runMaybeT do
h <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan)
current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList
debug $ "ABOUT TO MERGE LOGS" <+> pretty (AsBase58 chan) <+> pretty (length logs)
trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs
forM_ trans $ \t -> do
debug $ "ABOUT TO MERGE TRANS" <+> pretty t