diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 49fdcdaf..641f0c69 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -554,8 +554,8 @@ instance BlockSizeCache e KnownSize where data Work e = - RequestSize (Maybe (Peer e)) HashRef - | FetchBlock (Maybe (Peer e)) HashRef + RequestSize HashRef (Maybe (Peer e)) + | FetchBlock HashRef (Maybe (Peer e)) deriving stock (Generic) deriving stock instance Eq (Peer e) => Eq (Work e) @@ -571,7 +571,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto downloadDispatcher brains env = flip runContT pure do pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ())) - tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double Int ) + tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) void $ ContT $ withAsync $ manageThreads tasks pts @@ -582,15 +582,49 @@ downloadDispatcher brains env = flip runContT pure do here <- hasBlock sto h <&> isJust unless here do atomically do - modifyTVar tasks (HPSQ.insert (RequestSize Nothing (HashRef h)) 1.0 0) + addNewTaskSTM tasks (RequestSize (HashRef h) Nothing) 1.0 debug $ green "New download request" <+> pretty h + ContT $ withAsync do + + -- atomically do + -- tss <- readTVar tasks <&> HPSQ.toList + + -- let tssAlive = flip L.filter tss $ \case + -- (RequestSize b _,_,_) | b == blk -> False + -- (FetchBlock b _,_,_) | b == blk -> False + -- _ -> True + + -- writeTVar tasks (HPSQ.fromList tssAlive) + + pause @'Seconds 5 + forever $ (>> pause @'Seconds 10) do size <- atomically $ readTVar tasks <&> HPSQ.size debug $ yellow $ "wip:" <+> pretty size where + + addNewTaskSTM tasks k w = do + t <- newTVar 0 + modifyTVar tasks (HPSQ.insert k w t) + + delTaskSTM tasks k = modifyTVar tasks (HPSQ.delete k) + + releaseTask v = atomically do + writeTVar v 0 + + seizeTask tasks k = atomically do + w <- readTVar tasks <&> HPSQ.lookup k + maybe1 w (pure Nothing) $ \(_,v) -> do + vv <- readTVar v + if vv > 0 then do + pure Nothing + else do + writeTVar v 1 + pure (Just v) + manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do debug "MANAGE THREADS" @@ -631,39 +665,42 @@ downloadDispatcher brains env = flip runContT pure do pt <- ContT $ withAsync $ forever do (hr,bs) <- atomically $ readTQueue parseQ let refs = extractBlockRefs (coerce hr) bs - atomically do - for_ refs $ \r -> do - modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0) - modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0) + for_ refs $ \r -> atomically do + addNewTaskSTM tasks (RequestSize r (Just p)) 0.1 + addNewTaskSTM tasks (RequestSize r Nothing) 1 miss <- findMissedBlocks sto hr atomically do for_ miss $ \r -> do - modifyTVar tasks (HPSQ.insert (RequestSize (Just p) r) 0.1 0) - modifyTVar tasks (HPSQ.insert (RequestSize Nothing r) 1 0) + addNewTaskSTM tasks (RequestSize r (Just p)) 0.1 + addNewTaskSTM tasks (RequestSize r Nothing) 1 tt <- ContT $ withAsync $ forever $ flip runContT pure do - callCC \again -> do + callCC \again -> callCC \again2 -> do q <- readTVarIO tasks <&> HPSQ.keys + when (L.null q) do + pause @'Seconds 1 + again () + for_ q $ \case - RequestSize (Just who) blk | who == p -> do + RequestSize blk (Just who) | who == p -> do r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p case r of Left{} -> do - atomically $ modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk)) + atomically $ delTaskSTM tasks (RequestSize blk (Just p)) again () Right s -> do atomically do modifyTVar _sizeCache (HM.insert blk s) - modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk)) - modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0) + delTaskSTM tasks (RequestSize blk (Just p)) + addNewTaskSTM tasks (FetchBlock blk (Just p)) 1 - RequestSize Nothing blk -> do + RequestSize blk Nothing -> do r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p case r of Left{} -> again () @@ -671,22 +708,33 @@ downloadDispatcher brains env = flip runContT pure do Right s -> do atomically do modifyTVar _sizeCache (HM.insert blk s) - modifyTVar tasks (HPSQ.delete (RequestSize (Just p) blk)) - modifyTVar tasks (HPSQ.insert (FetchBlock (Just p) blk) 1 0) + delTaskSTM tasks (RequestSize blk (Just p)) + addNewTaskSTM tasks (FetchBlock blk (Just p)) 1 - FetchBlock w blk -> do + FetchBlock blk w -> do s <- atomically $ readTVar _sizeCache <&> HM.lookup blk case s of Just (Just size) -> do + debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p - atomically $ modifyTVar tasks (HPSQ.delete (FetchBlock w blk)) + + here <- hasBlock sto (coerce blk) <&> isJust + + when here $ again () + + seized' <- seizeTask tasks (FetchBlock blk w) + + seized <- case seized' of + Nothing -> again2 () + Just x -> pure x + bu <- lift $ getCurrentBurst bm what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p case what of Left{} -> do lift $ burstMachineAddErrors bm 1 - atomically $ modifyTVar tasks (HPSQ.insert (FetchBlock w blk) 1 0) + releaseTask seized again () Right bs -> do @@ -694,8 +742,8 @@ downloadDispatcher brains env = flip runContT pure do tss <- readTVar tasks <&> HPSQ.toList let tssAlive = flip L.filter tss $ \case - (RequestSize _ b,_,_) | b == blk -> False - (FetchBlock _ b,_,_) | b == blk -> False + (RequestSize b _,_,_) | b == blk -> False + (FetchBlock b _,_,_) | b == blk -> False _ -> True writeTVar tasks (HPSQ.fromList tssAlive)