diff --git a/hbs2-core/lib/HBS2/Storage/Operations/Missed.hs b/hbs2-core/lib/HBS2/Storage/Operations/Missed.hs index b8fae40b..a7cc64af 100644 --- a/hbs2-core/lib/HBS2/Storage/Operations/Missed.hs +++ b/hbs2-core/lib/HBS2/Storage/Operations/Missed.hs @@ -13,6 +13,7 @@ import Streaming.Prelude qualified as S import Streaming.Prelude (Stream, Of(..)) import Control.Monad.Trans.Maybe import Control.Monad +import Data.Coerce import Data.Maybe -- TODO: slow-dangerous @@ -39,6 +40,12 @@ findMissedBlocks2 sto href = do maybe1 blk none $ \bs -> do let w = tryDetect (fromHashRef hx) bs + let refs = extractBlockRefs (coerce hx) bs + + for_ refs $ \r -> do + here <- hasBlock sto (coerce r) <&> isJust + unless here $ lift $ S.yield r + r <- case w of Merkle{} -> lift $ lift $ findMissedBlocks sto hx MerkleAnn t -> lift $ lift do diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 49bb4f74..e85e19db 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -483,7 +483,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do let watchdog = fix \next -> do wx <- readTVarIO _wx <&> realToFrac -- debug $ "WATCHDOG" <+> pretty wx <+> pretty waity - r <- race (pause @'MilliSeconds (max wx waity)) do + r <- race (pause @'MilliSeconds (min wx waity)) do void $ atomically $ readTQueue chuQ either (const none) (const next) r @@ -573,7 +573,7 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> m () downloadDispatcher brains env = flip runContT pure do - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue Work)) + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TBQueue Work)) -- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) _blkNum <- newTVarIO 0 @@ -607,6 +607,10 @@ downloadDispatcher brains env = flip runContT pure do modifyTVar sizeRqWip (HM.delete (p,h)) modifyTVar sizeRq (HPSQ.delete h) + l <- atomically do + w <- readTVar pts <&> fmap snd . HM.lookup p + maybe1 w (pure 0) lengthTBQueue + -- let color = if isJust s then green else red -- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p dtt <- randomRIO (-0.05, 0.05) @@ -614,7 +618,7 @@ downloadDispatcher brains env = flip runContT pure do here <- hasBlock sto (coerce h) <&> isJust unless here do dt <- readTVarIO stat <&> (*(1+dtt)) . fromMaybe 1.0 . HM.lookup p - let rate = dt + let rate = dt + realToFrac l atomically do -- blkNum <- stateTVar _blkNum (\x -> (x, succ x)) modifyTVar sizeCache (HM.insert (p,h) s) @@ -708,11 +712,10 @@ downloadDispatcher brains env = flip runContT pure do modifyTVar choosen (HS.delete h) ContT $ withAsync $ forever do - (h,bs) <- atomically $ readTQueue parseQ - let refs = extractBlockRefs (coerce h) bs + (h,_) <- atomically $ readTQueue parseQ missed <- findMissedBlocks sto h now <- getTimeCoarse - for_ (HS.fromList ( refs <> missed )) $ \h -> do + for_ (HS.fromList missed) $ \h -> do here <- hasBlock sto (coerce h) <&> isJust unless here do atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) @@ -733,14 +736,25 @@ downloadDispatcher brains env = flip runContT pure do for_ tasks $ \(p, w, h) -> do atomically do - writeTQueue w (RequestSize h (onBlockSize p h)) - modifyTVar sizeRqWip (HM.insert (p,h) now) + full <- isFullTBQueue w + unless full do + writeTBQueue w (RequestSize h (onBlockSize p h)) + modifyTVar sizeRqWip (HM.insert (p,h) now) ContT $ withAsync $ forever do (w1,dw,peerz) <- atomically do - peers <- readTVar pts + peers0 <- readTVar pts + + peers1 <- for (HM.toList peers0) $ \e@(_,(_,q)) -> do + full <- isFullTBQueue q + if full then do + pure (Left e) + else do + pure (Right e) + + let peers = HM.fromList (rights peers1) let n = HM.size peers @@ -750,14 +764,18 @@ downloadDispatcher brains env = flip runContT pure do dw <- readTVar downWip - let total = [ x | x@((p,_),_,_) <- L.take 32 (HPSQ.toList dw), HM.member p peers ] + let total = [ x + | x@((p,h),_,_) <- HPSQ.toList dw + , HM.member p peers + , not (HS.member h already) + ] & L.take 100 when (L.null total) retry let queue = total let qq = [ (h, HS.singleton p) - | ((p,h),_,_) <- queue, not (HS.member h already) + | ((p,h),_,_) <- queue ] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>) when (L.null qq) retry @@ -781,8 +799,13 @@ downloadDispatcher brains env = flip runContT pure do atomically do choo <- readTVar choosen <&> HS.member h unless choo do - writeTQueue who (FetchBlock h size (onBlock p0 h)) - modifyTVar choosen (HS.insert h) + full <- isFullTBQueue who + if full then do + -- FIXME: wtf? + modifyTVar downWip (HPSQ.insert (p0,h) 0.01 size) + else do + writeTBQueue who (FetchBlock h size (onBlock p0 h)) + modifyTVar choosen (HS.insert h) forever $ (>> pause @'Seconds 10) do sw0 <- readTVarIO wip <&> HM.size @@ -811,7 +834,7 @@ downloadDispatcher brains env = flip runContT pure do for_ peers $ \p -> do here <- readTVarIO pts <&> HM.member p unless here do - work <- newTQueueIO + work <- newTBQueueIO (16*1024*10) a <- async (peerThread stat p work) atomically $ modifyTVar pts (HM.insert p (a,work)) @@ -833,7 +856,7 @@ downloadDispatcher brains env = flip runContT pure do _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) - bm <- liftIO $ newBurstMachine 5 256 (Just 50) 0.05 0.20 + bm <- liftIO $ newBurstMachine 5 256 (Just 50) 0.10 0.25 void $ ContT $ bracket none $ const do debug $ "Cancelling thread for" <+> pretty p @@ -851,7 +874,7 @@ downloadDispatcher brains env = flip runContT pure do writeTVar _avg avg twork <- ContT $ withAsync $ forever do - w <- atomically $ readTQueue work + w <- atomically $ readTBQueue work case w of RequestSize h answ -> do @@ -888,7 +911,7 @@ downloadDispatcher brains env = flip runContT pure do atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) liftIO $ answ (BlockFetched bs) - Left{} | i >= 5 -> liftIO $ answ BlockFetchError + Left{} | i >= 2 -> liftIO $ answ BlockFetchError | otherwise -> next (succ i) bs <- ContT $ withAsync $ forever do diff --git a/hbs2/Main.hs b/hbs2/Main.hs index 8402df68..4027e5d7 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -248,7 +248,7 @@ runCat opts ss = do MerkleAnn (MTreeAnn {_mtaCrypt = NullEncryption }) -> do bs <- runExceptT (readFromMerkle (AnyStorage ss) (SimpleKey mhash)) - >>= orThrowUser "can't read/decode tree" + >>= orThrowPassIO -- User "can't read/decode tree" LBS.putStr bs MerkleAnn ann@(MTreeAnn {_mtaCrypt = EncryptGroupNaClSymm gkh _}) -> do