BasicBrains probe

This commit is contained in:
voidlizard 2024-11-01 09:49:52 +03:00
parent 1b9a1f30df
commit 4f1f1a0e7e
2 changed files with 86 additions and 58 deletions

View File

@ -3,13 +3,13 @@
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# Language TypeOperators #-} {-# Language TypeOperators #-}
{-# Language RecordWildCards #-}
module Brains module Brains
( module Brains ( module Brains
, module HBS2.Peer.Brains , module HBS2.Peer.Brains
) where ) where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Peer.Proto.RefChan(ForRefChans) import HBS2.Peer.Proto.RefChan(ForRefChans)
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Net.Proto import HBS2.Net.Proto
@ -25,10 +25,9 @@ import PeerLogger
import PeerConfig import PeerConfig
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Applicative
import Control.Exception import Control.Exception
import Control.Monad
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Cont
import Crypto.Saltine.Core.Box qualified as Encrypt import Crypto.Saltine.Core.Box qualified as Encrypt
import Database.SQLite.Simple import Database.SQLite.Simple
import Database.SQLite.Simple.FromField import Database.SQLite.Simple.FromField
@ -50,7 +49,9 @@ import System.Directory
import System.FilePath import System.FilePath
import System.Random (randomRIO) import System.Random (randomRIO)
import Text.InterpolatedString.Perl6 (qc) import Text.InterpolatedString.Perl6 (qc)
import UnliftIO (MonadUnliftIO(..),async,race) import UnliftIO (MonadUnliftIO(..),race,withAsync)
import Streaming.Prelude qualified as S
data PeerBrainsDb data PeerBrainsDb
@ -75,7 +76,7 @@ newtype CommitCmd = CommitCmd { onCommited :: IO () }
data BasicBrains e = data BasicBrains e =
BasicBrains BasicBrains
{ _brainsPeers :: TVar [Peer e] { _brainsPeers :: TVar (HashSet (Peer e))
, _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int ) , _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int )
, _brainsExpire :: Cache (Hash HbSync) () , _brainsExpire :: Cache (Hash HbSync) ()
, _brainsRemoved :: Cache HashRef () , _brainsRemoved :: Cache HashRef ()
@ -85,6 +86,7 @@ data BasicBrains e =
, _brainsDelDownload :: TQueue (Hash HbSync) , _brainsDelDownload :: TQueue (Hash HbSync)
, _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer , _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer
, _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String)) , _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String))
, _brainsProbe :: TVar AnyProbe
} }
makeLenses 'BasicBrains makeLenses 'BasicBrains
@ -131,7 +133,7 @@ instance ( Hashable (Peer e)
onKnownPeers br ps = do onKnownPeers br ps = do
trace $ "BRAINS: onKnownPeers" <+> pretty ps trace $ "BRAINS: onKnownPeers" <+> pretty ps
let tv = view brainsPeers br let tv = view brainsPeers br
liftIO $ atomically $ writeTVar tv ps liftIO $ atomically $ writeTVar tv (HashSet.fromList ps)
updateOP br $ do updateOP br $ do
transactional br $ do transactional br $ do
deleteKnownPeers br deleteKnownPeers br
@ -177,7 +179,7 @@ instance ( Hashable (Peer e)
insertDownload b (HashRef <$> p) (HashRef h) insertDownload b (HashRef <$> p) (HashRef h)
shouldPostponeBlock b h = do shouldPostponeBlock b h = do
peers <- liftIO $ readTVarIO (view brainsPeers b) peers <- liftIO $ readTVarIO (view brainsPeers b) <&> HashSet.toList
downs <- liftIO $ readTVarIO (view brainsPostponeDown b) downs <- liftIO $ readTVarIO (view brainsPostponeDown b)
kicked <- liftIO $ Cache.lookup (view brainsRemoved b) (HashRef h) <&> isJust kicked <- liftIO $ Cache.lookup (view brainsRemoved b) (HashRef h) <&> isJust
@ -776,9 +778,17 @@ tableExists conn prefix' tableName = do
prefix = fromMaybe "main" prefix' prefix = fromMaybe "main" prefix'
type ForBasicBrains e = (Hashable (Peer e), Hashable (PubKey 'Sign (Encryption e)))
basicBrainsSetProbe :: forall e m . (MonadIO m, ForBasicBrains e)
=> BasicBrains e
-> AnyProbe
-> m ()
basicBrainsSetProbe BasicBrains{..} p = do
liftIO $ atomically $ writeTVar _brainsProbe p
-- FIXME: eventually-close-db -- FIXME: eventually-close-db
newBasicBrains :: forall e m . ( Hashable (Peer e) newBasicBrains :: forall e m . ( ForBasicBrains e
, Hashable (PubKey 'Sign (Encryption e))
, MonadIO m , MonadIO m
) )
=> PeerConfig => PeerConfig
@ -934,6 +944,7 @@ newBasicBrains cfg = liftIO do
<*> newTQueueIO <*> newTQueueIO
<*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds))) <*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds)))
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO (AnyProbe ())
data PeerDownloadsDelOnStart data PeerDownloadsDelOnStart
@ -953,15 +964,17 @@ runBasicBrains :: forall e m . ( e ~ L4Proto
-> BasicBrains e -> BasicBrains e
-> m () -> m ()
runBasicBrains cfg brains = do runBasicBrains cfg brains@BasicBrains{..} = do
let pip = view brainsPipeline brains let pip = view brainsPipeline brains
let expire = view brainsExpire brains let expire = view brainsExpire brains
let sizes = view brainsSizeCache brains let sizes = view brainsSizeCache brains
let commit = view brainsCommit brains let commit = view brainsCommit brains
void $ flip runContT pure do
-- FIXME: async-error-handling -- FIXME: async-error-handling
void $ liftIO $ async $ forever do void $ ContT $ withAsync $ forever $ liftIO do
ewaiters <- race (pause @'Seconds 10) $ do ewaiters <- race (pause @'Seconds 10) $ do
atomically $ do atomically $ do
@ -977,10 +990,21 @@ runBasicBrains cfg brains = do
transactional brains (sequence_ (w:ws)) transactional brains (sequence_ (w:ws))
sequence_ waiters sequence_ waiters
void $ liftIO $ async do void $ ContT $ withAsync $ forever do
del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains) pause @'Seconds 10
p <- liftIO $ readTVarIO _brainsProbe
acceptReport p =<< S.toList_ do
S.yield =<< liftIO (readTVarIO _brainsPeers <&> ("brainsPeers",) . fromIntegral . length)
S.yield =<< liftIO (readTVarIO _brainsPostponeDown <&> ("brainsPostponeDown",) . fromIntegral . HashMap.size)
S.yield =<< liftIO (Cache.size _brainsExpire <&> ("brainsExpire",) . fromIntegral)
S.yield =<< liftIO (Cache.size _brainsRemoved <&> ("brainsRemoved",) . fromIntegral)
S.yield =<< liftIO (Cache.size _brainsSizeCache <&> ("brainsSizeCache",) . fromIntegral)
S.yield =<< liftIO (readTVarIO _brainsPolled <&> ("brainsPolled",) . fromIntegral . HashSet.size)
void $ ContT $ withAsync do
forever do forever do
pause @'Seconds 60 pause @'Seconds 60
del <- liftIO $ atomically $ flushTQueue _brainsDelDownload
updateOP brains (cleanupHashes brains) updateOP brains (cleanupHashes brains)
@ -1003,7 +1027,7 @@ runBasicBrains cfg brains = do
| ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn | ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn
] ) ] )
void $ async $ do void $ ContT $ withAsync $ do
pause @'Seconds 10 pause @'Seconds 10
forM_ polls $ \(t,mi,x) -> do forM_ polls $ \(t,mi,x) -> do
trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi
@ -1026,7 +1050,7 @@ runBasicBrains cfg brains = do
liftIO $ Cache.purgeExpired expire liftIO $ Cache.purgeExpired expire
liftIO $ Cache.purgeExpired sizes liftIO $ Cache.purgeExpired sizes
del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains) del <- liftIO $ atomically $ flushTQueue _brainsDelDownload
for_ del $ \d -> do for_ del $ \d -> do
delDownload @e brains (HashRef d) delDownload @e brains (HashRef d)

View File

@ -807,6 +807,10 @@ runPeer opts = respawnOnError opts $ runResourceT do
brains <- newBasicBrains @e conf brains <- newBasicBrains @e conf
bProbe <- newSimpleProbe "Brains"
addProbe bProbe
basicBrainsSetProbe brains bProbe
brainsThread <- async $ runBasicBrains conf brains brainsThread <- async $ runBasicBrains conf brains
denv <- newDownloadEnv brains denv <- newDownloadEnv brains