From 7b38ed958175da1fc2eefc58ef9ae2d934701b50 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Fri, 1 Nov 2024 09:49:52 +0300 Subject: [PATCH] BasicBrains probe --- hbs2-peer/app/Brains.hs | 140 ++++++++++++++++++++++---------------- hbs2-peer/app/PeerMain.hs | 4 ++ 2 files changed, 86 insertions(+), 58 deletions(-) diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index abb40bbd..99e83708 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -3,13 +3,13 @@ {-# Language UndecidableInstances #-} {-# Language TemplateHaskell #-} {-# Language TypeOperators #-} +{-# Language RecordWildCards #-} module Brains ( module Brains , module HBS2.Peer.Brains ) where import HBS2.Prelude.Plated -import HBS2.Clock import HBS2.Peer.Proto.RefChan(ForRefChans) import HBS2.Data.Types.Refs import HBS2.Net.Proto @@ -25,10 +25,9 @@ import PeerLogger import PeerConfig import Control.Concurrent.STM -import Control.Applicative import Control.Exception -import Control.Monad import Control.Monad.Reader +import Control.Monad.Trans.Cont import Crypto.Saltine.Core.Box qualified as Encrypt import Database.SQLite.Simple import Database.SQLite.Simple.FromField @@ -50,7 +49,9 @@ import System.Directory import System.FilePath import System.Random (randomRIO) import Text.InterpolatedString.Perl6 (qc) -import UnliftIO (MonadUnliftIO(..),async,race) +import UnliftIO (MonadUnliftIO(..),race,withAsync) + +import Streaming.Prelude qualified as S data PeerBrainsDb @@ -75,7 +76,7 @@ newtype CommitCmd = CommitCmd { onCommited :: IO () } data BasicBrains e = BasicBrains - { _brainsPeers :: TVar [Peer e] + { _brainsPeers :: TVar (HashSet (Peer e)) , _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int ) , _brainsExpire :: Cache (Hash HbSync) () , _brainsRemoved :: Cache HashRef () @@ -85,6 +86,7 @@ data BasicBrains e = , _brainsDelDownload :: TQueue (Hash HbSync) , _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer , _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String)) + , _brainsProbe :: TVar AnyProbe } makeLenses 'BasicBrains @@ -131,7 +133,7 @@ instance ( Hashable (Peer e) onKnownPeers br ps = do trace $ "BRAINS: onKnownPeers" <+> pretty ps let tv = view brainsPeers br - liftIO $ atomically $ writeTVar tv ps + liftIO $ atomically $ writeTVar tv (HashSet.fromList ps) updateOP br $ do transactional br $ do deleteKnownPeers br @@ -177,7 +179,7 @@ instance ( Hashable (Peer e) insertDownload b (HashRef <$> p) (HashRef h) shouldPostponeBlock b h = do - peers <- liftIO $ readTVarIO (view brainsPeers b) + peers <- liftIO $ readTVarIO (view brainsPeers b) <&> HashSet.toList downs <- liftIO $ readTVarIO (view brainsPostponeDown b) kicked <- liftIO $ Cache.lookup (view brainsRemoved b) (HashRef h) <&> isJust @@ -776,9 +778,17 @@ tableExists conn prefix' tableName = do prefix = fromMaybe "main" prefix' +type ForBasicBrains e = (Hashable (Peer e), Hashable (PubKey 'Sign (Encryption e))) + +basicBrainsSetProbe :: forall e m . (MonadIO m, ForBasicBrains e) + => BasicBrains e + -> AnyProbe + -> m () +basicBrainsSetProbe BasicBrains{..} p = do + liftIO $ atomically $ writeTVar _brainsProbe p + -- FIXME: eventually-close-db -newBasicBrains :: forall e m . ( Hashable (Peer e) - , Hashable (PubKey 'Sign (Encryption e)) +newBasicBrains :: forall e m . ( ForBasicBrains e , MonadIO m ) => PeerConfig @@ -934,6 +944,7 @@ newBasicBrains cfg = liftIO do <*> newTQueueIO <*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds))) <*> newTVarIO mempty + <*> newTVarIO (AnyProbe ()) data PeerDownloadsDelOnStart @@ -953,82 +964,95 @@ runBasicBrains :: forall e m . ( e ~ L4Proto -> BasicBrains e -> m () -runBasicBrains cfg brains = do +runBasicBrains cfg brains@BasicBrains{..} = do let pip = view brainsPipeline brains let expire = view brainsExpire brains let sizes = view brainsSizeCache brains let commit = view brainsCommit brains - -- FIXME: async-error-handling - void $ liftIO $ async $ forever do + void $ flip runContT pure do - ewaiters <- race (pause @'Seconds 10) $ do - atomically $ do - c <- readTQueue commit - cs <- flushTQueue commit - pure (c:cs) + -- FIXME: async-error-handling + void $ ContT $ withAsync $ forever $ liftIO do - let waiters = fromRight mempty ewaiters & fmap onCommited + ewaiters <- race (pause @'Seconds 10) $ do + atomically $ do + c <- readTQueue commit + cs <- flushTQueue commit + pure (c:cs) - w <- atomically $ readTQueue pip - ws <- atomically $ flushTQueue pip + let waiters = fromRight mempty ewaiters & fmap onCommited - transactional brains (sequence_ (w:ws)) - sequence_ waiters + w <- atomically $ readTQueue pip + ws <- atomically $ flushTQueue pip - void $ liftIO $ async do - del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains) + transactional brains (sequence_ (w:ws)) + sequence_ waiters + + void $ ContT $ withAsync $ forever do + pause @'Seconds 10 + p <- liftIO $ readTVarIO _brainsProbe + acceptReport p =<< S.toList_ do + S.yield =<< liftIO (readTVarIO _brainsPeers <&> ("brainsPeers",) . fromIntegral . length) + S.yield =<< liftIO (readTVarIO _brainsPostponeDown <&> ("brainsPostponeDown",) . fromIntegral . HashMap.size) + S.yield =<< liftIO (Cache.size _brainsExpire <&> ("brainsExpire",) . fromIntegral) + S.yield =<< liftIO (Cache.size _brainsRemoved <&> ("brainsRemoved",) . fromIntegral) + S.yield =<< liftIO (Cache.size _brainsSizeCache <&> ("brainsSizeCache",) . fromIntegral) + S.yield =<< liftIO (readTVarIO _brainsPolled <&> ("brainsPolled",) . fromIntegral . HashSet.size) + + void $ ContT $ withAsync do forever do pause @'Seconds 60 + del <- liftIO $ atomically $ flushTQueue _brainsDelDownload updateOP brains (cleanupHashes brains) for_ del $ \d -> do delDownload @e brains (HashRef d) - trace "runBasicBrains init" + trace "runBasicBrains init" - let (PeerConfig syn) = cfg + let (PeerConfig syn) = cfg - let delDowns = runReader (cfgValue @PeerDownloadsDelOnStart) cfg :: FeatureSwitch + let delDowns = runReader (cfgValue @PeerDownloadsDelOnStart) cfg :: FeatureSwitch - when (delDowns == FeatureOn ) do - debug $ yellow "CLEAN ALL DOWNLOADS" - updateOP brains (delAllDownloads brains) - commitNow brains False + when (delDowns == FeatureOn ) do + debug $ yellow "CLEAN ALL DOWNLOADS" + updateOP brains (delAllDownloads brains) + commitNow brains False - let polls = catMaybes ( - [ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref) - | ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn - ] ) + let polls = catMaybes ( + [ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref) + | ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn + ] ) - void $ async $ do - pause @'Seconds 10 - forM_ polls $ \(t,mi,x) -> do - trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi - updateOP brains $ do - let conn = view brainsDb brains - liftIO $ execute conn [qc| - insert into {poll_table} (ref,type,interval) - values (?,?,?) - on conflict do update set interval = excluded.interval - |] (show $ pretty (AsBase58 x), show $ pretty t, mi) - -- commitNow brains True + void $ ContT $ withAsync $ do + pause @'Seconds 10 + forM_ polls $ \(t,mi,x) -> do + trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi + updateOP brains $ do + let conn = view brainsDb brains + liftIO $ execute conn [qc| + insert into {poll_table} (ref,type,interval) + values (?,?,?) + on conflict do update set interval = excluded.interval + |] (show $ pretty (AsBase58 x), show $ pretty t, mi) + -- commitNow brains True - void $ forever do - pause @'Seconds 20 - ee <- liftIO $ Cache.toList expire - let eee = [ h | (h,_,Just{}) <- ee ] - forM_ eee $ \h -> do - cleanupPostponed brains h + void $ forever do + pause @'Seconds 20 + ee <- liftIO $ Cache.toList expire + let eee = [ h | (h,_,Just{}) <- ee ] + forM_ eee $ \h -> do + cleanupPostponed brains h - liftIO $ Cache.purgeExpired expire - liftIO $ Cache.purgeExpired sizes + liftIO $ Cache.purgeExpired expire + liftIO $ Cache.purgeExpired sizes - del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains) - for_ del $ \d -> do - delDownload @e brains (HashRef d) + del <- liftIO $ atomically $ flushTQueue _brainsDelDownload + for_ del $ \d -> do + delDownload @e brains (HashRef d) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 5e7839ed..df6a6a1f 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -807,6 +807,10 @@ runPeer opts = respawnOnError opts $ runResourceT do brains <- newBasicBrains @e conf + bProbe <- newSimpleProbe "Brains" + addProbe bProbe + basicBrainsSetProbe brains bProbe + brainsThread <- async $ runBasicBrains conf brains denv <- newDownloadEnv brains