From ebc749142c1cf32f22cc8f0d6cf5ea2075080e59 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 11 Nov 2024 12:24:24 +0300 Subject: [PATCH] works? --- hbs2-peer/app/BlockDownloadNew.hs | 309 +++++++++++++++++------------- 1 file changed, 171 insertions(+), 138 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 641f0c69..dee2784d 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -506,7 +506,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do blk <- readTVarIO _sBlockChunks2 let rs = LBS.concat $ IntMap.elems blk - ha <- enqueueBlock sto rs + ha <- putBlock sto rs -- let ha = Just $ hashObject @HbSync rs @@ -549,18 +549,17 @@ data S2 = newtype KnownSize = KnownSize Integer instance BlockSizeCache e KnownSize where - cacheBlockSize _ _ _ _ = pure () + cacheBlockSize _ y_ _ _ = pure () findBlockSize (KnownSize s) _ _ = pure (Just s) +data BlockFetchResult = + BlockFetchError + | BlockFetched ByteString + | BlockAlreadyHere -data Work e = - RequestSize HashRef (Maybe (Peer e)) - | FetchBlock HashRef (Maybe (Peer e)) - deriving stock (Generic) - -deriving stock instance Eq (Peer e) => Eq (Work e) -deriving stock instance Ord (Peer e) => Ord (Work e) -instance (Eq (Peer e), Hashable (Peer e)) => Hashable (Work e) +data Work = + RequestSize HashRef (Maybe Integer -> IO ()) + | FetchBlock HashRef Integer (BlockFetchResult -> IO ()) downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m @@ -570,10 +569,17 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> m () downloadDispatcher brains env = flip runContT pure do - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ())) - tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue Work)) + -- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) - void $ ContT $ withAsync $ manageThreads tasks pts + wip <- newTVarIO ( mempty :: HashMap HashRef NominalDiffTime ) + sizeRq <- newTVarIO ( HPSQ.empty @HashRef @TimeSpec @() ) + sizeRqWip <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) TimeSpec) + sizeCache <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) (Maybe Integer) ) + downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @TimeSpec @Integer ) + choosen <- newTVarIO ( mempty :: HashSet HashRef ) + + void $ ContT $ withAsync $ manageThreads pts sto <- withPeerM env getStorage @@ -581,51 +587,142 @@ downloadDispatcher brains env = flip runContT pure do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do here <- hasBlock sto h <&> isJust unless here do - atomically do - addNewTaskSTM tasks (RequestSize (HashRef h) Nothing) 1.0 - + now <- getTimeCoarse debug $ green "New download request" <+> pretty h + atomically do + modifyTVar sizeRq (HPSQ.insert (HashRef h) now ()) + modifyTVar wip (HM.insert (HashRef h) 10) - ContT $ withAsync do + let onBlockSize p h s = do + let color = if isJust s then green else red + debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p + now <- getTimeCoarse + atomically do + modifyTVar sizeRq (HPSQ.delete h) + modifyTVar sizeCache (HM.insert (p,h) s) + modifyTVar sizeRqWip (HM.delete (p,h)) + maybe1 s none $ \size -> do + modifyTVar downWip (HPSQ.insert (p,h) now size) - -- atomically do - -- tss <- readTVar tasks <&> HPSQ.toList + parseQ <- newTQueueIO - -- let tssAlive = flip L.filter tss $ \case - -- (RequestSize b _,_,_) | b == blk -> False - -- (FetchBlock b _,_,_) | b == blk -> False - -- _ -> True + let + deleteBlockFromWip :: HashRef -> IO () + deleteBlockFromWip h = do + atomically do + modifyTVar wip (HM.delete h) + modifyTVar sizeRq (HPSQ.delete h) + modifyTVar choosen (HS.delete h) + srwq <- readTVar sizeRqWip <&> HM.toList + writeTVar sizeRqWip (HM.fromList $ [ x | x@((_,hi),_) <- srwq, hi /= h ]) + dw <- readTVar downWip <&> HPSQ.toList + writeTVar downWip (HPSQ.fromList $ [ x | x@((_,hi),_,_) <- dw, hi /= h ]) + cs <- readTVar sizeCache <&> HM.toList + writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, hi /= h ]) - -- writeTVar tasks (HPSQ.fromList tssAlive) + let onBlock p h = \case + + BlockAlreadyHere -> do + debug $ yellow "ALREADY HAVE BLOCK" <+> pretty h + deleteBlockFromWip h + + BlockFetched bs -> do + debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p + atomically $ writeTQueue parseQ (h, bs) + deleteBlockFromWip h + + BlockFetchError -> do + now <- getTimeCoarse + debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p + atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) + + ContT $ withAsync $ forever do + let blkz = readTVarIO wip <&> fmap (,10) . HM.keys + polling (Polling 1 1) blkz $ \h -> do + debug $ "CLEANUP BLOCK" <+> pretty h + here <- hasBlock sto (coerce h) <&> isJust + when here $ do + liftIO $ deleteBlockFromWip h + + -- ContT $ withAsync do + -- let blkz = readTVarIO wip <&> HM.toList + -- polling (Polling 1 1) blkz $ \h -> do + -- debug $ "POLL BLOCK" <+> pretty h + -- atomically $ modifyTVar wip (HM.adjust (*1.10) h) + -- here <- hasBlock sto (coerce h) <&> isJust + -- now <- getTimeCoarse + -- unless here $ atomically do + -- modifyTVar sizeRq (HPSQ.insert h now ()) + -- modifyTVar choosen (HS.delete h) + + ContT $ withAsync $ forever do + (h,bs) <- atomically $ readTQueue parseQ + let refs = extractBlockRefs (coerce h) bs + missed <- findMissedBlocks sto h + now <- getTimeCoarse + for_ (HS.fromList ( refs <> missed )) $ \h -> do + here <- hasBlock sto (coerce h) <&> isJust + unless here do + atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) + + ContT $ withAsync $ forever do + + reqs <- atomically do + xs <- readTVar sizeRq <&> HPSQ.toList + when (L.null xs) retry + writeTVar sizeRq HPSQ.empty + pure [ h | (h,_,_) <- xs ] + + peers <- readTVarIO pts <&> HM.toList + + let tasks = [ (p, w, h) | (p,(_,w)) <- peers, h <- reqs ] + + now <- getTimeCoarse + for_ tasks $ \(p, w, h) -> do + + atomically do + writeTQueue w (RequestSize h (onBlockSize p h)) + modifyTVar sizeRqWip (HM.insert (p,h) now) + + ContT $ withAsync $ forever do + atomically do + peers <- readTVar pts + let n = HM.size peers * 20 + + when ( n == 0 ) retry + + (w,rest) <- readTVar downWip <&> L.splitAt n . HPSQ.toList + + writeTVar downWip (HPSQ.fromList rest) + + when (L.null w) retry + + for_ w $ \e@((p,h),prio,size) -> do + case HM.lookup p peers of + Nothing -> modifyTVar downWip (HPSQ.insert (p,h) prio size) + Just (_,q) -> do + here <- readTVar choosen <&> HS.member h + unless here do + writeTQueue q (FetchBlock h size (onBlock p h)) + modifyTVar choosen (HS.insert h) - pause @'Seconds 5 forever $ (>> pause @'Seconds 10) do - size <- atomically $ readTVar tasks <&> HPSQ.size - debug $ yellow $ "wip:" <+> pretty size + sw0 <- readTVarIO wip <&> HM.size + srqw <- readTVarIO sizeRqWip <&> HM.size + sdw <- readTVarIO downWip <&> HPSQ.size + scsize <- readTVarIO sizeCache <&> HM.size + chooSize <- readTVarIO choosen <&> HS.size + debug $ yellow $ "wip0" <+> pretty sw0 + <+> "wip1:" <+> pretty srqw + <+> "wip2" <+> pretty sdw + <+> "wip3" <+> pretty chooSize + <+> "sizes" <+> pretty scsize where - addNewTaskSTM tasks k w = do - t <- newTVar 0 - modifyTVar tasks (HPSQ.insert k w t) - delTaskSTM tasks k = modifyTVar tasks (HPSQ.delete k) - - releaseTask v = atomically do - writeTVar v 0 - - seizeTask tasks k = atomically do - w <- readTVar tasks <&> HPSQ.lookup k - maybe1 w (pure Nothing) $ \(_,v) -> do - vv <- readTVar v - if vv > 0 then do - pure Nothing - else do - writeTVar v 1 - pure (Just v) - - manageThreads tasks pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do + manageThreads pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do debug "MANAGE THREADS" peers <- getKnownPeers @e <&> HS.fromList @@ -633,8 +730,9 @@ downloadDispatcher brains env = flip runContT pure do for_ peers $ \p -> do here <- readTVarIO pts <&> HM.member p unless here do - a <- async (peerThread p tasks) - atomically $ modifyTVar pts (HM.insert p a) + work <- newTQueueIO + a <- async (peerThread p work) + atomically $ modifyTVar pts (HM.insert p (a,work)) loosers <- atomically do xs <- readTVar pts <&> HM.toList @@ -643,9 +741,9 @@ downloadDispatcher brains env = flip runContT pure do writeTVar pts (HM.fromList alive) pure dead - mapM_ cancel (fmap snd loosers) + mapM_ cancel (fmap (fst.snd) loosers) - peerThread p tasks = flip runContT pure do + peerThread p work = flip runContT pure do sto <- withPeerM env getStorage @@ -660,102 +758,37 @@ downloadDispatcher brains env = flip runContT pure do bmt <- ContT $ withAsync $ runBurstMachine bm - parseQ <- newTQueueIO @_ @(HashRef, ByteString) - - pt <- ContT $ withAsync $ forever do - (hr,bs) <- atomically $ readTQueue parseQ - let refs = extractBlockRefs (coerce hr) bs - for_ refs $ \r -> atomically do - addNewTaskSTM tasks (RequestSize r (Just p)) 0.1 - addNewTaskSTM tasks (RequestSize r Nothing) 1 - - miss <- findMissedBlocks sto hr - atomically do - for_ miss $ \r -> do - addNewTaskSTM tasks (RequestSize r (Just p)) 0.1 - addNewTaskSTM tasks (RequestSize r Nothing) 1 - - tt <- ContT $ withAsync $ forever $ flip runContT pure do - - callCC \again -> callCC \again2 -> do - - q <- readTVarIO tasks <&> HPSQ.keys - - when (L.null q) do - pause @'Seconds 1 - again () - - for_ q $ \case - - RequestSize blk (Just who) | who == p -> do - r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p - case r of - Left{} -> do - atomically $ delTaskSTM tasks (RequestSize blk (Just p)) - again () - - Right s -> do - atomically do - modifyTVar _sizeCache (HM.insert blk s) - delTaskSTM tasks (RequestSize blk (Just p)) - addNewTaskSTM tasks (FetchBlock blk (Just p)) 1 - - RequestSize blk Nothing -> do - r <- lift $ queryBlockSizeFromPeer brains env (coerce blk) p - case r of - Left{} -> again () - - Right s -> do - atomically do - modifyTVar _sizeCache (HM.insert blk s) - delTaskSTM tasks (RequestSize blk (Just p)) - addNewTaskSTM tasks (FetchBlock blk (Just p)) 1 - - FetchBlock blk w -> do - s <- atomically $ readTVar _sizeCache <&> HM.lookup blk + twork <- ContT $ withAsync $ forever do + w <- atomically $ readTQueue work + case w of + RequestSize h answ -> do + here <- hasBlock sto (coerce h) <&> isJust + unless here do + debug $ yellow "RequestSize" <+> pretty h <+> pretty p + s <- queryBlockSizeFromPeer brains env (coerce h) p case s of - Just (Just size) -> do + Left{} -> none + Right s -> liftIO (answ s) - debug $ "START DOWNLOAD!" <+> pretty blk <+> pretty p + FetchBlock h s answ -> flip fix 0 $ \next i -> do + here <- hasBlock sto (coerce h) <&> isJust + if here then do + liftIO $ answ BlockAlreadyHere + else do + debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p + bu <- lift $ getCurrentBurst bm + r <- lift $ downloadFromPeer (TimeoutSec 30) bu (KnownSize s) env (coerce h) p - here <- hasBlock sto (coerce blk) <&> isJust + case r of + Right bs -> liftIO $ answ (BlockFetched bs) - when here $ again () - - seized' <- seizeTask tasks (FetchBlock blk w) - - seized <- case seized' of - Nothing -> again2 () - Just x -> pure x - - bu <- lift $ getCurrentBurst bm - what <- lift $ downloadFromPeer (TimeoutSec 5) bu brains env (coerce blk) p - case what of - Left{} -> do - lift $ burstMachineAddErrors bm 1 - releaseTask seized - again () - - Right bs -> do - atomically do - tss <- readTVar tasks <&> HPSQ.toList - - let tssAlive = flip L.filter tss $ \case - (RequestSize b _,_,_) | b == blk -> False - (FetchBlock b _,_,_) | b == blk -> False - _ -> True - - writeTVar tasks (HPSQ.fromList tssAlive) - writeTQueue parseQ (blk, bs) - - _ -> none - - _ -> none + Left{} | i >= 5 -> liftIO $ answ BlockFetchError + | otherwise -> next (succ i) bs <- ContT $ withAsync $ forever do pause @'Seconds 10 debug $ yellow "I'm thread" <+> pretty p - void $ waitAnyCatchCancel [bmt,tt,bs] + void $ waitAnyCatchCancel [bmt,bs,twork]