diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 6e6f9f6f..f09999e3 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -184,14 +184,20 @@ refChanWorker env brains = do merge <- async (logMergeProcess env mergeQ) + sto <- getStorage + subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val - refChanAddDownload env chan val $ \href -> do - debug $ "BLOCK DOWNLOADED" <+> pretty href - atomically $ writeTQueue mergeQ (chan, href) + h <- liftIO $ getRef sto (RefChanLogKey @s chan) - atomically $ writeTQueue mergeQ (chan, val) + unless ((HashRef <$> h) == Just val) do + + refChanAddDownload env chan val $ \href -> do + debug $ "BLOCK DOWNLOADED" <+> pretty href + atomically $ writeTQueue mergeQ (chan, href) + + atomically $ writeTQueue mergeQ (chan, val) forever do pause @'Seconds 10 @@ -400,7 +406,10 @@ logMergeProcess _ q = do pause @'Seconds 1 _ <- atomically $ peekTQueue q logs <- liftIO $ atomically $ flushTQueue q - let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList + + let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] + & HashMap.toList + & fmap (over _2 List.nub) -- FIXME: in-parallel mapM_ (logMergeChan menv sto) byChan @@ -424,7 +433,6 @@ logMergeProcess _ q = do atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk) pure headblk - logMergeChan menv sto (chan, logs) = void $ runMaybeT do let chanKey = RefChanLogKey @s chan