mirror of https://github.com/voidlizard/hbs2
wip, some probes
This commit is contained in:
parent
08816dfc46
commit
49b16606aa
|
@ -592,10 +592,12 @@ data PSt =
|
||||||
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
)
|
)
|
||||||
=> SomeBrains e
|
=> AnyProbe
|
||||||
|
-> SomeBrains e
|
||||||
-> PeerEnv e
|
-> PeerEnv e
|
||||||
-> m ()
|
-> m ()
|
||||||
downloadDispatcher brains env = flip runContT pure do
|
downloadDispatcher probe brains env = flip runContT pure do
|
||||||
|
|
||||||
|
|
||||||
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) )
|
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) )
|
||||||
|
|
||||||
|
@ -632,6 +634,13 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
dupes <- newTVarIO ( mempty :: HashMap HashRef Int )
|
dupes <- newTVarIO ( mempty :: HashMap HashRef Int )
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever $ pause @'Seconds 10 >> do
|
||||||
|
acceptReport probe =<< S.toList_ do
|
||||||
|
wip <- readTVarIO wip <&> HM.size
|
||||||
|
pn <- readTVarIO pts <&> HM.size
|
||||||
|
S.yield ( "wip", fromIntegral wip )
|
||||||
|
S.yield ( "peerThreads", fromIntegral pn )
|
||||||
|
|
||||||
ContT $ withAsync do
|
ContT $ withAsync do
|
||||||
polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do
|
polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do
|
||||||
atomically $ modifyTVar dupes (HM.delete h)
|
atomically $ modifyTVar dupes (HM.delete h)
|
||||||
|
@ -678,6 +687,11 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
|
manageThreads :: (HashRef -> STM ())
|
||||||
|
-> TVar (HashMap HashRef DCB)
|
||||||
|
-> TVar (HashMap (Peer e) (Async (), PeerNonce))
|
||||||
|
-> m ()
|
||||||
|
|
||||||
manageThreads onBlock wip pts = do
|
manageThreads onBlock wip pts = do
|
||||||
_pnum <- newTVarIO 1
|
_pnum <- newTVarIO 1
|
||||||
|
|
||||||
|
@ -686,6 +700,10 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
debug "MANAGE THREADS"
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
|
acceptReport probe =<< S.toList_ do
|
||||||
|
n <- readTVarIO _psem <&> HM.size
|
||||||
|
S.yield ( "psem", fromIntegral n )
|
||||||
|
|
||||||
peers <- HM.fromList <$> do
|
peers <- HM.fromList <$> do
|
||||||
pips <- getKnownPeers @e <&> HS.fromList
|
pips <- getKnownPeers @e <&> HS.fromList
|
||||||
S.toList_ $ for_ pips $ \p -> do
|
S.toList_ $ for_ pips $ \p -> do
|
||||||
|
|
|
@ -1199,8 +1199,11 @@ runPeer opts = respawnOnError opts $ do
|
||||||
|
|
||||||
peerThread "pexLoop" (pexLoop @e brains tcp)
|
peerThread "pexLoop" (pexLoop @e brains tcp)
|
||||||
|
|
||||||
|
downloadProbe <- newSimpleProbe "Download"
|
||||||
|
addProbe downloadProbe
|
||||||
|
|
||||||
-- FIXME: new-download-loop
|
-- FIXME: new-download-loop
|
||||||
peerThread "downloadDispatcher" (downloadDispatcher (SomeBrains brains) env)
|
peerThread "downloadDispatcher" (downloadDispatcher downloadProbe (SomeBrains brains) env)
|
||||||
|
|
||||||
peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)
|
peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue