This commit is contained in:
voidlizard 2024-11-13 17:22:37 +03:00
parent 23118aaea3
commit 14ed31dc38
1 changed files with 26 additions and 17 deletions

View File

@ -597,8 +597,14 @@ downloadDispatcher brains env = flip runContT pure do
_blkNum <- newTVarIO 0
wip <- newTVarIO ( HPSQ.empty @HashRef @Double @DCB )
parseQ <- newTQueueIO
void $ ContT $ withAsync $ manageThreads wip pts
let
onBlockSTM :: HashRef -> STM ()
onBlockSTM = writeTQueue parseQ
void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts
sto <- withPeerM env getStorage
@ -611,8 +617,9 @@ downloadDispatcher brains env = flip runContT pure do
atomically do
already <- readTVar wip <&> HPSQ.member (HashRef h)
dcb <- newDcbSTM now
let w = 1.0 -- realToFrac now
unless already do
modifyTVar wip (HPSQ.insert (HashRef h) 1.0 dcb)
modifyTVar wip (HPSQ.insert (HashRef h) w dcb)
ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
debug "Sweep blocks"
@ -628,13 +635,24 @@ downloadDispatcher brains env = flip runContT pure do
writeTVar wip (HPSQ.fromList (catMaybes alive))
ContT $ withAsync $ replicateM_ 4 $ forever do
what <- atomically $ readTQueue parseQ
missed <- findMissedBlocks sto what
now <- getTimeCoarse
for_ missed $ \hi -> do
atomically do
dcb <- newDcbSTM now
let w = realToFrac now
modifyTVar wip (HPSQ.insert hi 1.0 dcb)
forever $ (>> pause @'Seconds 10) do
sw0 <- readTVarIO wip <&> HPSQ.size
debug $ yellow $ "wip0" <+> pretty sw0
where
manageThreads wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
manageThreads onBlock wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
debug "MANAGE THREADS"
peers <- getKnownPeers @e <&> HS.fromList
@ -642,7 +660,7 @@ downloadDispatcher brains env = flip runContT pure do
for_ peers $ \p -> do
here <- readTVarIO pts <&> HM.member p
unless here do
a <- async (peerThread p wip)
a <- async (peerThread onBlock p wip)
atomically $ modifyTVar pts (HM.insert p a)
loosers <- atomically do
@ -654,7 +672,7 @@ downloadDispatcher brains env = flip runContT pure do
mapM_ cancel (fmap snd loosers)
peerThread p wip = flip runContT pure do
peerThread onBlock p wip = flip runContT pure do
btimes <- newTVarIO ( mempty :: [Double] )
@ -687,15 +705,6 @@ downloadDispatcher brains env = flip runContT pure do
parseQ <- newTQueueIO
tparse <- ContT $ withAsync $ forever do
what <- atomically $ readTQueue parseQ
missed <- findMissedBlocks sto what
now <- getTimeCoarse
for_ missed $ \hi -> do
atomically do
dcb <- newDcbSTM now
modifyTVar wip (HPSQ.insert hi 1.0 dcb)
bmt <- ContT $ withAsync $ runBurstMachine bm
tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
@ -792,7 +801,7 @@ downloadDispatcher brains env = flip runContT pure do
avg <- readTVarIO _avg
when (dtsec > avg * 1.05) do
when (dtsec > avg * 1.10) do
burstMachineAddErrors bm 1
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
@ -814,11 +823,11 @@ downloadDispatcher brains env = flip runContT pure do
else do
-- modifyTVar (dcbBusy dcb) pred
modifyTVar wip (HPSQ.delete hx)
writeTQueue parseQ hx
onBlock hx
bs <- ContT $ withAsync $ forever do
pause @'Seconds 10
debug $ yellow "I'm thread" <+> pretty p
void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep,tparse]
void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep]