mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
dc02702be6
commit
644d43c5c3
|
@ -267,7 +267,7 @@ runBurstMachine BurstMachine{..} = do
|
||||||
|
|
||||||
new <- if e2 > e1 then do
|
new <- if e2 > e1 then do
|
||||||
let d = max 2.0 (current * (1.0 - down))
|
let d = max 2.0 (current * (1.0 - down))
|
||||||
nrates <- readTVar _rates <&> drop 5 . Map.toList
|
nrates <- readTVar _rates <&> drop 3 . Map.toList
|
||||||
let newFucked = maybe d snd (headMay nrates)
|
let newFucked = maybe d snd (headMay nrates)
|
||||||
writeTVar _rates (Map.fromList nrates)
|
writeTVar _rates (Map.fromList nrates)
|
||||||
pure newFucked
|
pure newFucked
|
||||||
|
@ -565,6 +565,25 @@ data Work =
|
||||||
RequestSize HashRef (Maybe Integer -> IO ())
|
RequestSize HashRef (Maybe Integer -> IO ())
|
||||||
| FetchBlock HashRef Integer (BlockFetchResult -> IO ())
|
| FetchBlock HashRef Integer (BlockFetchResult -> IO ())
|
||||||
|
|
||||||
|
|
||||||
|
-- | Download control block
|
||||||
|
data DCB =
|
||||||
|
DCB
|
||||||
|
{ dcbStart :: TimeSpec
|
||||||
|
, dcbBusy :: TVar Int
|
||||||
|
, dcbDownloaded :: TVar Bool
|
||||||
|
}
|
||||||
|
|
||||||
|
newDcbSTM :: TimeSpec -> STM DCB
|
||||||
|
newDcbSTM ts = DCB ts <$> newTVar 0 <*> newTVar False
|
||||||
|
|
||||||
|
data PSt =
|
||||||
|
PChoose
|
||||||
|
| PInit HashRef DCB
|
||||||
|
| PQuerySize HashRef DCB
|
||||||
|
| PFetchBlock HashRef DCB Integer
|
||||||
|
| PReleaseBlock HashRef DCB Bool
|
||||||
|
|
||||||
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
)
|
)
|
||||||
|
@ -573,21 +592,13 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
-> m ()
|
-> m ()
|
||||||
downloadDispatcher brains env = flip runContT pure do
|
downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TBQueue Work))
|
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) )
|
||||||
-- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
|
-- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
|
||||||
|
|
||||||
_blkNum <- newTVarIO 0
|
_blkNum <- newTVarIO 0
|
||||||
wip <- newTVarIO ( mempty :: HashMap HashRef NominalDiffTime )
|
wip <- newTVarIO ( HPSQ.empty @HashRef @Double @DCB )
|
||||||
sizeRq <- newTVarIO ( HPSQ.empty @HashRef @TimeSpec @() )
|
|
||||||
sizeRqWip <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) TimeSpec)
|
|
||||||
sizeCache <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) (Maybe Integer) )
|
|
||||||
downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @Double @Integer )
|
|
||||||
choosen <- newTVarIO ( mempty :: HashSet HashRef )
|
|
||||||
stat <- newTVarIO ( mempty :: HashMap (Peer e) Double )
|
|
||||||
fuckup <- newTVarIO ( mempty :: HashMap HashRef (HashSet (Peer e)) )
|
|
||||||
-- done <- newTVarIO ( mempty :: HashSet HashRef )
|
|
||||||
|
|
||||||
void $ ContT $ withAsync $ manageThreads stat pts
|
void $ ContT $ withAsync $ manageThreads wip pts
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
|
@ -598,235 +609,32 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
now <- getTimeCoarse
|
now <- getTimeCoarse
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar sizeRq (HPSQ.insert (HashRef h) now ())
|
already <- readTVar wip <&> HPSQ.member (HashRef h)
|
||||||
modifyTVar wip (HM.insert (HashRef h) 30)
|
dcb <- newDcbSTM now
|
||||||
|
unless already do
|
||||||
let onBlockSize p h s = do
|
modifyTVar wip (HPSQ.insert (HashRef h) 1.0 dcb)
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
||||||
|
debug "Sweep blocks"
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar sizeRqWip (HM.delete (p,h))
|
total <- readTVar wip <&> HPSQ.toList
|
||||||
modifyTVar sizeRq (HPSQ.delete h)
|
|
||||||
|
|
||||||
l <- atomically do
|
alive <- for total $ \e@(h,_,DCB{..}) -> do
|
||||||
w <- readTVar pts <&> fmap snd . HM.lookup p
|
down <- readTVar dcbDownloaded
|
||||||
maybe1 w (pure 0) lengthTBQueue
|
if down then
|
||||||
|
pure Nothing
|
||||||
|
else
|
||||||
|
pure (Just e)
|
||||||
|
|
||||||
-- let color = if isJust s then green else red
|
writeTVar wip (HPSQ.fromList (catMaybes alive))
|
||||||
-- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p
|
|
||||||
dtt <- randomRIO (-0.05, 0.05)
|
|
||||||
-- let dtt = 0
|
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
unless here do
|
|
||||||
dt <- readTVarIO stat <&> (*(1+dtt)) . fromMaybe 1.0 . HM.lookup p
|
|
||||||
let rate = dt + realToFrac l
|
|
||||||
atomically do
|
|
||||||
-- blkNum <- stateTVar _blkNum (\x -> (x, succ x))
|
|
||||||
modifyTVar sizeCache (HM.insert (p,h) s)
|
|
||||||
choo <- readTVar choosen <&> HS.member h
|
|
||||||
maybe1 s none $ \size -> do
|
|
||||||
unless choo do
|
|
||||||
modifyTVar downWip (HPSQ.insert (p,h) rate size)
|
|
||||||
|
|
||||||
parseQ <- newTQueueIO
|
|
||||||
|
|
||||||
let
|
|
||||||
deleteBlockFromWip :: HashRef -> IO ()
|
|
||||||
deleteBlockFromWip h = do
|
|
||||||
atomically do
|
|
||||||
modifyTVar wip (HM.delete h)
|
|
||||||
modifyTVar sizeRq (HPSQ.delete h)
|
|
||||||
-- modifyTVar choosen (HS.delete h)
|
|
||||||
srwq <- readTVar sizeRqWip <&> HM.toList
|
|
||||||
writeTVar sizeRqWip (HM.fromList $ [ x | x@((_,hi),_) <- srwq, hi /= h ])
|
|
||||||
dw <- readTVar downWip <&> HPSQ.toList
|
|
||||||
writeTVar downWip (HPSQ.fromList $ [ x | x@((_,hi),_,_) <- dw, hi /= h ])
|
|
||||||
cs <- readTVar sizeCache <&> HM.toList
|
|
||||||
writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, hi /= h ])
|
|
||||||
|
|
||||||
let onBlock p h = \case
|
|
||||||
|
|
||||||
BlockAlreadyHere -> do
|
|
||||||
debug $ yellow "ALREADY HAVE BLOCK" <+> pretty h
|
|
||||||
-- deleteBlockFromWip h
|
|
||||||
|
|
||||||
BlockFetched bs -> do
|
|
||||||
-- debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p
|
|
||||||
void $ putBlock sto bs
|
|
||||||
atomically do
|
|
||||||
writeTQueue parseQ (h, bs)
|
|
||||||
modifyTVar fuckup (HM.insertWith (<>) h (HS.singleton p))
|
|
||||||
|
|
||||||
BlockFetchError -> do
|
|
||||||
now <- getTimeCoarse
|
|
||||||
debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p
|
|
||||||
atomically do
|
|
||||||
modifyTVar sizeRq (HPSQ.insert h now ())
|
|
||||||
modifyTVar choosen (HS.delete h)
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
let blkz = readTVarIO choosen <&> fmap (,10) . HS.toList
|
|
||||||
polling (Polling 1 1) blkz $ \h -> do
|
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
if here then do
|
|
||||||
liftIO $ deleteBlockFromWip h
|
|
||||||
else do
|
|
||||||
now <- getTimeCoarse
|
|
||||||
atomically do
|
|
||||||
modifyTVar sizeRq (HPSQ.insert h now ())
|
|
||||||
modifyTVar choosen (HS.delete h)
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
let blkz = readTVarIO wip <&> fmap (,10) . HM.keys
|
|
||||||
polling (Polling 1 1) blkz $ \h -> do
|
|
||||||
|
|
||||||
debug $ "CLEANUP BLOCK" <+> pretty h
|
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
|
|
||||||
when here $ do
|
|
||||||
liftIO $ deleteBlockFromWip h
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
pause @'Seconds 10
|
|
||||||
w <- readTVarIO downWip <&> HPSQ.size
|
|
||||||
when (w == 0) do
|
|
||||||
atomically $ writeTVar choosen mempty
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever $ (>> pause @'Seconds 300) do
|
|
||||||
atomically $ writeTVar fuckup mempty
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do
|
|
||||||
atomically do
|
|
||||||
w <- readTVar wip
|
|
||||||
cs <- readTVar sizeCache <&> HM.toList
|
|
||||||
writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ])
|
|
||||||
|
|
||||||
ContT $ withAsync do
|
|
||||||
let blkz = readTVarIO wip <&> HM.toList
|
|
||||||
polling (Polling 1 1) blkz $ \h -> do
|
|
||||||
debug $ "POLL BLOCK" <+> pretty h
|
|
||||||
atomically $ modifyTVar wip (HM.adjust (*1.10) h)
|
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
now <- getTimeCoarse
|
|
||||||
unless here $ atomically do
|
|
||||||
modifyTVar sizeRq (HPSQ.insert h now ())
|
|
||||||
modifyTVar choosen (HS.delete h)
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
(h,_) <- atomically $ readTQueue parseQ
|
|
||||||
missed <- findMissedBlocks sto h
|
|
||||||
now <- getTimeCoarse
|
|
||||||
for_ (HS.fromList missed) $ \h -> do
|
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
unless here do
|
|
||||||
atomically $ modifyTVar sizeRq (HPSQ.insert h now ())
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
|
|
||||||
reqs <- atomically do
|
|
||||||
xs <- readTVar sizeRq <&> HPSQ.toList
|
|
||||||
when (L.null xs) retry
|
|
||||||
writeTVar sizeRq HPSQ.empty
|
|
||||||
pure [ h | (h,_,_) <- xs ]
|
|
||||||
|
|
||||||
peers <- readTVarIO pts <&> HM.toList
|
|
||||||
|
|
||||||
let tasks = [ (p, w, h) | (p,(_,w)) <- peers, h <- reqs ]
|
|
||||||
|
|
||||||
now <- getTimeCoarse
|
|
||||||
for_ tasks $ \(p, w, h) -> do
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
full <- isFullTBQueue w
|
|
||||||
unless full do
|
|
||||||
writeTBQueue w (RequestSize h (onBlockSize p h))
|
|
||||||
modifyTVar sizeRqWip (HM.insert (p,h) now)
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
|
|
||||||
(w1,dw,peerz) <- atomically do
|
|
||||||
|
|
||||||
peers0 <- readTVar pts
|
|
||||||
|
|
||||||
peers1 <- for (HM.toList peers0) $ \e@(_,(_,q)) -> do
|
|
||||||
full <- isFullTBQueue q
|
|
||||||
if full then do
|
|
||||||
pure (Left e)
|
|
||||||
else do
|
|
||||||
pure (Right e)
|
|
||||||
|
|
||||||
let peers = HM.fromList (rights peers1)
|
|
||||||
|
|
||||||
let n = HM.size peers
|
|
||||||
|
|
||||||
when (n == 0) retry
|
|
||||||
|
|
||||||
already <- readTVar choosen
|
|
||||||
|
|
||||||
dw <- readTVar downWip
|
|
||||||
|
|
||||||
let total = [ x
|
|
||||||
| x@((p,h),_,_) <- HPSQ.toList dw
|
|
||||||
, HM.member p peers
|
|
||||||
, not (HS.member h already)
|
|
||||||
] & L.take 100
|
|
||||||
|
|
||||||
when (L.null total) retry
|
|
||||||
|
|
||||||
let queue = total
|
|
||||||
|
|
||||||
let qq = [ (h, HS.singleton p)
|
|
||||||
| ((p,h),_,_) <- queue
|
|
||||||
] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>)
|
|
||||||
|
|
||||||
when (L.null qq) retry
|
|
||||||
|
|
||||||
for_ total $ \(k,_,_) -> do
|
|
||||||
modifyTVar downWip (HPSQ.delete k)
|
|
||||||
|
|
||||||
pure (qq,dw,peers)
|
|
||||||
|
|
||||||
for_ w1 $ \(h,pps) -> do
|
|
||||||
i <- randomIO @Int <&> (`mod` V.length pps) . abs
|
|
||||||
debug $ blue "CHOOSIN PEER" <+> pretty h <+> pretty i <+> pretty (V.length pps)
|
|
||||||
flip runContT pure do
|
|
||||||
|
|
||||||
p0 <- ContT $ maybe1 (pps !? i) (warn $ red "FUCKED PEER!")
|
|
||||||
|
|
||||||
(_,who) <- ContT $ maybe1 (HM.lookup p0 peerz) (warn $ red "FUCKED QUEUE")
|
|
||||||
|
|
||||||
(_,size) <- ContT $ maybe1 (HPSQ.lookup (p0,h) dw) (warn $ red "FUCKED SIZE")
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
choo <- readTVar choosen <&> HS.member h
|
|
||||||
unless choo do
|
|
||||||
full <- isFullTBQueue who
|
|
||||||
if full then do
|
|
||||||
-- FIXME: wtf?
|
|
||||||
modifyTVar downWip (HPSQ.insert (p0,h) 0.01 size)
|
|
||||||
else do
|
|
||||||
writeTBQueue who (FetchBlock h size (onBlock p0 h))
|
|
||||||
modifyTVar choosen (HS.insert h)
|
|
||||||
|
|
||||||
forever $ (>> pause @'Seconds 10) do
|
forever $ (>> pause @'Seconds 10) do
|
||||||
sw0 <- readTVarIO wip <&> HM.size
|
sw0 <- readTVarIO wip <&> HPSQ.size
|
||||||
srqw <- readTVarIO sizeRqWip <&> HM.size
|
|
||||||
sdw <- readTVarIO downWip <&> HPSQ.size
|
|
||||||
scsize <- readTVarIO sizeCache <&> HM.size
|
|
||||||
chooSize <- readTVarIO choosen <&> HS.size
|
|
||||||
|
|
||||||
fuck <- readTVarIO fuckup <&> HM.toList
|
|
||||||
let fucks = [ 1 | (h,p) <- fuck, HS.size p > 1 ] & sum
|
|
||||||
|
|
||||||
debug $ yellow $ "wip0" <+> pretty sw0
|
debug $ yellow $ "wip0" <+> pretty sw0
|
||||||
<+> "wip1:" <+> pretty srqw
|
|
||||||
<+> "wip2" <+> pretty sdw
|
|
||||||
<+> "wip3" <+> pretty chooSize
|
|
||||||
<+> "sizes" <+> pretty scsize
|
|
||||||
<+> "fuckup" <+> pretty fucks
|
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
manageThreads wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
debug "MANAGE THREADS"
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
peers <- getKnownPeers @e <&> HS.fromList
|
peers <- getKnownPeers @e <&> HS.fromList
|
||||||
|
@ -834,9 +642,8 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
for_ peers $ \p -> do
|
for_ peers $ \p -> do
|
||||||
here <- readTVarIO pts <&> HM.member p
|
here <- readTVarIO pts <&> HM.member p
|
||||||
unless here do
|
unless here do
|
||||||
work <- newTBQueueIO (16*1024*10)
|
a <- async (peerThread p wip)
|
||||||
a <- async (peerThread stat p work)
|
atomically $ modifyTVar pts (HM.insert p a)
|
||||||
atomically $ modifyTVar pts (HM.insert p (a,work))
|
|
||||||
|
|
||||||
loosers <- atomically do
|
loosers <- atomically do
|
||||||
xs <- readTVar pts <&> HM.toList
|
xs <- readTVar pts <&> HM.toList
|
||||||
|
@ -845,24 +652,50 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
writeTVar pts (HM.fromList alive)
|
writeTVar pts (HM.fromList alive)
|
||||||
pure dead
|
pure dead
|
||||||
|
|
||||||
mapM_ cancel (fmap (fst.snd) loosers)
|
mapM_ cancel (fmap snd loosers)
|
||||||
|
|
||||||
peerThread stat p work = flip runContT pure do
|
peerThread p wip = flip runContT pure do
|
||||||
|
|
||||||
btimes <- newTVarIO ( mempty :: [Double] )
|
btimes <- newTVarIO ( mempty :: [Double] )
|
||||||
|
|
||||||
|
_errors <- newTVarIO 0
|
||||||
_avg <- newTVarIO 600
|
_avg <- newTVarIO 600
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
|
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
|
||||||
|
|
||||||
bm <- liftIO $ newBurstMachine 5 256 (Just 50) 0.10 0.25
|
bm <- liftIO $ newBurstMachine 2.5 256 (Just 50) 0.10 0.25
|
||||||
|
|
||||||
void $ ContT $ bracket none $ const do
|
void $ ContT $ bracket none $ const do
|
||||||
debug $ "Cancelling thread for" <+> pretty p
|
debug $ "Cancelling thread for" <+> pretty p
|
||||||
|
|
||||||
debug $ yellow "Started thread for" <+> pretty p
|
debug $ yellow "Started thread for" <+> pretty p
|
||||||
|
|
||||||
|
-- TODO: sweep size cache!
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever $ (>> pause @'Seconds 60) do
|
||||||
|
atomically $ writeTVar _errors 0
|
||||||
|
|
||||||
|
tsweep <- ContT $ withAsync do
|
||||||
|
let hashes = readTVarIO _sizeCache <&> fmap (,60) . HM.keys
|
||||||
|
polling (Polling 1 10) hashes $ \h -> do
|
||||||
|
atomically do
|
||||||
|
here <- readTVar wip <&> HPSQ.member h
|
||||||
|
unless here do
|
||||||
|
modifyTVar _sizeCache (HM.delete h)
|
||||||
|
|
||||||
|
parseQ <- newTQueueIO
|
||||||
|
|
||||||
|
tparse <- ContT $ withAsync $ forever do
|
||||||
|
what <- atomically $ readTQueue parseQ
|
||||||
|
missed <- findMissedBlocks sto what
|
||||||
|
now <- getTimeCoarse
|
||||||
|
atomically do
|
||||||
|
for_ missed $ \hi -> do
|
||||||
|
dcb <- newDcbSTM now
|
||||||
|
modifyTVar wip (HPSQ.insert hi 1.0 dcb)
|
||||||
|
|
||||||
bmt <- ContT $ withAsync $ runBurstMachine bm
|
bmt <- ContT $ withAsync $ runBurstMachine bm
|
||||||
|
|
||||||
tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
|
||||||
|
@ -870,33 +703,87 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
unless (L.null tss) do
|
unless (L.null tss) do
|
||||||
let avg = sum tss / realToFrac (L.length tss)
|
let avg = sum tss / realToFrac (L.length tss)
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar stat (HM.insert p avg)
|
|
||||||
writeTVar _avg avg
|
writeTVar _avg avg
|
||||||
|
|
||||||
twork <- ContT $ withAsync $ forever do
|
twork <- ContT $ withAsync $ forever do
|
||||||
w <- atomically $ readTBQueue work
|
|
||||||
|
|
||||||
case w of
|
flip fix PChoose $ \go -> \case
|
||||||
RequestSize h answ -> do
|
|
||||||
here <- hasBlock sto (coerce h)
|
|
||||||
case here of
|
|
||||||
Just s -> liftIO (answ (Just s))
|
|
||||||
Nothing -> do
|
|
||||||
s <- queryBlockSizeFromPeer brains env (coerce h) p
|
|
||||||
case s of
|
|
||||||
Left{} -> liftIO (answ Nothing)
|
|
||||||
Right s -> liftIO (answ s)
|
|
||||||
|
|
||||||
FetchBlock h s answ -> flip fix 0 $ \next i -> do
|
PChoose -> do
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
if here then do
|
what <- atomically do
|
||||||
liftIO $ answ BlockAlreadyHere
|
blocks <- readTVar wip <&> HPSQ.toList
|
||||||
|
let todo = blocks
|
||||||
|
flip fix todo $ \loop w -> do
|
||||||
|
erno <- readTVar _errors
|
||||||
|
if erno > 10 then
|
||||||
|
pure Nothing
|
||||||
else do
|
else do
|
||||||
-- debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p
|
case w of
|
||||||
|
[] -> retry
|
||||||
|
(h,_,dcb):xs -> do
|
||||||
|
wpsize <- readTVar wip <&> HPSQ.size
|
||||||
|
let trsh = if wpsize < 10 then 3 else 0
|
||||||
|
busy <- readTVar (dcbBusy dcb)
|
||||||
|
down <- readTVar (dcbDownloaded dcb)
|
||||||
|
absent <- readTVar _sizeCache <&> (== Just Nothing) . HM.lookup h
|
||||||
|
if busy > trsh || down || absent then
|
||||||
|
loop xs
|
||||||
|
else do
|
||||||
|
modifyTVar (dcbBusy dcb) succ
|
||||||
|
pure $ Just (h,dcb)
|
||||||
|
|
||||||
|
case what of
|
||||||
|
Just (hx, dcb) -> go (PInit hx dcb)
|
||||||
|
Nothing -> do
|
||||||
|
erno <- readTVarIO _errors
|
||||||
|
if erno > 50 then do
|
||||||
|
pause @'Seconds 60
|
||||||
|
else do
|
||||||
|
pause @'Seconds 1.0
|
||||||
|
go PChoose
|
||||||
|
|
||||||
|
PInit hx dcb -> do
|
||||||
|
|
||||||
|
debug $ yellow "Block choosen" <+> pretty p <+> pretty hx
|
||||||
|
|
||||||
|
hereSize <- readTVarIO _sizeCache <&> HM.lookup hx
|
||||||
|
|
||||||
|
case hereSize of
|
||||||
|
Just (Just size) -> do
|
||||||
|
go (PFetchBlock hx dcb size)
|
||||||
|
|
||||||
|
Just Nothing -> do
|
||||||
|
debug $ blue "Release block" <+> pretty p <+> pretty hx
|
||||||
|
go (PReleaseBlock hx dcb False)
|
||||||
|
|
||||||
|
Nothing -> do
|
||||||
|
debug $ blue "Query size" <+> pretty p <+> pretty hx
|
||||||
|
go (PQuerySize hx dcb)
|
||||||
|
|
||||||
|
PQuerySize hx dcb -> do
|
||||||
|
s <- queryBlockSizeFromPeer brains env (coerce hx) p
|
||||||
|
case s of
|
||||||
|
Right (Just size) -> do
|
||||||
|
debug $ green "HAS BLOCK" <+> pretty p <+> pretty hx <+> pretty size
|
||||||
|
atomically $ modifyTVar _sizeCache (HM.insert hx (Just size))
|
||||||
|
go (PFetchBlock hx dcb size)
|
||||||
|
|
||||||
|
Right Nothing -> do
|
||||||
|
debug $ red "HAS NO BLOCK" <+> pretty p <+> pretty hx
|
||||||
|
atomically $ modifyTVar _sizeCache (HM.insert hx Nothing)
|
||||||
|
go (PReleaseBlock hx dcb False)
|
||||||
|
|
||||||
|
Left{} -> do
|
||||||
|
atomically $ modifyTVar _errors succ
|
||||||
|
go (PReleaseBlock hx dcb False)
|
||||||
|
|
||||||
|
PFetchBlock hx dcb size -> do
|
||||||
|
|
||||||
bu <- lift $ getCurrentBurst bm
|
bu <- lift $ getCurrentBurst bm
|
||||||
|
|
||||||
t0 <- getTimeCoarse
|
t0 <- getTimeCoarse
|
||||||
r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p
|
r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize size) env (coerce hx) p
|
||||||
t1 <- getTimeCoarse
|
t1 <- getTimeCoarse
|
||||||
|
|
||||||
case r of
|
case r of
|
||||||
|
@ -905,18 +792,33 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
avg <- readTVarIO _avg
|
avg <- readTVarIO _avg
|
||||||
|
|
||||||
when (dtsec > avg) do
|
when (dtsec > avg * 1.05) do
|
||||||
burstMachineAddErrors bm 1
|
burstMachineAddErrors bm 1
|
||||||
|
|
||||||
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
|
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
|
||||||
liftIO $ answ (BlockFetched bs)
|
atomically $ writeTVar (dcbDownloaded dcb) True
|
||||||
|
|
||||||
Left{} | i >= 2 -> liftIO $ answ BlockFetchError
|
go (PReleaseBlock hx dcb True)
|
||||||
| otherwise -> next (succ i)
|
|
||||||
|
Left e -> do
|
||||||
|
burstMachineAddErrors bm 1
|
||||||
|
atomically $ modifyTVar _errors succ
|
||||||
|
debug $ red "BLOCK DOWNLOAD FUCKED" <+> pretty p <+> pretty hx <+> viaShow e
|
||||||
|
|
||||||
|
go (PReleaseBlock hx dcb False)
|
||||||
|
|
||||||
|
PReleaseBlock hx dcb done -> do
|
||||||
|
atomically do
|
||||||
|
if not done then do
|
||||||
|
modifyTVar (dcbBusy dcb) pred
|
||||||
|
else do
|
||||||
|
-- modifyTVar (dcbBusy dcb) pred
|
||||||
|
modifyTVar wip (HPSQ.delete hx)
|
||||||
|
writeTQueue parseQ hx
|
||||||
|
|
||||||
bs <- ContT $ withAsync $ forever do
|
bs <- ContT $ withAsync $ forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
debug $ yellow "I'm thread" <+> pretty p
|
debug $ yellow "I'm thread" <+> pretty p
|
||||||
|
|
||||||
void $ waitAnyCatchCancel [bmt,bs,twork,tstat]
|
void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep,tparse]
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue