mirror of https://github.com/voidlizard/hbs2
not good
This commit is contained in:
parent
32d8a3b987
commit
c2d9f5cfef
|
@ -554,8 +554,8 @@ instance BlockSizeCache e KnownSize where
|
||||||
|
|
||||||
|
|
||||||
data Work e =
|
data Work e =
|
||||||
RequestSize (Maybe (Peer e)) HashRef
|
RequestSize HashRef (Maybe (Peer e))
|
||||||
| FetchBlock (Maybe (Peer e)) HashRef
|
| FetchBlock HashRef (Maybe (Peer e))
|
||||||
deriving stock (Generic)
|
deriving stock (Generic)
|
||||||
|
|
||||||
deriving stock instance Eq (Peer e) => Eq (Work e)
|
deriving stock instance Eq (Peer e) => Eq (Work e)
|
||||||
|
@ -571,7 +571,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
downloadDispatcher brains env = flip runContT pure do
|
downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()))
|
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()))
|
||||||
tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double Int )
|
tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
|
||||||
|
|
||||||
void $ ContT $ withAsync $ manageThreads tasks pts
|
void $ ContT $ withAsync $ manageThreads tasks pts
|
||||||
|
|
||||||
|
@ -582,15 +582,49 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
here <- hasBlock sto h <&> isJust
|
here <- hasBlock sto h <&> isJust
|
||||||
unless here do
|
unless here do
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar tasks (HPSQ.insert (RequestSize Nothing (HashRef h)) 1.0 0)
|
addNewTaskSTM tasks (RequestSize (HashRef h) Nothing) 1.0
|
||||||
|
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
|
|
||||||
|
ContT $ withAsync do
|
||||||
|
|
||||||
|
-- atomically do
|
||||||
|
-- tss <- readTVar tasks <&> HPSQ.toList
|
||||||
|
|
||||||
|
-- let tssAlive = flip L.filter tss $ \case
|
||||||
|
-- (RequestSize b _,_,_) | b == blk -> False
|
||||||
|
-- (FetchBlock b _,_,_) | b == blk -> False
|
||||||
|
-- _ -> True
|
||||||
|
|
||||||
|
-- writeTVar tasks (HPSQ.fromList tssAlive)
|
||||||
|
|
||||||
|
pause @'Seconds 5
|
||||||
|
|
||||||
forever $ (>> pause @'Seconds 10) do
|
forever $ (>> pause @'Seconds 10) do
|
||||||
size <- atomically $ readTVar tasks <&> HPSQ.size
|
size <- atomically $ readTVar tasks <&> HPSQ.size
|
||||||
debug $ yellow $ "wip:" <+> pretty size
|
debug $ yellow $ "wip:" <+> pretty size
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
|
addNewTaskSTM tasks k w = do
|
||||||
|
t <- newTVar 0
|
||||||
|
modifyTVar tasks (HPSQ.insert k w t)
|
||||||
|
|
||||||
|
delTaskSTM tasks k = modifyTVar tasks (HPSQ.delete k)
|
||||||
|
|
||||||
|
releaseTask v = atomically do
|
||||||
|
writeTVar v 0
|
||||||
|
|
||||||
|
seizeTask tasks k = atomically do
|
||||||
|
w <- readTVar tasks <&> HPSQ.lookup k
|
||||||
|
maybe1 w (pure Nothing) $ \(_,v) -> do
|
||||||
|
vv <- readTVar v
|
||||||
|
if vv > 0 then do
|
||||||
|
pure Nothing
|
||||||
|
else do
|
||||||
|
writeTVar v 1
|
||||||
|
pure (Just v)
|
||||||
|
|
||||||
manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
debug "MANAGE THREADS"
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
|
@ -631,39 +665,42 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
pt <- ContT $ withAsync $ forever do
|
pt <- ContT $ withAsync $ forever do
|
||||||
(hr,bs) <- atomically $ readTQueue parseQ
|
(hr,bs) <- atomically $ readTQueue parseQ
|
||||||
let refs = extractBlockRefs (coerce hr) bs
|
let refs = extractBlockRefs (coerce hr) bs
|
||||||
atomically do
|
for_ refs $ \r -> atomically do
|
||||||
for_ refs $ \r -> do
|
addNewTaskSTM tasks (RequestSize r (Just p)) 0.1
|
||||||
modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0)
|
addNewTaskSTM tasks (RequestSize r Nothing) 1
|
||||||
modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0)
|
|
||||||
|
|
||||||
miss <- findMissedBlocks sto hr
|
miss <- findMissedBlocks sto hr
|
||||||
atomically do
|
atomically do
|
||||||
for_ miss $ \r -> do
|
for_ miss $ \r -> do
|
||||||
modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0)
|
addNewTaskSTM tasks (RequestSize r (Just p)) 0.1
|
||||||
modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0)
|
addNewTaskSTM tasks (RequestSize r Nothing) 1
|
||||||
|
|
||||||
tt <- ContT $ withAsync $ forever $ flip runContT pure do
|
tt <- ContT $ withAsync $ forever $ flip runContT pure do
|
||||||
|
|
||||||
callCC \again -> do
|
callCC \again -> callCC \again2 -> do
|
||||||
|
|
||||||
q <- readTVarIO tasks <&> HPSQ.keys
|
q <- readTVarIO tasks <&> HPSQ.keys
|
||||||
|
|
||||||
|
when (L.null q) do
|
||||||
|
pause @'Seconds 1
|
||||||
|
again ()
|
||||||
|
|
||||||
for_ q $ \case
|
for_ q $ \case
|
||||||
|
|
||||||
RequestSize (Just who) blk | who == p -> do
|
RequestSize blk (Just who) | who == p -> do
|
||||||
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
||||||
case r of
|
case r of
|
||||||
Left{} -> do
|
Left{} -> do
|
||||||
atomically $ modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
|
atomically $ delTaskSTM tasks (RequestSize blk (Just p))
|
||||||
again ()
|
again ()
|
||||||
|
|
||||||
Right s -> do
|
Right s -> do
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar _sizeCache (HM.insert blk s)
|
modifyTVar _sizeCache (HM.insert blk s)
|
||||||
modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
|
delTaskSTM tasks (RequestSize blk (Just p))
|
||||||
modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0)
|
addNewTaskSTM tasks (FetchBlock blk (Just p)) 1
|
||||||
|
|
||||||
RequestSize Nothing blk -> do
|
RequestSize blk Nothing -> do
|
||||||
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p
|
||||||
case r of
|
case r of
|
||||||
Left{} -> again ()
|
Left{} -> again ()
|
||||||
|
@ -671,22 +708,33 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
Right s -> do
|
Right s -> do
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar _sizeCache (HM.insert blk s)
|
modifyTVar _sizeCache (HM.insert blk s)
|
||||||
modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk))
|
delTaskSTM tasks (RequestSize blk (Just p))
|
||||||
modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0)
|
addNewTaskSTM tasks (FetchBlock blk (Just p)) 1
|
||||||
|
|
||||||
FetchBlock w blk -> do
|
FetchBlock blk w -> do
|
||||||
s <- atomically $ readTVar _sizeCache <&> HM.lookup blk
|
s <- atomically $ readTVar _sizeCache <&> HM.lookup blk
|
||||||
|
|
||||||
case s of
|
case s of
|
||||||
Just (Just size) -> do
|
Just (Just size) -> do
|
||||||
|
|
||||||
debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p
|
debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p
|
||||||
atomically $ modifyTVar tasks (HPSQ.delete (FetchBlock w blk))
|
|
||||||
|
here <- hasBlock sto (coerce blk) <&> isJust
|
||||||
|
|
||||||
|
when here $ again ()
|
||||||
|
|
||||||
|
seized' <- seizeTask tasks (FetchBlock blk w)
|
||||||
|
|
||||||
|
seized <- case seized' of
|
||||||
|
Nothing -> again2 ()
|
||||||
|
Just x -> pure x
|
||||||
|
|
||||||
bu <- lift $ getCurrentBurst bm
|
bu <- lift $ getCurrentBurst bm
|
||||||
what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p
|
what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p
|
||||||
case what of
|
case what of
|
||||||
Left{} -> do
|
Left{} -> do
|
||||||
lift $ burstMachineAddErrors bm 1
|
lift $ burstMachineAddErrors bm 1
|
||||||
atomically $ modifyTVar tasks (HPSQ.insert (FetchBlock w blk) 1 0)
|
releaseTask seized
|
||||||
again ()
|
again ()
|
||||||
|
|
||||||
Right bs -> do
|
Right bs -> do
|
||||||
|
@ -694,8 +742,8 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
tss <- readTVar tasks <&> HPSQ.toList
|
tss <- readTVar tasks <&> HPSQ.toList
|
||||||
|
|
||||||
let tssAlive = flip L.filter tss $ \case
|
let tssAlive = flip L.filter tss $ \case
|
||||||
(RequestSize _ b,_,_) | b == blk -> False
|
(RequestSize b _,_,_) | b == blk -> False
|
||||||
(FetchBlock _ b,_,_) | b == blk -> False
|
(FetchBlock b _,_,_) | b == blk -> False
|
||||||
_ -> True
|
_ -> True
|
||||||
|
|
||||||
writeTVar tasks (HPSQ.fromList tssAlive)
|
writeTVar tasks (HPSQ.fromList tssAlive)
|
||||||
|
|
Loading…
Reference in New Issue