From 8ef0c2a6a4c34bee56c90254d1f345e24830ac1e Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sat, 9 Nov 2024 09:54:32 +0300 Subject: [PATCH] wip --- hbs2-peer/app/BlockDownloadNew.hs | 158 ++++++++++++++---------------- 1 file changed, 72 insertions(+), 86 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 836241ec..16038442 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -1,6 +1,7 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language UndecidableInstances #-} {-# Language AllowAmbiguousTypes #-} +{-# Language ImplicitParams #-} module BlockDownloadNew where import HBS2.Prelude.Plated @@ -42,6 +43,8 @@ import Control.Monad.Trans.Cont import Control.Concurrent.STM (flushTQueue,retry) import Data.Map qualified as Map import Data.Map (Map) +import Data.HashPSQ qualified as HPSQ +import Data.HashPSQ ( HashPSQ(..) ) import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HM import Data.HashSet (HashSet) @@ -462,6 +465,8 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do lift $ request peer req + -- pause @'MilliSeconds ( rtt * 0.01 ) + t0 <- getTimeCoarse let watchdog = fix \next -> do @@ -472,11 +477,10 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do either (const none) (const next) r r <- liftIO $ race watchdog do - - atomically do - pieces <- readTVar _sBlockChunks2 - let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ] - unless done retry + atomically do + pieces <- readTVar _sBlockChunks2 + let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ] + unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next atomically $ flushTQueue chuQ @@ -527,6 +531,14 @@ data S1 = | S1QuerySize (Hash HbSync) | S1CheckMissed (Hash HbSync) + +data S2 = + S2Init (Hash HbSync) + | S2CheckBlock1 (Hash HbSync) ByteString + | S2CheckBlock2 (Hash HbSync) + | S2FetchBlock (Hash HbSync) + | S2Exit + downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m ) @@ -535,12 +547,16 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> m () downloadDispatcher brains env = flip runContT pure do + let t0 = 10.00 + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) - down <- newTVarIO ( mempty :: HashMap (Peer e) (HashMap (Hash HbSync) Integer) ) - use <- newTVarIO ( mempty :: HashMap (Hash HbSync) Integer ) rq <- newTQueueIO + seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int TimeSpec ) + + blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime ) + sto <- withPeerM env getStorage work <- newTQueueIO @@ -554,36 +570,58 @@ downloadDispatcher brains env = flip runContT pure do liftIO $ withPeerM env do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do - debug $ green "Download request" <+> pretty h - let w = do - -- here <- hasBlock sto h <&> isJust - -- let hs = if not here then [h] else mempty - -- missed <- findMissedBlocks sto (HashRef h) - -- for_ ( hs <> fmap coerce missed ) $ \hx -> do - atomically $ writeTQueue rq h - -- pause @'Seconds 0.01 + now <- getTimeCoarse + new <- atomically do + already <- readTVar seen <&> HPSQ.member (HashRef h) + if already then do + pure False + else do + modifyTVar seen ( HPSQ.insert (HashRef h) 1 now ) + modifyTVar blkQ ( HPSQ.insert (HashRef h) 1 t0 ) + pure True + when new do + debug $ green "New download request" <+> pretty h - atomically $ writeTQueue work w + let missChk = readTVarIO blkQ <&> fmap tr . HPSQ.toList + where tr (k,_,v) = (k, realToFrac v) - ContT $ withAsync (manageThreads pts down use) - ContT $ withAsync (sizeQueryLoop pts rq down) + let shiftPrio = \case + Nothing -> ((), Nothing) + Just (p,v) -> ((), Just (succ p, v * 1.10 )) + + ContT $ withAsync $ forever do + polling (Polling 1 1) missChk $ \h -> do + debug $ blue "CHECK MISSED BLOCKS" <+> pretty h + + missed <- findMissedBlocks sto h + + here <- hasBlock sto (coerce h) <&> isJust + + atomically do + for_ missed $ \hm -> modifyTVar blkQ (HPSQ.insert (coerce hm) 2 t0) + if here then do + modifyTVar blkQ (HPSQ.delete (coerce h)) + else + modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h ) + + ContT $ withAsync (manageThreads pts) + -- ContT $ withAsync (sizeQueryLoop pts rq down) forever do pause @'Seconds 10 + size <- atomically $ readTVar blkQ <&> HPSQ.size debug $ yellow $ "I'm download dispatcher" - u <- readTVarIO use <&> length . HM.keys - d <- readTVarIO down <&> HS.size . HS.unions . fmap HM.keysSet . HM.elems - debug $ yellow $ "wip:" <+> pretty d + debug $ yellow $ "wip:" <+> pretty size where - manageThreads pts down use = withPeerM env $ forever do + manageThreads pts = withPeerM env $ forever do pips <- getKnownPeers @e <&> HS.fromList for_ pips $ \p -> do here <- readTVarIO pts <&> HM.member p unless here do - a <- async $ peerDownloadLoop env p down use + a <- async $ peerDownloadLoop env p atomically $ modifyTVar pts (HM.insert p a) debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p) @@ -609,73 +647,21 @@ downloadDispatcher brains env = flip runContT pure do pause @'Seconds 5 - sizeQueryLoop pts rq down = flip runContT pure do - sto <- withPeerM env getStorage - wip <- newTVarIO ( mempty :: HashMap (Hash HbSync) NominalDiffTime ) + peerDownloadLoop env p = flip runContT pure do - void $ ContT $ withAsync $ replicateM_ 4 do - flip fix S1Init $ \next -> \case - S1Init -> do + sto <- withPeerM env getStorage - w <- atomically $ readTQueue rq - here <- hasBlock sto w <&> isJust + bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 - if not here then - next (S1QuerySize w) - else - next (S1CheckMissed w) + -- let blokz = atomically do + -- d <- readTVar down <&> fromMaybe mempty . HM.lookup p + -- pure $ HM.toList d - S1QuerySize h -> do - debug $ "QUERY SIZE1" <+> pretty h - atomically $ modifyTVar wip (HM.insert h 10) - next S1Init + -- void $ ContT $ withAsync $ polling (Polling 1 1) blokz $ \h0 -> do + -- debug $ "POLL BLOCK DOWNLOAD" <+> pretty h0 - S1CheckMissed h -> do - -- debug $ "CHECK MISSED!" <+> pretty h - next S1Init - - void $ ContT $ withAsync $ forever do - pause @'Seconds 10 - - let blkz = readTVarIO wip <&> HM.toList - - polling (Polling 1 1) blkz $ \h -> liftIO do - - pips <- readTVarIO pts <&> HM.keys - - forConcurrently_ pips $ \p -> do - - debug $ "QUERY SIZE" <+> pretty h <+> pretty p - r <- queryBlockSizeFromPeer brains env h p - - case r of - Right (Just size) -> do - atomically do - modifyTVar wip (HM.delete h) - modifyTVar down (HM.insertWith (<>) p (HM.singleton h size)) - - Right Nothing -> do - atomically $ modifyTVar wip (HM.adjust ((*1.10).(+60)) h) - - Left{} -> do - atomically $ modifyTVar wip (HM.adjust (*1.05) h) + forever do + pause @'Seconds 10 + debug $ yellow "Peer download loop" <+> pretty p - forever do - pause @'Seconds 10 - - peerDownloadLoop env p down use = forever do - debug $ "Peer download loop" <+> green (pretty p) - hh <- atomically do - u <- readTVar use - q <- readTVar down <&> HM.toList . fromMaybe mempty . HM.lookup p - let blk = headMay [ (k,v) | (k, v) <- q, fromMaybe 0 (HM.lookup k u) == 0 ] - case blk of - Just (h,size) -> do - modifyTVar use (HM.insertWith (+) h 1) - pure (h,size) - - Nothing -> retry - - debug $ red "START TO DOWNLOAD" <+> pretty hh <+> "FROM" <+> pretty p -