BasicBrains probe

This commit is contained in:
voidlizard 2024-11-01 09:49:52 +03:00
parent ebf21312db
commit 686f4167cb
2 changed files with 86 additions and 58 deletions

View File

@ -3,13 +3,13 @@
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# Language TypeOperators #-} {-# Language TypeOperators #-}
{-# Language RecordWildCards #-}
module Brains module Brains
( module Brains ( module Brains
, module HBS2.Peer.Brains , module HBS2.Peer.Brains
) where ) where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Peer.Proto.RefChan(ForRefChans) import HBS2.Peer.Proto.RefChan(ForRefChans)
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Net.Proto import HBS2.Net.Proto
@ -25,10 +25,9 @@ import PeerLogger
import PeerConfig import PeerConfig
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Applicative
import Control.Exception import Control.Exception
import Control.Monad
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Cont
import Crypto.Saltine.Core.Box qualified as Encrypt import Crypto.Saltine.Core.Box qualified as Encrypt
import Database.SQLite.Simple import Database.SQLite.Simple
import Database.SQLite.Simple.FromField import Database.SQLite.Simple.FromField
@ -50,7 +49,9 @@ import System.Directory
import System.FilePath import System.FilePath
import System.Random (randomRIO) import System.Random (randomRIO)
import Text.InterpolatedString.Perl6 (qc) import Text.InterpolatedString.Perl6 (qc)
import UnliftIO (MonadUnliftIO(..),async,race) import UnliftIO (MonadUnliftIO(..),race,withAsync)
import Streaming.Prelude qualified as S
data PeerBrainsDb data PeerBrainsDb
@ -75,7 +76,7 @@ newtype CommitCmd = CommitCmd { onCommited :: IO () }
data BasicBrains e = data BasicBrains e =
BasicBrains BasicBrains
{ _brainsPeers :: TVar [Peer e] { _brainsPeers :: TVar (HashSet (Peer e))
, _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int ) , _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int )
, _brainsExpire :: Cache (Hash HbSync) () , _brainsExpire :: Cache (Hash HbSync) ()
, _brainsRemoved :: Cache HashRef () , _brainsRemoved :: Cache HashRef ()
@ -85,6 +86,7 @@ data BasicBrains e =
, _brainsDelDownload :: TQueue (Hash HbSync) , _brainsDelDownload :: TQueue (Hash HbSync)
, _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer , _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer
, _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String)) , _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String))
, _brainsProbe :: TVar AnyProbe
} }
makeLenses 'BasicBrains makeLenses 'BasicBrains
@ -131,7 +133,7 @@ instance ( Hashable (Peer e)
onKnownPeers br ps = do onKnownPeers br ps = do
trace $ "BRAINS: onKnownPeers" <+> pretty ps trace $ "BRAINS: onKnownPeers" <+> pretty ps
let tv = view brainsPeers br let tv = view brainsPeers br
liftIO $ atomically $ writeTVar tv ps liftIO $ atomically $ writeTVar tv (HashSet.fromList ps)
updateOP br $ do updateOP br $ do
transactional br $ do transactional br $ do
deleteKnownPeers br deleteKnownPeers br
@ -177,7 +179,7 @@ instance ( Hashable (Peer e)
insertDownload b (HashRef <$> p) (HashRef h) insertDownload b (HashRef <$> p) (HashRef h)
shouldPostponeBlock b h = do shouldPostponeBlock b h = do
peers <- liftIO $ readTVarIO (view brainsPeers b) peers <- liftIO $ readTVarIO (view brainsPeers b) <&> HashSet.toList
downs <- liftIO $ readTVarIO (view brainsPostponeDown b) downs <- liftIO $ readTVarIO (view brainsPostponeDown b)
kicked <- liftIO $ Cache.lookup (view brainsRemoved b) (HashRef h) <&> isJust kicked <- liftIO $ Cache.lookup (view brainsRemoved b) (HashRef h) <&> isJust
@ -776,9 +778,17 @@ tableExists conn prefix' tableName = do
prefix = fromMaybe "main" prefix' prefix = fromMaybe "main" prefix'
type ForBasicBrains e = (Hashable (Peer e), Hashable (PubKey 'Sign (Encryption e)))
basicBrainsSetProbe :: forall e m . (MonadIO m, ForBasicBrains e)
=> BasicBrains e
-> AnyProbe
-> m ()
basicBrainsSetProbe BasicBrains{..} p = do
liftIO $ atomically $ writeTVar _brainsProbe p
-- FIXME: eventually-close-db -- FIXME: eventually-close-db
newBasicBrains :: forall e m . ( Hashable (Peer e) newBasicBrains :: forall e m . ( ForBasicBrains e
, Hashable (PubKey 'Sign (Encryption e))
, MonadIO m , MonadIO m
) )
=> PeerConfig => PeerConfig
@ -934,6 +944,7 @@ newBasicBrains cfg = liftIO do
<*> newTQueueIO <*> newTQueueIO
<*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds))) <*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds)))
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO (AnyProbe ())
data PeerDownloadsDelOnStart data PeerDownloadsDelOnStart
@ -953,82 +964,95 @@ runBasicBrains :: forall e m . ( e ~ L4Proto
-> BasicBrains e -> BasicBrains e
-> m () -> m ()
runBasicBrains cfg brains = do runBasicBrains cfg brains@BasicBrains{..} = do
let pip = view brainsPipeline brains let pip = view brainsPipeline brains
let expire = view brainsExpire brains let expire = view brainsExpire brains
let sizes = view brainsSizeCache brains let sizes = view brainsSizeCache brains
let commit = view brainsCommit brains let commit = view brainsCommit brains
-- FIXME: async-error-handling void $ flip runContT pure do
void $ liftIO $ async $ forever do
ewaiters <- race (pause @'Seconds 10) $ do -- FIXME: async-error-handling
atomically $ do void $ ContT $ withAsync $ forever $ liftIO do
c <- readTQueue commit
cs <- flushTQueue commit
pure (c:cs)
let waiters = fromRight mempty ewaiters & fmap onCommited ewaiters <- race (pause @'Seconds 10) $ do
atomically $ do
c <- readTQueue commit
cs <- flushTQueue commit
pure (c:cs)
w <- atomically $ readTQueue pip let waiters = fromRight mempty ewaiters & fmap onCommited
ws <- atomically $ flushTQueue pip
transactional brains (sequence_ (w:ws)) w <- atomically $ readTQueue pip
sequence_ waiters ws <- atomically $ flushTQueue pip
void $ liftIO $ async do transactional brains (sequence_ (w:ws))
del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains) sequence_ waiters
void $ ContT $ withAsync $ forever do
pause @'Seconds 10
p <- liftIO $ readTVarIO _brainsProbe
acceptReport p =<< S.toList_ do
S.yield =<< liftIO (readTVarIO _brainsPeers <&> ("brainsPeers",) . fromIntegral . length)
S.yield =<< liftIO (readTVarIO _brainsPostponeDown <&> ("brainsPostponeDown",) . fromIntegral . HashMap.size)
S.yield =<< liftIO (Cache.size _brainsExpire <&> ("brainsExpire",) . fromIntegral)
S.yield =<< liftIO (Cache.size _brainsRemoved <&> ("brainsRemoved",) . fromIntegral)
S.yield =<< liftIO (Cache.size _brainsSizeCache <&> ("brainsSizeCache",) . fromIntegral)
S.yield =<< liftIO (readTVarIO _brainsPolled <&> ("brainsPolled",) . fromIntegral . HashSet.size)
void $ ContT $ withAsync do
forever do forever do
pause @'Seconds 60 pause @'Seconds 60
del <- liftIO $ atomically $ flushTQueue _brainsDelDownload
updateOP brains (cleanupHashes brains) updateOP brains (cleanupHashes brains)
for_ del $ \d -> do for_ del $ \d -> do
delDownload @e brains (HashRef d) delDownload @e brains (HashRef d)
trace "runBasicBrains init" trace "runBasicBrains init"
let (PeerConfig syn) = cfg let (PeerConfig syn) = cfg
let delDowns = runReader (cfgValue @PeerDownloadsDelOnStart) cfg :: FeatureSwitch let delDowns = runReader (cfgValue @PeerDownloadsDelOnStart) cfg :: FeatureSwitch
when (delDowns == FeatureOn ) do when (delDowns == FeatureOn ) do
debug $ yellow "CLEAN ALL DOWNLOADS" debug $ yellow "CLEAN ALL DOWNLOADS"
updateOP brains (delAllDownloads brains) updateOP brains (delAllDownloads brains)
commitNow brains False commitNow brains False
let polls = catMaybes ( let polls = catMaybes (
[ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref) [ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref)
| ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn | ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn
] ) ] )
void $ async $ do void $ ContT $ withAsync $ do
pause @'Seconds 10 pause @'Seconds 10
forM_ polls $ \(t,mi,x) -> do forM_ polls $ \(t,mi,x) -> do
trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi
updateOP brains $ do updateOP brains $ do
let conn = view brainsDb brains let conn = view brainsDb brains
liftIO $ execute conn [qc| liftIO $ execute conn [qc|
insert into {poll_table} (ref,type,interval) insert into {poll_table} (ref,type,interval)
values (?,?,?) values (?,?,?)
on conflict do update set interval = excluded.interval on conflict do update set interval = excluded.interval
|] (show $ pretty (AsBase58 x), show $ pretty t, mi) |] (show $ pretty (AsBase58 x), show $ pretty t, mi)
-- commitNow brains True -- commitNow brains True
void $ forever do void $ forever do
pause @'Seconds 20 pause @'Seconds 20
ee <- liftIO $ Cache.toList expire ee <- liftIO $ Cache.toList expire
let eee = [ h | (h,_,Just{}) <- ee ] let eee = [ h | (h,_,Just{}) <- ee ]
forM_ eee $ \h -> do forM_ eee $ \h -> do
cleanupPostponed brains h cleanupPostponed brains h
liftIO $ Cache.purgeExpired expire liftIO $ Cache.purgeExpired expire
liftIO $ Cache.purgeExpired sizes liftIO $ Cache.purgeExpired sizes
del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains) del <- liftIO $ atomically $ flushTQueue _brainsDelDownload
for_ del $ \d -> do for_ del $ \d -> do
delDownload @e brains (HashRef d) delDownload @e brains (HashRef d)

View File

@ -807,6 +807,10 @@ runPeer opts = respawnOnError opts $ runResourceT do
brains <- newBasicBrains @e conf brains <- newBasicBrains @e conf
bProbe <- newSimpleProbe "Brains"
addProbe bProbe
basicBrainsSetProbe brains bProbe
brainsThread <- async $ runBasicBrains conf brains brainsThread <- async $ runBasicBrains conf brains
denv <- newDownloadEnv brains denv <- newDownloadEnv brains