This commit is contained in:
voidlizard 2024-11-11 05:54:31 +03:00
parent c134ae1a6f
commit 9bc8e0e685
1 changed files with 71 additions and 23 deletions

View File

@ -554,8 +554,8 @@ instance BlockSizeCache e KnownSize where
data Work e =
RequestSize (Maybe (Peer e)) HashRef
| FetchBlock (Maybe (Peer e)) HashRef
RequestSize HashRef (Maybe (Peer e))
| FetchBlock HashRef (Maybe (Peer e))
deriving stock (Generic)
deriving stock instance Eq (Peer e) => Eq (Work e)
@ -571,7 +571,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
downloadDispatcher brains env = flip runContT pure do
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()))
tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double Int )
tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
void $ ContT $ withAsync $ manageThreads tasks pts
@ -582,15 +582,49 @@ downloadDispatcher brains env = flip runContT pure do
here <- hasBlock sto h <&> isJust
unless here do
atomically do
modifyTVar tasks (HPSQ.insert (RequestSize Nothing (HashRef h)) 1.0 0)
addNewTaskSTM tasks (RequestSize (HashRef h) Nothing) 1.0
debug $ green "New download request" <+> pretty h
ContT $ withAsync do
-- atomically do
-- tss <- readTVar tasks <&> HPSQ.toList
-- let tssAlive = flip L.filter tss $ \case
-- (RequestSize b _,_,_) | b == blk -> False
-- (FetchBlock b _,_,_) | b == blk -> False
-- _ -> True
-- writeTVar tasks (HPSQ.fromList tssAlive)
pause @'Seconds 5
forever $ (>> pause @'Seconds 10) do
size <- atomically $ readTVar tasks <&> HPSQ.size
debug $ yellow $ "wip:" <+> pretty size
where
addNewTaskSTM tasks k w = do
t <- newTVar 0
modifyTVar tasks (HPSQ.insert k w t)
delTaskSTM tasks k = modifyTVar tasks (HPSQ.delete k)
releaseTask v = atomically do
writeTVar v 0
seizeTask tasks k = atomically do
w <- readTVar tasks <&> HPSQ.lookup k
maybe1 w (pure Nothing) $ \(_,v) -> do
vv <- readTVar v
if vv > 0 then do
pure Nothing
else do
writeTVar v 1
pure (Just v)
manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
debug "MANAGE THREADS"
@ -631,39 +665,42 @@ downloadDispatcher brains env = flip runContT pure do
pt <- ContT $ withAsync $ forever do
(hr,bs) <- atomically $ readTQueue parseQ
let refs = extractBlockRefs (coerce hr) bs
atomically do
for_ refs $ \r -> do
modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0)
modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0)
for_ refs $ \r -> atomically do
addNewTaskSTM tasks (RequestSize r (Just p)) 0.1
addNewTaskSTM tasks (RequestSize r Nothing) 1
miss <- findMissedBlocks sto hr
atomically do
for_ miss $ \r -> do
modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0)
modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0)
addNewTaskSTM tasks (RequestSize r (Just p)) 0.1
addNewTaskSTM tasks (RequestSize r Nothing) 1
tt <- ContT $ withAsync $ forever $ flip runContT pure do
callCC \again -> do
callCC \again -> callCC \again2 -> do
q <- readTVarIO tasks <&> HPSQ.keys
when (L.null q) do
pause @'Seconds 1
again ()
for_ q $ \case
RequestSize (Just who) blk | who == p -> do
RequestSize blk (Just who) | who == p -> do
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
case r of
Left{} -> do
atomically $ modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
atomically $ delTaskSTM tasks (RequestSize blk (Just p))
again ()
Right s -> do
atomically do
modifyTVar _sizeCache (HM.insert blk s)
modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0)
delTaskSTM tasks (RequestSize blk (Just p))
addNewTaskSTM tasks (FetchBlock blk (Just p)) 1
RequestSize Nothing blk -> do
RequestSize blk Nothing -> do
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
case r of
Left{} -> again ()
@ -671,22 +708,33 @@ downloadDispatcher brains env = flip runContT pure do
Right s -> do
atomically do
modifyTVar _sizeCache (HM.insert blk s)
modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0)
delTaskSTM tasks (RequestSize blk (Just p))
addNewTaskSTM tasks (FetchBlock blk (Just p)) 1
FetchBlock w blk -> do
FetchBlock blk w -> do
s <- atomically $ readTVar _sizeCache <&> HM.lookup blk
case s of
Just (Just size) -> do
debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p
atomically $ modifyTVar tasks (HPSQ.delete (FetchBlock w blk))
here <- hasBlock sto (coerce blk) <&> isJust
when here $ again ()
seized' <- seizeTask tasks (FetchBlock blk w)
seized <- case seized' of
Nothing -> again2 ()
Just x -> pure x
bu <- lift $ getCurrentBurst bm
what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p
case what of
Left{} -> do
lift $ burstMachineAddErrors bm 1
atomically $ modifyTVar tasks (HPSQ.insert (FetchBlock w blk) 1 0)
releaseTask seized
again ()
Right bs -> do
@ -694,8 +742,8 @@ downloadDispatcher brains env = flip runContT pure do
tss <- readTVar tasks <&> HPSQ.toList
let tssAlive = flip L.filter tss $ \case
(RequestSize _ b,_,_) | b == blk -> False
(FetchBlock _ b,_,_) | b == blk -> False
(RequestSize b _,_,_) | b == blk -> False
(FetchBlock b _,_,_) | b == blk -> False
_ -> True
writeTVar tasks (HPSQ.fromList tssAlive)