diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 8b13c976..e892a179 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -15,6 +15,7 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 +import HBS2.Hash import HBS2.Clock import HBS2.Events import HBS2.Net.Proto.Peer @@ -132,6 +133,17 @@ checkDownloaded hr = do pure $ maybe False (not . List.null) $ sequence result +readLog :: forall m . ( MonadUnliftIO m ) + => AnyStorage + -> HashRef + -> m [HashRef] +readLog sto (HashRef h) = + S.toList_ $ do + walkMerkle h (liftIO . getBlock sto) $ \hr -> do + case hr of + Left{} -> pure () + Right (hrr :: [HashRef]) -> S.each hrr + refChanWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m , MyPeer e @@ -239,12 +251,7 @@ refChanWorker env brains = do forM_ (HashMap.toList byChan) $ \(c,new) -> do mbLog <- liftIO $ getRef sto c - hashes <- maybe1 mbLog (pure mempty) $ \hlog -> do - S.toList_ $ do - walkMerkle hlog (liftIO . getBlock sto) $ \hr -> do - case hr of - Left{} -> pure () - Right (hrr :: [HashRef]) -> S.each hrr + hashes <- maybe1 mbLog (pure mempty) $ readLog sto . HashRef -- FIXME: might-be-problems-on-large-logs let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList @@ -363,6 +370,7 @@ refChanWorker env brains = do logMergeProcess :: forall e s m . ( MonadUnliftIO m , MyPeer e , ForRefChans e + , HasStorage m , s ~ Encryption e ) => RefChanWorkerEnv e @@ -370,6 +378,9 @@ logMergeProcess :: forall e s m . ( MonadUnliftIO m -> m () logMergeProcess _ q = do + + sto <- getStorage + forever do -- FIXME: fix-hardcoded-timeout pause @'Seconds 1 @@ -378,11 +389,22 @@ logMergeProcess _ q = do let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList -- FIXME: in-parallel - mapM_ logMergeChan byChan - + mapM_ (logMergeChan sto) byChan where - logMergeChan (chan, logs) = do + logMergeChan sto (chan, logs) = runMaybeT do + + h <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan) + + current <- lift $ readLog sto (HashRef h) <&> HashSet.fromList + debug $ "ABOUT TO MERGE LOGS" <+> pretty (AsBase58 chan) <+> pretty (length logs) + trans <- filter (not . flip HashSet.member current) . mconcat <$> mapM (lift . readLog sto) logs + + forM_ trans $ \t -> do + + debug $ "ABOUT TO MERGE TRANS" <+> pretty t + +