mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
85aef5298e
commit
7fe91abc0b
|
@ -597,8 +597,14 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
_blkNum <- newTVarIO 0
|
_blkNum <- newTVarIO 0
|
||||||
wip <- newTVarIO ( HPSQ.empty @HashRef @Double @DCB )
|
wip <- newTVarIO ( HPSQ.empty @HashRef @Double @DCB )
|
||||||
|
parseQ <- newTQueueIO
|
||||||
|
|
||||||
void $ ContT $ withAsync $ manageThreads wip pts
|
|
||||||
|
let
|
||||||
|
onBlockSTM :: HashRef -> STM ()
|
||||||
|
onBlockSTM = writeTQueue parseQ
|
||||||
|
|
||||||
|
void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
|
@ -611,8 +617,9 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
atomically do
|
atomically do
|
||||||
already <- readTVar wip <&> HPSQ.member (HashRef h)
|
already <- readTVar wip <&> HPSQ.member (HashRef h)
|
||||||
dcb <- newDcbSTM now
|
dcb <- newDcbSTM now
|
||||||
|
let w = 1.0 -- realToFrac now
|
||||||
unless already do
|
unless already do
|
||||||
modifyTVar wip (HPSQ.insert (HashRef h) 1.0 dcb)
|
modifyTVar wip (HPSQ.insert (HashRef h) w dcb)
|
||||||
|
|
||||||
ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
||||||
debug "Sweep blocks"
|
debug "Sweep blocks"
|
||||||
|
@ -628,13 +635,24 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
writeTVar wip (HPSQ.fromList (catMaybes alive))
|
writeTVar wip (HPSQ.fromList (catMaybes alive))
|
||||||
|
|
||||||
|
|
||||||
|
ContT $ withAsync $ replicateM_ 4 $ forever do
|
||||||
|
what <- atomically $ readTQueue parseQ
|
||||||
|
missed <- findMissedBlocks sto what
|
||||||
|
now <- getTimeCoarse
|
||||||
|
for_ missed $ \hi -> do
|
||||||
|
atomically do
|
||||||
|
dcb <- newDcbSTM now
|
||||||
|
let w = realToFrac now
|
||||||
|
modifyTVar wip (HPSQ.insert hi 1.0 dcb)
|
||||||
|
|
||||||
forever $ (>> pause @'Seconds 10) do
|
forever $ (>> pause @'Seconds 10) do
|
||||||
sw0 <- readTVarIO wip <&> HPSQ.size
|
sw0 <- readTVarIO wip <&> HPSQ.size
|
||||||
debug $ yellow $ "wip0" <+> pretty sw0
|
debug $ yellow $ "wip0" <+> pretty sw0
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
manageThreads wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
manageThreads onBlock wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
debug "MANAGE THREADS"
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
peers <- getKnownPeers @e <&> HS.fromList
|
peers <- getKnownPeers @e <&> HS.fromList
|
||||||
|
@ -642,7 +660,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
for_ peers $ \p -> do
|
for_ peers $ \p -> do
|
||||||
here <- readTVarIO pts <&> HM.member p
|
here <- readTVarIO pts <&> HM.member p
|
||||||
unless here do
|
unless here do
|
||||||
a <- async (peerThread p wip)
|
a <- async (peerThread onBlock p wip)
|
||||||
atomically $ modifyTVar pts (HM.insert p a)
|
atomically $ modifyTVar pts (HM.insert p a)
|
||||||
|
|
||||||
loosers <- atomically do
|
loosers <- atomically do
|
||||||
|
@ -654,7 +672,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
mapM_ cancel (fmap snd loosers)
|
mapM_ cancel (fmap snd loosers)
|
||||||
|
|
||||||
peerThread p wip = flip runContT pure do
|
peerThread onBlock p wip = flip runContT pure do
|
||||||
|
|
||||||
btimes <- newTVarIO ( mempty :: [Double] )
|
btimes <- newTVarIO ( mempty :: [Double] )
|
||||||
|
|
||||||
|
@ -687,15 +705,6 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
parseQ <- newTQueueIO
|
parseQ <- newTQueueIO
|
||||||
|
|
||||||
tparse <- ContT $ withAsync $ forever do
|
|
||||||
what <- atomically $ readTQueue parseQ
|
|
||||||
missed <- findMissedBlocks sto what
|
|
||||||
now <- getTimeCoarse
|
|
||||||
for_ missed $ \hi -> do
|
|
||||||
atomically do
|
|
||||||
dcb <- newDcbSTM now
|
|
||||||
modifyTVar wip (HPSQ.insert hi 1.0 dcb)
|
|
||||||
|
|
||||||
bmt <- ContT $ withAsync $ runBurstMachine bm
|
bmt <- ContT $ withAsync $ runBurstMachine bm
|
||||||
|
|
||||||
tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
||||||
|
@ -792,7 +801,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
avg <- readTVarIO _avg
|
avg <- readTVarIO _avg
|
||||||
|
|
||||||
when (dtsec > avg * 1.05) do
|
when (dtsec > avg * 1.10) do
|
||||||
burstMachineAddErrors bm 1
|
burstMachineAddErrors bm 1
|
||||||
|
|
||||||
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
|
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
|
||||||
|
@ -814,11 +823,11 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
else do
|
else do
|
||||||
-- modifyTVar (dcbBusy dcb) pred
|
-- modifyTVar (dcbBusy dcb) pred
|
||||||
modifyTVar wip (HPSQ.delete hx)
|
modifyTVar wip (HPSQ.delete hx)
|
||||||
writeTQueue parseQ hx
|
onBlock hx
|
||||||
|
|
||||||
bs <- ContT $ withAsync $ forever do
|
bs <- ContT $ withAsync $ forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
debug $ yellow "I'm thread" <+> pretty p
|
debug $ yellow "I'm thread" <+> pretty p
|
||||||
|
|
||||||
void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep,tparse]
|
void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep]
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue