From 644d43c5c35a1f42f1314b5e23f8a6d64da3d2a7 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 13 Nov 2024 14:12:57 +0300 Subject: [PATCH] wip --- hbs2-peer/app/BlockDownloadNew.hs | 450 ++++++++++++------------------ 1 file changed, 176 insertions(+), 274 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index e85e19db..eaa2ab70 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -267,7 +267,7 @@ runBurstMachine BurstMachine{..} = do new <- if e2 > e1 then do let d = max 2.0 (current * (1.0 - down)) - nrates <- readTVar _rates <&> drop 5 . Map.toList + nrates <- readTVar _rates <&> drop 3 . Map.toList let newFucked = maybe d snd (headMay nrates) writeTVar _rates (Map.fromList nrates) pure newFucked @@ -565,6 +565,25 @@ data Work = RequestSize HashRef (Maybe Integer -> IO ()) | FetchBlock HashRef Integer (BlockFetchResult -> IO ()) + +-- | Download control block +data DCB = + DCB + { dcbStart :: TimeSpec + , dcbBusy :: TVar Int + , dcbDownloaded :: TVar Bool + } + +newDcbSTM :: TimeSpec -> STM DCB +newDcbSTM ts = DCB ts <$> newTVar 0 <*> newTVar False + +data PSt = + PChoose + | PInit HashRef DCB + | PQuerySize HashRef DCB + | PFetchBlock HashRef DCB Integer + | PReleaseBlock HashRef DCB Bool + downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m ) @@ -573,21 +592,13 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> m () downloadDispatcher brains env = flip runContT pure do - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TBQueue Work)) + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) -- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) _blkNum <- newTVarIO 0 - wip <- newTVarIO ( mempty :: HashMap HashRef NominalDiffTime ) - sizeRq <- newTVarIO ( HPSQ.empty @HashRef @TimeSpec @() ) - sizeRqWip <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) TimeSpec) - sizeCache <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) (Maybe Integer) ) - downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @Double @Integer ) - choosen <- newTVarIO ( mempty :: HashSet HashRef ) - stat <- newTVarIO ( mempty :: HashMap (Peer e) Double ) - fuckup <- newTVarIO ( mempty :: HashMap HashRef (HashSet (Peer e)) ) - -- done <- newTVarIO ( mempty :: HashSet HashRef ) + wip <- newTVarIO ( HPSQ.empty @HashRef @Double @DCB ) - void $ ContT $ withAsync $ manageThreads stat pts + void $ ContT $ withAsync $ manageThreads wip pts sto <- withPeerM env getStorage @@ -598,235 +609,32 @@ downloadDispatcher brains env = flip runContT pure do now <- getTimeCoarse debug $ green "New download request" <+> pretty h atomically do - modifyTVar sizeRq (HPSQ.insert (HashRef h) now ()) - modifyTVar wip (HM.insert (HashRef h) 30) + already <- readTVar wip <&> HPSQ.member (HashRef h) + dcb <- newDcbSTM now + unless already do + modifyTVar wip (HPSQ.insert (HashRef h) 1.0 dcb) - let onBlockSize p h s = do - - atomically do - modifyTVar sizeRqWip (HM.delete (p,h)) - modifyTVar sizeRq (HPSQ.delete h) - - l <- atomically do - w <- readTVar pts <&> fmap snd . HM.lookup p - maybe1 w (pure 0) lengthTBQueue - - -- let color = if isJust s then green else red - -- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p - dtt <- randomRIO (-0.05, 0.05) - -- let dtt = 0 - here <- hasBlock sto (coerce h) <&> isJust - unless here do - dt <- readTVarIO stat <&> (*(1+dtt)) . fromMaybe 1.0 . HM.lookup p - let rate = dt + realToFrac l - atomically do - -- blkNum <- stateTVar _blkNum (\x -> (x, succ x)) - modifyTVar sizeCache (HM.insert (p,h) s) - choo <- readTVar choosen <&> HS.member h - maybe1 s none $ \size -> do - unless choo do - modifyTVar downWip (HPSQ.insert (p,h) rate size) - - parseQ <- newTQueueIO - - let - deleteBlockFromWip :: HashRef -> IO () - deleteBlockFromWip h = do - atomically do - modifyTVar wip (HM.delete h) - modifyTVar sizeRq (HPSQ.delete h) - -- modifyTVar choosen (HS.delete h) - srwq <- readTVar sizeRqWip <&> HM.toList - writeTVar sizeRqWip (HM.fromList $ [ x | x@((_,hi),_) <- srwq, hi /= h ]) - dw <- readTVar downWip <&> HPSQ.toList - writeTVar downWip (HPSQ.fromList $ [ x | x@((_,hi),_,_) <- dw, hi /= h ]) - cs <- readTVar sizeCache <&> HM.toList - writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, hi /= h ]) - - let onBlock p h = \case - - BlockAlreadyHere -> do - debug $ yellow "ALREADY HAVE BLOCK" <+> pretty h - -- deleteBlockFromWip h - - BlockFetched bs -> do - -- debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p - void $ putBlock sto bs - atomically do - writeTQueue parseQ (h, bs) - modifyTVar fuckup (HM.insertWith (<>) h (HS.singleton p)) - - BlockFetchError -> do - now <- getTimeCoarse - debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p - atomically do - modifyTVar sizeRq (HPSQ.insert h now ()) - modifyTVar choosen (HS.delete h) - - ContT $ withAsync $ forever do - let blkz = readTVarIO choosen <&> fmap (,10) . HS.toList - polling (Polling 1 1) blkz $ \h -> do - here <- hasBlock sto (coerce h) <&> isJust - if here then do - liftIO $ deleteBlockFromWip h - else do - now <- getTimeCoarse - atomically do - modifyTVar sizeRq (HPSQ.insert h now ()) - modifyTVar choosen (HS.delete h) - - ContT $ withAsync $ forever do - let blkz = readTVarIO wip <&> fmap (,10) . HM.keys - polling (Polling 1 1) blkz $ \h -> do - - debug $ "CLEANUP BLOCK" <+> pretty h - here <- hasBlock sto (coerce h) <&> isJust - - when here $ do - liftIO $ deleteBlockFromWip h - - ContT $ withAsync $ forever do - pause @'Seconds 10 - w <- readTVarIO downWip <&> HPSQ.size - when (w == 0) do - atomically $ writeTVar choosen mempty - - ContT $ withAsync $ forever $ (>> pause @'Seconds 300) do - atomically $ writeTVar fuckup mempty - - ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do + ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do + debug "Sweep blocks" atomically do - w <- readTVar wip - cs <- readTVar sizeCache <&> HM.toList - writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ]) + total <- readTVar wip <&> HPSQ.toList - ContT $ withAsync do - let blkz = readTVarIO wip <&> HM.toList - polling (Polling 1 1) blkz $ \h -> do - debug $ "POLL BLOCK" <+> pretty h - atomically $ modifyTVar wip (HM.adjust (*1.10) h) - here <- hasBlock sto (coerce h) <&> isJust - now <- getTimeCoarse - unless here $ atomically do - modifyTVar sizeRq (HPSQ.insert h now ()) - modifyTVar choosen (HS.delete h) + alive <- for total $ \e@(h,_,DCB{..}) -> do + down <- readTVar dcbDownloaded + if down then + pure Nothing + else + pure (Just e) - ContT $ withAsync $ forever do - (h,_) <- atomically $ readTQueue parseQ - missed <- findMissedBlocks sto h - now <- getTimeCoarse - for_ (HS.fromList missed) $ \h -> do - here <- hasBlock sto (coerce h) <&> isJust - unless here do - atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) - - ContT $ withAsync $ forever do - - reqs <- atomically do - xs <- readTVar sizeRq <&> HPSQ.toList - when (L.null xs) retry - writeTVar sizeRq HPSQ.empty - pure [ h | (h,_,_) <- xs ] - - peers <- readTVarIO pts <&> HM.toList - - let tasks = [ (p, w, h) | (p,(_,w)) <- peers, h <- reqs ] - - now <- getTimeCoarse - for_ tasks $ \(p, w, h) -> do - - atomically do - full <- isFullTBQueue w - unless full do - writeTBQueue w (RequestSize h (onBlockSize p h)) - modifyTVar sizeRqWip (HM.insert (p,h) now) - - ContT $ withAsync $ forever do - - (w1,dw,peerz) <- atomically do - - peers0 <- readTVar pts - - peers1 <- for (HM.toList peers0) $ \e@(_,(_,q)) -> do - full <- isFullTBQueue q - if full then do - pure (Left e) - else do - pure (Right e) - - let peers = HM.fromList (rights peers1) - - let n = HM.size peers - - when (n == 0) retry - - already <- readTVar choosen - - dw <- readTVar downWip - - let total = [ x - | x@((p,h),_,_) <- HPSQ.toList dw - , HM.member p peers - , not (HS.member h already) - ] & L.take 100 - - when (L.null total) retry - - let queue = total - - let qq = [ (h, HS.singleton p) - | ((p,h),_,_) <- queue - ] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>) - - when (L.null qq) retry - - for_ total $ \(k,_,_) -> do - modifyTVar downWip (HPSQ.delete k) - - pure (qq,dw,peers) - - for_ w1 $ \(h,pps) -> do - i <- randomIO @Int <&> (`mod` V.length pps) . abs - debug $ blue "CHOOSIN PEER" <+> pretty h <+> pretty i <+> pretty (V.length pps) - flip runContT pure do - - p0 <- ContT $ maybe1 (pps !? i) (warn $ red "FUCKED PEER!") - - (_,who) <- ContT $ maybe1 (HM.lookup p0 peerz) (warn $ red "FUCKED QUEUE") - - (_,size) <- ContT $ maybe1 (HPSQ.lookup (p0,h) dw) (warn $ red "FUCKED SIZE") - - atomically do - choo <- readTVar choosen <&> HS.member h - unless choo do - full <- isFullTBQueue who - if full then do - -- FIXME: wtf? - modifyTVar downWip (HPSQ.insert (p0,h) 0.01 size) - else do - writeTBQueue who (FetchBlock h size (onBlock p0 h)) - modifyTVar choosen (HS.insert h) + writeTVar wip (HPSQ.fromList (catMaybes alive)) forever $ (>> pause @'Seconds 10) do - sw0 <- readTVarIO wip <&> HM.size - srqw <- readTVarIO sizeRqWip <&> HM.size - sdw <- readTVarIO downWip <&> HPSQ.size - scsize <- readTVarIO sizeCache <&> HM.size - chooSize <- readTVarIO choosen <&> HS.size - - fuck <- readTVarIO fuckup <&> HM.toList - let fucks = [ 1 | (h,p) <- fuck, HS.size p > 1 ] & sum - + sw0 <- readTVarIO wip <&> HPSQ.size debug $ yellow $ "wip0" <+> pretty sw0 - <+> "wip1:" <+> pretty srqw - <+> "wip2" <+> pretty sdw - <+> "wip3" <+> pretty chooSize - <+> "sizes" <+> pretty scsize - <+> "fuckup" <+> pretty fucks where - manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do + manageThreads wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do debug "MANAGE THREADS" peers <- getKnownPeers @e <&> HS.fromList @@ -834,9 +642,8 @@ downloadDispatcher brains env = flip runContT pure do for_ peers $ \p -> do here <- readTVarIO pts <&> HM.member p unless here do - work <- newTBQueueIO (16*1024*10) - a <- async (peerThread stat p work) - atomically $ modifyTVar pts (HM.insert p (a,work)) + a <- async (peerThread p wip) + atomically $ modifyTVar pts (HM.insert p a) loosers <- atomically do xs <- readTVar pts <&> HM.toList @@ -845,24 +652,50 @@ downloadDispatcher brains env = flip runContT pure do writeTVar pts (HM.fromList alive) pure dead - mapM_ cancel (fmap (fst.snd) loosers) + mapM_ cancel (fmap snd loosers) - peerThread stat p work = flip runContT pure do + peerThread p wip = flip runContT pure do btimes <- newTVarIO ( mempty :: [Double] ) + + _errors <- newTVarIO 0 _avg <- newTVarIO 600 sto <- withPeerM env getStorage _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) - bm <- liftIO $ newBurstMachine 5 256 (Just 50) 0.10 0.25 + bm <- liftIO $ newBurstMachine 2.5 256 (Just 50) 0.10 0.25 void $ ContT $ bracket none $ const do debug $ "Cancelling thread for" <+> pretty p debug $ yellow "Started thread for" <+> pretty p + -- TODO: sweep size cache! + + ContT $ withAsync $ forever $ (>> pause @'Seconds 60) do + atomically $ writeTVar _errors 0 + + tsweep <- ContT $ withAsync do + let hashes = readTVarIO _sizeCache <&> fmap (,60) . HM.keys + polling (Polling 1 10) hashes $ \h -> do + atomically do + here <- readTVar wip <&> HPSQ.member h + unless here do + modifyTVar _sizeCache (HM.delete h) + + parseQ <- newTQueueIO + + tparse <- ContT $ withAsync $ forever do + what <- atomically $ readTQueue parseQ + missed <- findMissedBlocks sto what + now <- getTimeCoarse + atomically do + for_ missed $ \hi -> do + dcb <- newDcbSTM now + modifyTVar wip (HPSQ.insert hi 1.0 dcb) + bmt <- ContT $ withAsync $ runBurstMachine bm tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do @@ -870,53 +703,122 @@ downloadDispatcher brains env = flip runContT pure do unless (L.null tss) do let avg = sum tss / realToFrac (L.length tss) atomically do - modifyTVar stat (HM.insert p avg) writeTVar _avg avg twork <- ContT $ withAsync $ forever do - w <- atomically $ readTBQueue work - case w of - RequestSize h answ -> do - here <- hasBlock sto (coerce h) - case here of - Just s -> liftIO (answ (Just s)) + flip fix PChoose $ \go -> \case + + PChoose -> do + + what <- atomically do + blocks <- readTVar wip <&> HPSQ.toList + let todo = blocks + flip fix todo $ \loop w -> do + erno <- readTVar _errors + if erno > 10 then + pure Nothing + else do + case w of + [] -> retry + (h,_,dcb):xs -> do + wpsize <- readTVar wip <&> HPSQ.size + let trsh = if wpsize < 10 then 3 else 0 + busy <- readTVar (dcbBusy dcb) + down <- readTVar (dcbDownloaded dcb) + absent <- readTVar _sizeCache <&> (== Just Nothing) . HM.lookup h + if busy > trsh || down || absent then + loop xs + else do + modifyTVar (dcbBusy dcb) succ + pure $ Just (h,dcb) + + case what of + Just (hx, dcb) -> go (PInit hx dcb) Nothing -> do - s <- queryBlockSizeFromPeer brains env (coerce h) p - case s of - Left{} -> liftIO (answ Nothing) - Right s -> liftIO (answ s) + erno <- readTVarIO _errors + if erno > 50 then do + pause @'Seconds 60 + else do + pause @'Seconds 1.0 + go PChoose - FetchBlock h s answ -> flip fix 0 $ \next i -> do - here <- hasBlock sto (coerce h) <&> isJust - if here then do - liftIO $ answ BlockAlreadyHere + PInit hx dcb -> do + + debug $ yellow "Block choosen" <+> pretty p <+> pretty hx + + hereSize <- readTVarIO _sizeCache <&> HM.lookup hx + + case hereSize of + Just (Just size) -> do + go (PFetchBlock hx dcb size) + + Just Nothing -> do + debug $ blue "Release block" <+> pretty p <+> pretty hx + go (PReleaseBlock hx dcb False) + + Nothing -> do + debug $ blue "Query size" <+> pretty p <+> pretty hx + go (PQuerySize hx dcb) + + PQuerySize hx dcb -> do + s <- queryBlockSizeFromPeer brains env (coerce hx) p + case s of + Right (Just size) -> do + debug $ green "HAS BLOCK" <+> pretty p <+> pretty hx <+> pretty size + atomically $ modifyTVar _sizeCache (HM.insert hx (Just size)) + go (PFetchBlock hx dcb size) + + Right Nothing -> do + debug $ red "HAS NO BLOCK" <+> pretty p <+> pretty hx + atomically $ modifyTVar _sizeCache (HM.insert hx Nothing) + go (PReleaseBlock hx dcb False) + + Left{} -> do + atomically $ modifyTVar _errors succ + go (PReleaseBlock hx dcb False) + + PFetchBlock hx dcb size -> do + + bu <- lift $ getCurrentBurst bm + + t0 <- getTimeCoarse + r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize size) env (coerce hx) p + t1 <- getTimeCoarse + + case r of + Right bs -> do + let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9 + + avg <- readTVarIO _avg + + when (dtsec > avg * 1.05) do + burstMachineAddErrors bm 1 + + atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) + atomically $ writeTVar (dcbDownloaded dcb) True + + go (PReleaseBlock hx dcb True) + + Left e -> do + burstMachineAddErrors bm 1 + atomically $ modifyTVar _errors succ + debug $ red "BLOCK DOWNLOAD FUCKED" <+> pretty p <+> pretty hx <+> viaShow e + + go (PReleaseBlock hx dcb False) + + PReleaseBlock hx dcb done -> do + atomically do + if not done then do + modifyTVar (dcbBusy dcb) pred else do - -- debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p - bu <- lift $ getCurrentBurst bm - - t0 <- getTimeCoarse - r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p - t1 <- getTimeCoarse - - case r of - Right bs -> do - let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9 - - avg <- readTVarIO _avg - - when (dtsec > avg) do - burstMachineAddErrors bm 1 - - atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) - liftIO $ answ (BlockFetched bs) - - Left{} | i >= 2 -> liftIO $ answ BlockFetchError - | otherwise -> next (succ i) + -- modifyTVar (dcbBusy dcb) pred + modifyTVar wip (HPSQ.delete hx) + writeTQueue parseQ hx bs <- ContT $ withAsync $ forever do pause @'Seconds 10 debug $ yellow "I'm thread" <+> pretty p - void $ waitAnyCatchCancel [bmt,bs,twork,tstat] + void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep,tparse]