From 0a194b2e7c50f79712b11315383c8b510ae41fc8 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sun, 5 Feb 2023 13:58:22 +0300 Subject: [PATCH] better block processing --- hbs2-peer/app/BlockDownload.hs | 241 ++++++++++++++++++++++++++++----- 1 file changed, 204 insertions(+), 37 deletions(-) diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index abdd4762..3b1d1fb2 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -13,6 +13,7 @@ import HBS2.Hash import HBS2.Merkle import HBS2.Net.PeerLocator import HBS2.Net.Proto +import HBS2.Net.Proto.Peer import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated @@ -21,6 +22,7 @@ import HBS2.System.Logger.Simple import PeerInfo +import Numeric ( showGFloat ) import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader @@ -76,17 +78,29 @@ newtype instance SessionKey e (BlockChunks e) = DownloadSessionKey (Peer e, Cookie e) deriving stock (Generic,Typeable) +data BsFSM = Initial + | Downloading + | Postpone --- data MyBlkInfo e = --- MyBlkInfo (Peer e) Integer - -- deriving stock (Eq,Ord) +data BlockState = + BlockState + { _bsStart :: TimeSpec + , _bsTimes :: Int + , _bsState :: BsFSM + , _bsWipTo :: Double + } + +makeLenses 'BlockState data DownloadEnv e = DownloadEnv - { _downloadQ :: TQueue (Hash HbSync) - , _peerBusy :: TVar (HashMap (Peer e) ()) - , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) - , _blockWip :: Cache (Hash HbSync) () + { _downloadQ :: TQueue (Hash HbSync) + , _peerBusy :: TVar (HashMap (Peer e) ()) + , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) + , _blockWip :: Cache (Hash HbSync) () + , _blockState :: TVar (HashMap (Hash HbSync) BlockState) + , _blockPostponed :: Cache (Hash HbSync) () + , _blockInQ :: TVar (HashMap (Hash HbSync) ()) } makeLenses 'DownloadEnv @@ -100,6 +114,9 @@ newDownloadEnv = liftIO do <*> newTVarIO mempty <*> newTVarIO mempty <*> Cache.newCache (Just defBlockWipTimeout) + <*> newTVarIO mempty + <*> Cache.newCache Nothing + <*> newTVarIO mempty newtype BlockDownloadM e m a = BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } @@ -117,19 +134,89 @@ runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a withDownload e m = runReaderT ( fromBlockDownloadM m ) e +setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m () +setBlockState h s = do + sh <- asks (view blockState) + liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s) + + +calcWaitTime :: MonadIO m => BlockDownloadM e m Double +calcWaitTime = do + wip <- asks (view blockWip) >>= liftIO . Cache.size + let wipn = realToFrac wip * 4 + let waiting = 5 + ( (realToFrac (toNanoSeconds defBlockWaitMax) * wipn) / 1e9 ) + pure waiting + +touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState +touchBlockState h st = do + sh <- asks (view blockState) + t <- liftIO $ getTime MonotonicCoarse + wo <- calcWaitTime + + let s = BlockState t 0 st wo + + sn <- liftIO $ atomically $ do + modifyTVar sh (HashMap.alter (doAlter s) h) + readTVar sh <&> fromMaybe s . HashMap.lookup h + + case view bsState sn of + Initial -> do + + let t0 = view bsStart sn + let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9 + + wip <- asks (view blockWip) >>= liftIO . Cache.size + + let waiting = view bsWipTo sn + + if dt > waiting then do -- FIXME: remove-hardcode + debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "") + let sn1 = sn { _bsState = Postpone } + liftIO $ atomically $ modifyTVar sh (HashMap.insert h sn1) + pure sn1 + else do + pure sn + + _ -> pure sn + + where + doAlter s1 = \case + Nothing -> Just s1 + Just s -> Just $ over bsTimes succ s + +getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState +getBlockState h = do + sh <- asks (view blockState) + touchBlockState h Initial + addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () addDownload h = do - q <- asks (view downloadQ) - wip <- asks (view blockWip) - liftIO do - atomically $ writeTQueue q h - Cache.insert wip h () + tinq <- asks (view blockInQ) + + doAdd <- liftIO $ atomically $ stateTVar tinq + \hm -> case HashMap.lookup h hm of + Nothing -> (True, HashMap.insert h () hm) + Just{} -> (False, HashMap.insert h () hm) + when doAdd $ do + + q <- asks (view downloadQ) + wip <- asks (view blockWip) + + liftIO do + atomically $ writeTQueue q h + Cache.insert wip h () + + void $ touchBlockState h Initial removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m () removeFromWip h = do wip <- asks (view blockWip) + st <- asks (view blockState) + po <- asks (view blockPostponed) liftIO $ Cache.delete wip h + liftIO $ Cache.delete po h + liftIO $ atomically $ modifyTVar' st (HashMap.delete h) withFreePeer :: (MyPeer e, MonadIO m) => Peer e @@ -166,7 +253,30 @@ dismissPeer p = do getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) getBlockForDownload = do q <- asks (view downloadQ) - liftIO $ atomically $ readTQueue q + inq <- asks (view blockInQ) + h <- liftIO $ atomically $ readTQueue q + liftIO $ atomically $ modifyTVar inq (HashMap.delete h) + pure h + +withBlockForDownload :: MonadIO m + => (Hash HbSync -> BlockDownloadM e m ()) + -> BlockDownloadM e m () + +withBlockForDownload action = do + + cache <- asks (view blockPostponed) + + h <- getBlockForDownload + s <- getBlockState h + + let postpone = toTimeSpec @'Seconds 10 -- FIXME: remove-hardcode + + case view bsState s of + Postpone -> do + debug $ "posponed:" <+> pretty h + liftIO $ Cache.insert' cache (Just postpone) h () + + _ -> action h addBlockInfo :: (MonadIO m, MyPeer e) => Peer e @@ -328,7 +438,7 @@ downloadFromWithPeer peer thisBkSize h = do updatePeerInfo True pinfo newBurst' <- liftIO $ readTVarIO burstSizeT - let newBurst = floor (realToFrac newBurst' * 0.5 ) + let newBurst = max defBurst $ floor (realToFrac newBurst' * 0.5 ) liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN) @@ -427,6 +537,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO , EventListener e (BlockInfo e) m , EventListener e (BlockChunks e) m , EventListener e (BlockAnnounce e) m + , EventListener e (PeerHandshake e) m , EventEmitter e (BlockChunks e) m , Sessions e (BlockChunks e) m , Sessions e (PeerInfo e) m @@ -460,6 +571,8 @@ blockDownloadLoop env0 = do pinfo <- fetch True npi (PeerInfoKey p) id updatePeerInfo False pinfo + void $ liftIO $ async $ withPeerM e $ withDownload env0 (tossPostponed e) + -- TODO: peer info loop void $ liftIO $ async $ forever $ withPeerM e $ do pause @'Seconds 20 @@ -510,45 +623,99 @@ blockDownloadLoop env0 = do fix \next -> do - h <- getBlockForDownload + withBlockForDownload $ \h -> do - here <- liftIO $ hasBlock stor h <&> isJust + here <- liftIO $ hasBlock stor h <&> isJust - unless here do + unless here do - peers <- getPeersForBlock h + peers <- getPeersForBlock h - when (null peers) $ do + when (null peers) $ do - lift do -- in PeerM - subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do - withDownload env (addBlockInfo p1 hx s) + lift do -- in PeerM + subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do + withDownload env (addBlockInfo p1 hx s) - pips <- knownPeers @e pl - for_ pips $ \pip -> request pip (GetBlockSize @e h) + pips <- knownPeers @e pl + for_ pips $ \pip -> request pip (GetBlockSize @e h) - p <- knownPeers @e pl >>= liftIO . shuffleM + p <- knownPeers @e pl >>= liftIO . shuffleM - -- debug $ "known peers" <+> pretty p - -- debug $ "peers/blocks" <+> pretty peers + -- debug $ "known peers" <+> pretty p + -- debug $ "peers/blocks" <+> pretty peers - p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster + p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster - let withAllShit f = withPeerM e $ withDownload env f + let withAllShit f = withPeerM e $ withDownload env f - maybe1 p0 (again h) $ \(p1,size) -> do - withFreePeer p1 (again h) $ - liftIO do - re <- race ( pause defBlockWaitMax ) $ - withAllShit $ downloadFromWithPeer p1 size h + maybe1 p0 (again h) $ \(p1,size) -> do - case re of - Left{} -> withAllShit (again h) - Right{} -> withAllShit (processBlock h) + st <- getBlockState h + setBlockState h (set bsState Downloading st) + + withFreePeer p1 (again h) $ + liftIO do + re <- race ( pause defBlockWaitMax ) $ + withAllShit $ downloadFromWithPeer p1 size h + + case re of + Left{} -> withAllShit (again h) + Right{} -> withAllShit (processBlock h) next +tossPostponed :: forall e m . ( MonadIO m + , EventListener e (PeerHandshake e) m + , MyPeer e + ) + => PeerEnv e + -> BlockDownloadM e m () + +tossPostponed penv = do + + env <- ask + + waitQ <- liftIO newTQueueIO + + cache <- asks (view blockPostponed) + + lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do + liftIO $ atomically $ writeTQueue waitQ () + + forever do + r <- liftIO $ race ( pause @'Seconds 20 ) ( atomically $ readTQueue waitQ ) + + let allBack = either (const False) (const True) r + + blks <- liftIO $ Cache.toList cache + + w <- calcWaitTime + + debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "") + <+> pretty (length blks) + + for_ blks $ \case + (k,_,Nothing) | not allBack -> pure () + | otherwise -> pushBack cache k + (k,_,Just{}) -> pushBack cache k + + where + pushBack cache k = do + w <- calcWaitTime + liftIO $ Cache.delete cache k + st <- getBlockState k + t0 <- liftIO $ getTime MonotonicCoarse + setBlockState k ( set bsStart t0 + . set bsState Initial + . set bsWipTo w + $ st + ) + debug $ "returning block to downloads" <+> pretty k + addDownload k + + -- NOTE: this is an adapter for a ResponseM monad -- because response is working in ResponseM monad (ha!) -- So don't be confused with types