From 43d0739ec7b487dcf969734fc0cb0f431390ac3c Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 11 Nov 2024 16:35:34 +0300 Subject: [PATCH] good but fucked a little bit --- hbs2-peer/app/BlockDownloadNew.hs | 190 +++++++++++++++++++++--------- 1 file changed, 135 insertions(+), 55 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index dee2784d..f82024ae 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -60,6 +60,7 @@ import Data.Maybe import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy (ByteString) import Data.Vector qualified as V +import Data.Vector ((!?)) import Data.ByteString qualified as BS import Data.List qualified as L import Data.Coerce @@ -69,8 +70,11 @@ import Control.Concurrent.STM qualified as STM import UnliftIO.Concurrent import System.Random import Lens.Micro.Platform -import Streaming.Prelude qualified as S +import System.Random qualified as R +import System.Random.Shuffle qualified as Shuffle + +import Streaming.Prelude qualified as S data DownloadError e = @@ -141,7 +145,7 @@ queryBlockSizeFromPeer cache e h peer = do s <- lift $ findBlockSize @e cache _peerSignKey h - debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s + -- debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s maybe none (exit . Just) s @@ -558,7 +562,7 @@ data BlockFetchResult = | BlockAlreadyHere data Work = - RequestSize HashRef (Maybe Integer -> IO ()) + RequestSize HashRef (Maybe Integer -> IO ()) | FetchBlock HashRef Integer (BlockFetchResult -> IO ()) downloadDispatcher :: forall e m . ( e ~ L4Proto @@ -572,14 +576,18 @@ downloadDispatcher brains env = flip runContT pure do pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue Work)) -- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) ) + _blkNum <- newTVarIO 0 wip <- newTVarIO ( mempty :: HashMap HashRef NominalDiffTime ) sizeRq <- newTVarIO ( HPSQ.empty @HashRef @TimeSpec @() ) sizeRqWip <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) TimeSpec) sizeCache <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) (Maybe Integer) ) - downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @TimeSpec @Integer ) + downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @Double @Integer ) choosen <- newTVarIO ( mempty :: HashSet HashRef ) + stat <- newTVarIO ( mempty :: HashMap (Peer e) Double ) + fuckup <- newTVarIO ( mempty :: HashMap HashRef (HashSet (Peer e)) ) + -- done <- newTVarIO ( mempty :: HashSet HashRef ) - void $ ContT $ withAsync $ manageThreads pts + void $ ContT $ withAsync $ manageThreads stat pts sto <- withPeerM env getStorage @@ -594,15 +602,23 @@ downloadDispatcher brains env = flip runContT pure do modifyTVar wip (HM.insert (HashRef h) 10) let onBlockSize p h s = do - let color = if isJust s then green else red - debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p - now <- getTimeCoarse + atomically do - modifyTVar sizeRq (HPSQ.delete h) - modifyTVar sizeCache (HM.insert (p,h) s) - modifyTVar sizeRqWip (HM.delete (p,h)) - maybe1 s none $ \size -> do - modifyTVar downWip (HPSQ.insert (p,h) now size) + modifyTVar sizeRqWip (HM.delete (p,h)) + modifyTVar sizeRq (HPSQ.delete h) + + -- let color = if isJust s then green else red + -- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p + -- dtt <- randomRIO (-0.05, 0.05) + let dtt = 0 + here <- hasBlock sto (coerce h) <&> isJust + unless here do + dt <- readTVarIO stat <&> (+dtt) . fromMaybe 1.0 . HM.lookup p + atomically do + -- blkNum <- stateTVar _blkNum (\x -> (x, succ x)) + modifyTVar sizeCache (HM.insert (p,h) s) + maybe1 s none $ \size -> do + modifyTVar downWip (HPSQ.insert (p,h) dt size) parseQ <- newTQueueIO @@ -624,17 +640,32 @@ downloadDispatcher brains env = flip runContT pure do BlockAlreadyHere -> do debug $ yellow "ALREADY HAVE BLOCK" <+> pretty h - deleteBlockFromWip h + -- deleteBlockFromWip h BlockFetched bs -> do - debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p + -- debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p + void $ putBlock sto bs atomically $ writeTQueue parseQ (h, bs) - deleteBlockFromWip h + -- atomically $ modifyTVar done (HS.insert h) + -- deleteBlockFromWip h BlockFetchError -> do now <- getTimeCoarse debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) + atomically $ modifyTVar choosen (HS.delete h) + + ContT $ withAsync $ forever do + let blkz = readTVarIO choosen <&> fmap (,5) . HS.toList + polling (Polling 1 1) blkz $ \h -> do + here <- hasBlock sto (coerce h) <&> isJust + if here then do + liftIO $ deleteBlockFromWip h + else do + now <- getTimeCoarse + atomically do + modifyTVar sizeRq (HPSQ.insert h now ()) + modifyTVar choosen (HS.delete h) ContT $ withAsync $ forever do let blkz = readTVarIO wip <&> fmap (,10) . HM.keys @@ -644,16 +675,22 @@ downloadDispatcher brains env = flip runContT pure do when here $ do liftIO $ deleteBlockFromWip h - -- ContT $ withAsync do - -- let blkz = readTVarIO wip <&> HM.toList - -- polling (Polling 1 1) blkz $ \h -> do - -- debug $ "POLL BLOCK" <+> pretty h - -- atomically $ modifyTVar wip (HM.adjust (*1.10) h) - -- here <- hasBlock sto (coerce h) <&> isJust - -- now <- getTimeCoarse - -- unless here $ atomically do - -- modifyTVar sizeRq (HPSQ.insert h now ()) - -- modifyTVar choosen (HS.delete h) + ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do + atomically do + w <- readTVar wip + cs <- readTVar sizeCache <&> HM.toList + writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ]) + +-- ContT $ withAsync do +-- let blkz = readTVarIO wip <&> HM.toList +-- polling (Polling 1 1) blkz $ \h -> do +-- debug $ "POLL BLOCK" <+> pretty h +-- atomically $ modifyTVar wip (HM.adjust (*1.10) h) +-- here <- hasBlock sto (coerce h) <&> isJust +-- now <- getTimeCoarse +-- unless here $ atomically do +-- modifyTVar sizeRq (HPSQ.insert h now ()) +-- modifyTVar choosen (HS.delete h) ContT $ withAsync $ forever do (h,bs) <- atomically $ readTQueue parseQ @@ -685,26 +722,49 @@ downloadDispatcher brains env = flip runContT pure do modifyTVar sizeRqWip (HM.insert (p,h) now) ContT $ withAsync $ forever do - atomically do - peers <- readTVar pts - let n = HM.size peers * 20 - when ( n == 0 ) retry + (w1,dw,peerz) <- atomically do - (w,rest) <- readTVar downWip <&> L.splitAt n . HPSQ.toList + peers <- readTVar pts - writeTVar downWip (HPSQ.fromList rest) + let n = HM.size peers - when (L.null w) retry + when (n == 0) retry - for_ w $ \e@((p,h),prio,size) -> do - case HM.lookup p peers of - Nothing -> modifyTVar downWip (HPSQ.insert (p,h) prio size) - Just (_,q) -> do - here <- readTVar choosen <&> HS.member h - unless here do - writeTQueue q (FetchBlock h size (onBlock p h)) - modifyTVar choosen (HS.insert h) + already <- readTVar choosen + + dw <- readTVar downWip + + let total = [ x | x@((p,_),_,_) <- L.take 256 (HPSQ.toList dw), HM.member p peers ] + + let queue = total + + let qq = [ (h, HS.singleton p) + | ((p,h),_,_) <- queue, not (HS.member h already), HM.member p peers + ] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>) + + when (L.null qq) retry + + for_ total $ \(k,_,_) -> do + modifyTVar downWip (HPSQ.delete k) + + pure (qq,dw,peers) + + for_ w1 $ \(h,pps) -> do + i <- randomIO @Int <&> (`mod` V.length pps) . abs + debug $ blue "CHOOSIN PEER" <+> pretty h <+> pretty i <+> pretty (V.length pps) + flip runContT pure do + + p0 <- ContT $ maybe1 (pps !? i) (warn $ red "FUCKED PEER!") + + (_,who) <- ContT $ maybe1 (HM.lookup p0 peerz) (warn $ red "FUCKED QUEUE") + + (_,size) <- ContT $ maybe1 (HPSQ.lookup (p0,h) dw) (warn $ red "FUCKED SIZE") + + atomically do + modifyTVar fuckup (HM.insertWith (<>) h (HS.singleton p0)) + writeTQueue who (FetchBlock h size (onBlock p0 h)) + modifyTVar choosen (HS.insert h) forever $ (>> pause @'Seconds 10) do @@ -713,16 +773,21 @@ downloadDispatcher brains env = flip runContT pure do sdw <- readTVarIO downWip <&> HPSQ.size scsize <- readTVarIO sizeCache <&> HM.size chooSize <- readTVarIO choosen <&> HS.size + + fuck <- readTVarIO fuckup <&> HM.toList + let fucks = [ 1 | (h,p) <- fuck, HS.size p > 1 ] & sum + debug $ yellow $ "wip0" <+> pretty sw0 <+> "wip1:" <+> pretty srqw <+> "wip2" <+> pretty sdw <+> "wip3" <+> pretty chooSize <+> "sizes" <+> pretty scsize + <+> "fuckup" <+> pretty fucks where - manageThreads pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do + manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do debug "MANAGE THREADS" peers <- getKnownPeers @e <&> HS.fromList @@ -731,7 +796,7 @@ downloadDispatcher brains env = flip runContT pure do here <- readTVarIO pts <&> HM.member p unless here do work <- newTQueueIO - a <- async (peerThread p work) + a <- async (peerThread stat p work) atomically $ modifyTVar pts (HM.insert p (a,work)) loosers <- atomically do @@ -743,7 +808,9 @@ downloadDispatcher brains env = flip runContT pure do mapM_ cancel (fmap (fst.snd) loosers) - peerThread p work = flip runContT pure do + peerThread stat p work = flip runContT pure do + + btimes <- newTVarIO ( mempty :: [Double] ) sto <- withPeerM env getStorage @@ -758,30 +825,43 @@ downloadDispatcher brains env = flip runContT pure do bmt <- ContT $ withAsync $ runBurstMachine bm + tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do + tss <- readTVarIO btimes + unless (L.null tss) do + let avg = sum tss / realToFrac (L.length tss) + atomically $ modifyTVar stat (HM.insert p avg) + twork <- ContT $ withAsync $ forever do w <- atomically $ readTQueue work case w of RequestSize h answ -> do - here <- hasBlock sto (coerce h) <&> isJust - unless here do - debug $ yellow "RequestSize" <+> pretty h <+> pretty p - s <- queryBlockSizeFromPeer brains env (coerce h) p - case s of - Left{} -> none - Right s -> liftIO (answ s) + here <- hasBlock sto (coerce h) + case here of + Just s -> liftIO (answ (Just s)) + Nothing -> do + s <- queryBlockSizeFromPeer brains env (coerce h) p + case s of + Left{} -> liftIO (answ Nothing) + Right s -> liftIO (answ s) FetchBlock h s answ -> flip fix 0 $ \next i -> do here <- hasBlock sto (coerce h) <&> isJust if here then do liftIO $ answ BlockAlreadyHere else do - debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p + -- debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p bu <- lift $ getCurrentBurst bm - r <- lift $ downloadFromPeer (TimeoutSec 30) bu (KnownSize s) env (coerce h) p + + t0 <- getTimeCoarse + r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p + t1 <- getTimeCoarse case r of - Right bs -> liftIO $ answ (BlockFetched bs) + Right bs -> do + let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9 + atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) + liftIO $ answ (BlockFetched bs) Left{} | i >= 5 -> liftIO $ answ BlockFetchError | otherwise -> next (succ i) @@ -790,5 +870,5 @@ downloadDispatcher brains env = flip runContT pure do pause @'Seconds 10 debug $ yellow "I'm thread" <+> pretty p - void $ waitAnyCatchCancel [bmt,bs,twork] + void $ waitAnyCatchCancel [bmt,bs,twork,tstat]