mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
0828a6e01e
commit
e53273d3b6
|
@ -554,10 +554,17 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
rq <- newTQueueIO
|
rq <- newTQueueIO
|
||||||
|
|
||||||
let seenLimit = 1000
|
let seenLimit = 1000
|
||||||
|
|
||||||
seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef TimeSpec () )
|
seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef TimeSpec () )
|
||||||
|
|
||||||
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
|
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
|
||||||
|
|
||||||
|
let sizeCacheLimit = 10000
|
||||||
|
|
||||||
|
sizeCache <- newTVarIO ( HPSQ.empty :: HashPSQ (HashRef,Peer e) TimeSpec (Maybe Integer) )
|
||||||
|
|
||||||
|
sizeQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef NominalDiffTime () )
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
work <- newTQueueIO
|
work <- newTQueueIO
|
||||||
|
@ -571,6 +578,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
ContT $ withAsync $ forever do
|
||||||
pause @'Seconds 600
|
pause @'Seconds 600
|
||||||
|
|
||||||
debug $ "CLEANUP SEEN"
|
debug $ "CLEANUP SEEN"
|
||||||
atomically do
|
atomically do
|
||||||
fix \next -> do
|
fix \next -> do
|
||||||
|
@ -579,6 +587,14 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
modifyTVar seen HPSQ.deleteMin
|
modifyTVar seen HPSQ.deleteMin
|
||||||
next
|
next
|
||||||
|
|
||||||
|
debug $ "CLEANUP SIZES"
|
||||||
|
atomically do
|
||||||
|
fix \next -> do
|
||||||
|
n <- readTVar sizeCache <&> HPSQ.size
|
||||||
|
when (n > sizeCacheLimit) do
|
||||||
|
modifyTVar sizeCache HPSQ.deleteMin
|
||||||
|
next
|
||||||
|
|
||||||
liftIO $ withPeerM env do
|
liftIO $ withPeerM env do
|
||||||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
||||||
now <- getTimeCoarse
|
now <- getTimeCoarse
|
||||||
|
@ -600,6 +616,23 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
Nothing -> ((), Nothing)
|
Nothing -> ((), Nothing)
|
||||||
Just (p,v) -> ((), Just (succ p, v * 1.10 ))
|
Just (p,v) -> ((), Just (succ p, v * 1.10 ))
|
||||||
|
|
||||||
|
ContT $ withAsync $
|
||||||
|
polling (Polling 1 1) missChk $ \h -> do
|
||||||
|
pips <- readTVarIO pts <&> HM.keys
|
||||||
|
forConcurrently_ pips $ \p -> do
|
||||||
|
now <- getTimeCoarse
|
||||||
|
here <- readTVarIO sizeCache <&> HPSQ.member (h,p)
|
||||||
|
hereB <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
when (not here && not hereB) do
|
||||||
|
size <- queryBlockSizeFromPeer brains env (coerce h) p
|
||||||
|
case size of
|
||||||
|
Left{} -> pure ()
|
||||||
|
Right w -> do
|
||||||
|
atomically $ modifyTVar sizeCache ( HPSQ.insert (h,p) now w )
|
||||||
|
debug $ green "GOT SIZE" <+> pretty w <+> pretty h <+> pretty p
|
||||||
|
|
||||||
|
atomically $ modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h )
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
ContT $ withAsync $ forever do
|
||||||
polling (Polling 1 1) missChk $ \h -> do
|
polling (Polling 1 1) missChk $ \h -> do
|
||||||
debug $ blue "CHECK MISSED BLOCKS" <+> pretty h
|
debug $ blue "CHECK MISSED BLOCKS" <+> pretty h
|
||||||
|
|
Loading…
Reference in New Issue