blocks per second measurements

This commit is contained in:
Dmitry Zuikov 2023-01-25 08:11:37 +03:00
parent bcb799f7e3
commit aa739d64e7
4 changed files with 117 additions and 12 deletions

View File

@ -238,8 +238,6 @@ sweep = do
ex <- asks (view envExpireTimes)
sw <- asks (view envSweepers)
liftIO $ print $ pretty "sweep"
liftIO $ Cache.purgeExpired ex
toSweep <- HashMap.toList <$> liftIO (readTVarIO sw)

View File

@ -41,7 +41,7 @@ class ( Monad m
, Eq (SessionKey e p)
, Hashable (SessionKey e p)
, Typeable (SessionData e p)
) => Sessions e p m | p -> e where
) => Sessions e p m where

View File

@ -120,12 +120,15 @@ executable test-peer-run
, async
, bytestring
, cache
, clock
, containers
, data-default
, directory
, filepath
, hashable
, microlens-platform
, mtl
, mwc-random
, prettyprinter
, QuickCheck
, random
@ -138,5 +141,4 @@ executable test-peer-run
, transformers
, uniplate
, vector
, data-default
, mwc-random

View File

@ -49,7 +49,11 @@ import System.Directory
import System.Exit
import System.FilePath.Posix
import System.IO
import System.Clock
import Safe
import Data.Hashable
import Type.Reflection
import Data.Fixed
import System.Random.MWC
import qualified Data.Vector.Unboxed as U
@ -183,6 +187,82 @@ handleBlockInfo (p, h, sz') = do
data DownloadTask e = DownloadTask (Hash HbSync) (Maybe (Peer e, Integer))
data Stats e =
Stats
{ _blkNum :: !Int
, _blkNumLast :: !Int
, _timeLast :: !TimeSpec
}
deriving stock (Typeable,Generic)
makeLenses 'Stats
instance Default (Stats e) where
def = Stats 0 0 0
newStatsIO :: MonadIO m => m (Stats e)
newStatsIO = pure $ Stats 0 0 0
type instance SessionData e (Stats e) = Stats e
instance Serialise TimeSpec
instance Serialise (Stats e)
data instance SessionKey e (Stats e) = StatsKey
deriving stock (Typeable,Eq)
instance Typeable (SessionKey e (Stats e)) => Hashable (SessionKey e (Stats e)) where
hashWithSalt salt _ = hashWithSalt salt (someTypeRep p)
where
p = Proxy @(SessionKey e (Stats e))
-- FIXME: for some reason Session typeclass
-- requires HasProtocol.
-- It seems somehow logical. But not convenient
instance HasProtocol Fake (Stats Fake) where
type instance ProtocolId (Stats Fake) = 0xFFFFFFFE
type instance Encoded Fake = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
newtype Speed = Speed (Fixed E1)
deriving newtype (Ord, Eq, Num, Real, Fractional, Show)
instance Pretty Speed where
pretty (Speed n) = pretty (show n)
updateStats :: forall e m . (MonadIO m, Sessions e (Stats e) m)
=> Bool -> Int -> m (Stats e)
updateStats updTime blknum = do
de <- newStatsIO
stats <- fetch @e True de StatsKey id
t <- if updTime then do
liftIO $ getTime Monotonic
else
pure (view timeLast stats)
let blkNumNew = view blkNum stats + blknum
let blast = if updTime then
blkNumNew
else
view blkNumLast stats
let newStats = set blkNum blkNumNew
. set timeLast t
. set blkNumLast blast
$ stats
update @e de StatsKey (const newStats)
pure newStats
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
-- , e ~ Fake
, Serialise (Encoded e)
@ -198,6 +278,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
-- , EventEmitter e (BlockInfo e) m
, Sessions e (BlockInfo e) m
, Sessions e (BlockChunks e) m
, Sessions e (Stats e) m
, HasStorage m
, Num (Peer e)
, Pretty (Peer e)
@ -209,6 +290,7 @@ blockDownloadLoop cw = do
stor <- getStorage
stats0 <- newStatsIO
let blks = [ "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg"
]
@ -226,22 +308,40 @@ blockDownloadLoop cw = do
liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask h (Just (p,s)))
fix \next -> do
env <- ask
-- debug $ "WIP:" <+> pretty wip
job <- liftIO $ atomically $ Q.readTBQueue blq
void $ liftIO $ async $ forever $ withPeerM env $ do
wip <- liftIO $ blocksInProcess cw
stats <- fetch @e True stats0 StatsKey id
t2 <- liftIO $ getTime Monotonic
let tdiff = realToFrac (toNanoSecs t2 - toNanoSecs (view timeLast stats)) / 1e9
let blkdiff = realToFrac $ view blkNum stats - view blkNumLast stats
let speed = if tdiff > 0 then blkdiff / tdiff else 0 :: Speed
void $ updateStats @e True 0
debug $ "I'm alive!:" <+> pretty wip <+> pretty speed
pause ( 5 :: Timeout 'Seconds )
fix \next -> do
ejob <- liftIO $ race ( pause ( 5 :: Timeout 'Seconds) )
( atomically $ Q.readTBQueue blq )
let job = either (const Nothing) Just ejob
wip <- liftIO $ blocksInProcess cw
if wip > 200 then do
pause ( 1 :: Timeout 'Seconds )
else do
case job of
DownloadTask hx (Just (p,s)) -> do
Nothing -> pure ()
Just (DownloadTask hx (Just (p,s))) -> do
initDownload True blq p hx s
DownloadTask h Nothing -> do
Just (DownloadTask h Nothing) -> do
peers <- getPeerLocator @e >>= knownPeers @e
@ -315,7 +415,7 @@ blockDownloadLoop cw = do
liftIO $ addJob pip $ withPeerM env $ do
sto <- getStorage
liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h
-- liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h
bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h)
-- debug $ pretty (show bt)
@ -354,6 +454,8 @@ mkAdapter :: forall e m . ( m ~ PeerM e IO
, HasProtocol e (BlockChunks e)
, Hashable (SessionKey e (BlockChunks e))
, Sessions e (BlockChunks e) (ResponseM e m)
, Sessions e (Stats e) (ResponseM e m)
, Default (SessionData e (Stats e))
, EventEmitter e (BlockChunks e) m
, Pretty (Peer e)
, Block ByteString ~ ByteString
@ -430,6 +532,9 @@ mkAdapter cww = do
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
when ( h1 == h ) $ do
liftIO $ commitBlock cww cKey h
updateStats @e False 1
expire cKey
-- debug "hash matched!"
emit @e (BlockChunksEventKey h) (BlockReady h)