mirror of https://github.com/voidlizard/hbs2
wip, log merging, debug-10
This commit is contained in:
parent
85987c4902
commit
43139c44ea
|
@ -184,14 +184,20 @@ refChanWorker env brains = do
|
||||||
|
|
||||||
merge <- async (logMergeProcess env mergeQ)
|
merge <- async (logMergeProcess env mergeQ)
|
||||||
|
|
||||||
|
sto <- getStorage
|
||||||
|
|
||||||
subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
|
subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
|
||||||
debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val
|
debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val
|
||||||
|
|
||||||
refChanAddDownload env chan val $ \href -> do
|
h <- liftIO $ getRef sto (RefChanLogKey @s chan)
|
||||||
debug $ "BLOCK DOWNLOADED" <+> pretty href
|
|
||||||
atomically $ writeTQueue mergeQ (chan, href)
|
|
||||||
|
|
||||||
atomically $ writeTQueue mergeQ (chan, val)
|
unless ((HashRef <$> h) == Just val) do
|
||||||
|
|
||||||
|
refChanAddDownload env chan val $ \href -> do
|
||||||
|
debug $ "BLOCK DOWNLOADED" <+> pretty href
|
||||||
|
atomically $ writeTQueue mergeQ (chan, href)
|
||||||
|
|
||||||
|
atomically $ writeTQueue mergeQ (chan, val)
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
|
@ -400,7 +406,10 @@ logMergeProcess _ q = do
|
||||||
pause @'Seconds 1
|
pause @'Seconds 1
|
||||||
_ <- atomically $ peekTQueue q
|
_ <- atomically $ peekTQueue q
|
||||||
logs <- liftIO $ atomically $ flushTQueue q
|
logs <- liftIO $ atomically $ flushTQueue q
|
||||||
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ] & HashMap.toList
|
|
||||||
|
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ]
|
||||||
|
& HashMap.toList
|
||||||
|
& fmap (over _2 List.nub)
|
||||||
|
|
||||||
-- FIXME: in-parallel
|
-- FIXME: in-parallel
|
||||||
mapM_ (logMergeChan menv sto) byChan
|
mapM_ (logMergeChan menv sto) byChan
|
||||||
|
@ -424,7 +433,6 @@ logMergeProcess _ q = do
|
||||||
atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk)
|
atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk)
|
||||||
pure headblk
|
pure headblk
|
||||||
|
|
||||||
|
|
||||||
logMergeChan menv sto (chan, logs) = void $ runMaybeT do
|
logMergeChan menv sto (chan, logs) = void $ runMaybeT do
|
||||||
|
|
||||||
let chanKey = RefChanLogKey @s chan
|
let chanKey = RefChanLogKey @s chan
|
||||||
|
|
Loading…
Reference in New Issue