From aa739d64e7c540408579c6019459b525a30fa0de Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 25 Jan 2023 08:11:37 +0300 Subject: [PATCH] blocks per second measurements --- hbs2-core/lib/HBS2/Actors/Peer.hs | 2 - hbs2-core/lib/HBS2/Net/Proto/Sessions.hs | 2 +- hbs2-tests/hbs2-tests.cabal | 6 +- hbs2-tests/test/Peer2Main.hs | 119 +++++++++++++++++++++-- 4 files changed, 117 insertions(+), 12 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 3219d29b..2b4e9d00 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -238,8 +238,6 @@ sweep = do ex <- asks (view envExpireTimes) sw <- asks (view envSweepers) - liftIO $ print $ pretty "sweep" - liftIO $ Cache.purgeExpired ex toSweep <- HashMap.toList <$> liftIO (readTVarIO sw) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs index 7f6b260e..d40faa7f 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs @@ -41,7 +41,7 @@ class ( Monad m , Eq (SessionKey e p) , Hashable (SessionKey e p) , Typeable (SessionData e p) - ) => Sessions e p m | p -> e where + ) => Sessions e p m where diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index fe3e2a91..f7f12f4d 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -120,12 +120,15 @@ executable test-peer-run , async , bytestring , cache + , clock , containers + , data-default , directory , filepath , hashable , microlens-platform , mtl + , mwc-random , prettyprinter , QuickCheck , random @@ -138,5 +141,4 @@ executable test-peer-run , transformers , uniplate , vector - , data-default - , mwc-random + diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index ca2dd72e..3b732e63 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -49,7 +49,11 @@ import System.Directory import System.Exit import System.FilePath.Posix import System.IO +import System.Clock import Safe +import Data.Hashable +import Type.Reflection +import Data.Fixed import System.Random.MWC import qualified Data.Vector.Unboxed as U @@ -183,6 +187,82 @@ handleBlockInfo (p, h, sz') = do data DownloadTask e = DownloadTask (Hash HbSync) (Maybe (Peer e, Integer)) +data Stats e = + Stats + { _blkNum :: !Int + , _blkNumLast :: !Int + , _timeLast :: !TimeSpec + } + deriving stock (Typeable,Generic) + +makeLenses 'Stats + +instance Default (Stats e) where + def = Stats 0 0 0 + +newStatsIO :: MonadIO m => m (Stats e) +newStatsIO = pure $ Stats 0 0 0 + +type instance SessionData e (Stats e) = Stats e + +instance Serialise TimeSpec +instance Serialise (Stats e) + +data instance SessionKey e (Stats e) = StatsKey + deriving stock (Typeable,Eq) + +instance Typeable (SessionKey e (Stats e)) => Hashable (SessionKey e (Stats e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @(SessionKey e (Stats e)) + + +-- FIXME: for some reason Session typeclass +-- requires HasProtocol. +-- It seems somehow logical. But not convenient + +instance HasProtocol Fake (Stats Fake) where + type instance ProtocolId (Stats Fake) = 0xFFFFFFFE + type instance Encoded Fake = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +newtype Speed = Speed (Fixed E1) + deriving newtype (Ord, Eq, Num, Real, Fractional, Show) + +instance Pretty Speed where + pretty (Speed n) = pretty (show n) + + +updateStats :: forall e m . (MonadIO m, Sessions e (Stats e) m) + => Bool -> Int -> m (Stats e) + +updateStats updTime blknum = do + de <- newStatsIO + stats <- fetch @e True de StatsKey id + + t <- if updTime then do + liftIO $ getTime Monotonic + else + pure (view timeLast stats) + + let blkNumNew = view blkNum stats + blknum + + let blast = if updTime then + blkNumNew + else + view blkNumLast stats + + let newStats = set blkNum blkNumNew + . set timeLast t + . set blkNumLast blast + $ stats + + update @e de StatsKey (const newStats) + + pure newStats + + blockDownloadLoop :: forall e m . ( m ~ PeerM e IO -- , e ~ Fake , Serialise (Encoded e) @@ -198,6 +278,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO -- , EventEmitter e (BlockInfo e) m , Sessions e (BlockInfo e) m , Sessions e (BlockChunks e) m + , Sessions e (Stats e) m , HasStorage m , Num (Peer e) , Pretty (Peer e) @@ -209,6 +290,7 @@ blockDownloadLoop cw = do stor <- getStorage + stats0 <- newStatsIO let blks = [ "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg" ] @@ -226,22 +308,40 @@ blockDownloadLoop cw = do liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask h (Just (p,s))) - fix \next -> do + env <- ask - -- debug $ "WIP:" <+> pretty wip - - job <- liftIO $ atomically $ Q.readTBQueue blq + void $ liftIO $ async $ forever $ withPeerM env $ do wip <- liftIO $ blocksInProcess cw + stats <- fetch @e True stats0 StatsKey id + t2 <- liftIO $ getTime Monotonic + + let tdiff = realToFrac (toNanoSecs t2 - toNanoSecs (view timeLast stats)) / 1e9 + let blkdiff = realToFrac $ view blkNum stats - view blkNumLast stats + let speed = if tdiff > 0 then blkdiff / tdiff else 0 :: Speed + void $ updateStats @e True 0 + debug $ "I'm alive!:" <+> pretty wip <+> pretty speed + pause ( 5 :: Timeout 'Seconds ) + + fix \next -> do + + ejob <- liftIO $ race ( pause ( 5 :: Timeout 'Seconds) ) + ( atomically $ Q.readTBQueue blq ) + + let job = either (const Nothing) Just ejob + + wip <- liftIO $ blocksInProcess cw if wip > 200 then do pause ( 1 :: Timeout 'Seconds ) else do case job of - DownloadTask hx (Just (p,s)) -> do + Nothing -> pure () + + Just (DownloadTask hx (Just (p,s))) -> do initDownload True blq p hx s - DownloadTask h Nothing -> do + Just (DownloadTask h Nothing) -> do peers <- getPeerLocator @e >>= knownPeers @e @@ -315,7 +415,7 @@ blockDownloadLoop cw = do liftIO $ addJob pip $ withPeerM env $ do sto <- getStorage - liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h + -- liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) -- debug $ pretty (show bt) @@ -354,6 +454,8 @@ mkAdapter :: forall e m . ( m ~ PeerM e IO , HasProtocol e (BlockChunks e) , Hashable (SessionKey e (BlockChunks e)) , Sessions e (BlockChunks e) (ResponseM e m) + , Sessions e (Stats e) (ResponseM e m) + , Default (SessionData e (Stats e)) , EventEmitter e (BlockChunks e) m , Pretty (Peer e) , Block ByteString ~ ByteString @@ -430,6 +532,9 @@ mkAdapter cww = do -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ when ( h1 == h ) $ do liftIO $ commitBlock cww cKey h + + updateStats @e False 1 + expire cKey -- debug "hash matched!" emit @e (BlockChunksEventKey h) (BlockReady h)