BasicBrains probe

This commit is contained in:
voidlizard 2024-11-01 09:49:52 +03:00
parent 1ec2ab21c9
commit 7b38ed9581
2 changed files with 86 additions and 58 deletions

View File

@ -3,13 +3,13 @@
{-# Language UndecidableInstances #-}
{-# Language TemplateHaskell #-}
{-# Language TypeOperators #-}
{-# Language RecordWildCards #-}
module Brains
( module Brains
, module HBS2.Peer.Brains
) where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Peer.Proto.RefChan(ForRefChans)
import HBS2.Data.Types.Refs
import HBS2.Net.Proto
@ -25,10 +25,9 @@ import PeerLogger
import PeerConfig
import Control.Concurrent.STM
import Control.Applicative
import Control.Exception
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Trans.Cont
import Crypto.Saltine.Core.Box qualified as Encrypt
import Database.SQLite.Simple
import Database.SQLite.Simple.FromField
@ -50,7 +49,9 @@ import System.Directory
import System.FilePath
import System.Random (randomRIO)
import Text.InterpolatedString.Perl6 (qc)
import UnliftIO (MonadUnliftIO(..),async,race)
import UnliftIO (MonadUnliftIO(..),race,withAsync)
import Streaming.Prelude qualified as S
data PeerBrainsDb
@ -75,7 +76,7 @@ newtype CommitCmd = CommitCmd { onCommited :: IO () }
data BasicBrains e =
BasicBrains
{ _brainsPeers :: TVar [Peer e]
{ _brainsPeers :: TVar (HashSet (Peer e))
, _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int )
, _brainsExpire :: Cache (Hash HbSync) ()
, _brainsRemoved :: Cache HashRef ()
@ -85,6 +86,7 @@ data BasicBrains e =
, _brainsDelDownload :: TQueue (Hash HbSync)
, _brainsSizeCache :: Cache (Peer e, Hash HbSync) Integer
, _brainsPolled :: TVar (HashSet (PubKey 'Sign (Encryption e), String))
, _brainsProbe :: TVar AnyProbe
}
makeLenses 'BasicBrains
@ -131,7 +133,7 @@ instance ( Hashable (Peer e)
onKnownPeers br ps = do
trace $ "BRAINS: onKnownPeers" <+> pretty ps
let tv = view brainsPeers br
liftIO $ atomically $ writeTVar tv ps
liftIO $ atomically $ writeTVar tv (HashSet.fromList ps)
updateOP br $ do
transactional br $ do
deleteKnownPeers br
@ -177,7 +179,7 @@ instance ( Hashable (Peer e)
insertDownload b (HashRef <$> p) (HashRef h)
shouldPostponeBlock b h = do
peers <- liftIO $ readTVarIO (view brainsPeers b)
peers <- liftIO $ readTVarIO (view brainsPeers b) <&> HashSet.toList
downs <- liftIO $ readTVarIO (view brainsPostponeDown b)
kicked <- liftIO $ Cache.lookup (view brainsRemoved b) (HashRef h) <&> isJust
@ -776,9 +778,17 @@ tableExists conn prefix' tableName = do
prefix = fromMaybe "main" prefix'
type ForBasicBrains e = (Hashable (Peer e), Hashable (PubKey 'Sign (Encryption e)))
basicBrainsSetProbe :: forall e m . (MonadIO m, ForBasicBrains e)
=> BasicBrains e
-> AnyProbe
-> m ()
basicBrainsSetProbe BasicBrains{..} p = do
liftIO $ atomically $ writeTVar _brainsProbe p
-- FIXME: eventually-close-db
newBasicBrains :: forall e m . ( Hashable (Peer e)
, Hashable (PubKey 'Sign (Encryption e))
newBasicBrains :: forall e m . ( ForBasicBrains e
, MonadIO m
)
=> PeerConfig
@ -934,6 +944,7 @@ newBasicBrains cfg = liftIO do
<*> newTQueueIO
<*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds)))
<*> newTVarIO mempty
<*> newTVarIO (AnyProbe ())
data PeerDownloadsDelOnStart
@ -953,82 +964,95 @@ runBasicBrains :: forall e m . ( e ~ L4Proto
-> BasicBrains e
-> m ()
runBasicBrains cfg brains = do
runBasicBrains cfg brains@BasicBrains{..} = do
let pip = view brainsPipeline brains
let expire = view brainsExpire brains
let sizes = view brainsSizeCache brains
let commit = view brainsCommit brains
-- FIXME: async-error-handling
void $ liftIO $ async $ forever do
void $ flip runContT pure do
ewaiters <- race (pause @'Seconds 10) $ do
atomically $ do
c <- readTQueue commit
cs <- flushTQueue commit
pure (c:cs)
-- FIXME: async-error-handling
void $ ContT $ withAsync $ forever $ liftIO do
let waiters = fromRight mempty ewaiters & fmap onCommited
ewaiters <- race (pause @'Seconds 10) $ do
atomically $ do
c <- readTQueue commit
cs <- flushTQueue commit
pure (c:cs)
w <- atomically $ readTQueue pip
ws <- atomically $ flushTQueue pip
let waiters = fromRight mempty ewaiters & fmap onCommited
transactional brains (sequence_ (w:ws))
sequence_ waiters
w <- atomically $ readTQueue pip
ws <- atomically $ flushTQueue pip
void $ liftIO $ async do
del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains)
transactional brains (sequence_ (w:ws))
sequence_ waiters
void $ ContT $ withAsync $ forever do
pause @'Seconds 10
p <- liftIO $ readTVarIO _brainsProbe
acceptReport p =<< S.toList_ do
S.yield =<< liftIO (readTVarIO _brainsPeers <&> ("brainsPeers",) . fromIntegral . length)
S.yield =<< liftIO (readTVarIO _brainsPostponeDown <&> ("brainsPostponeDown",) . fromIntegral . HashMap.size)
S.yield =<< liftIO (Cache.size _brainsExpire <&> ("brainsExpire",) . fromIntegral)
S.yield =<< liftIO (Cache.size _brainsRemoved <&> ("brainsRemoved",) . fromIntegral)
S.yield =<< liftIO (Cache.size _brainsSizeCache <&> ("brainsSizeCache",) . fromIntegral)
S.yield =<< liftIO (readTVarIO _brainsPolled <&> ("brainsPolled",) . fromIntegral . HashSet.size)
void $ ContT $ withAsync do
forever do
pause @'Seconds 60
del <- liftIO $ atomically $ flushTQueue _brainsDelDownload
updateOP brains (cleanupHashes brains)
for_ del $ \d -> do
delDownload @e brains (HashRef d)
trace "runBasicBrains init"
trace "runBasicBrains init"
let (PeerConfig syn) = cfg
let (PeerConfig syn) = cfg
let delDowns = runReader (cfgValue @PeerDownloadsDelOnStart) cfg :: FeatureSwitch
let delDowns = runReader (cfgValue @PeerDownloadsDelOnStart) cfg :: FeatureSwitch
when (delDowns == FeatureOn ) do
debug $ yellow "CLEAN ALL DOWNLOADS"
updateOP brains (delAllDownloads brains)
commitNow brains False
when (delDowns == FeatureOn ) do
debug $ yellow "CLEAN ALL DOWNLOADS"
updateOP brains (delAllDownloads brains)
commitNow brains False
let polls = catMaybes (
[ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref)
| ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn
] )
let polls = catMaybes (
[ (tp,n,) <$> fromStringMay @(PubKey 'Sign (Encryption e)) (Text.unpack ref)
| ListVal (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn
] )
void $ async $ do
pause @'Seconds 10
forM_ polls $ \(t,mi,x) -> do
trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi
updateOP brains $ do
let conn = view brainsDb brains
liftIO $ execute conn [qc|
insert into {poll_table} (ref,type,interval)
values (?,?,?)
on conflict do update set interval = excluded.interval
|] (show $ pretty (AsBase58 x), show $ pretty t, mi)
-- commitNow brains True
void $ ContT $ withAsync $ do
pause @'Seconds 10
forM_ polls $ \(t,mi,x) -> do
trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi
updateOP brains $ do
let conn = view brainsDb brains
liftIO $ execute conn [qc|
insert into {poll_table} (ref,type,interval)
values (?,?,?)
on conflict do update set interval = excluded.interval
|] (show $ pretty (AsBase58 x), show $ pretty t, mi)
-- commitNow brains True
void $ forever do
pause @'Seconds 20
ee <- liftIO $ Cache.toList expire
let eee = [ h | (h,_,Just{}) <- ee ]
forM_ eee $ \h -> do
cleanupPostponed brains h
void $ forever do
pause @'Seconds 20
ee <- liftIO $ Cache.toList expire
let eee = [ h | (h,_,Just{}) <- ee ]
forM_ eee $ \h -> do
cleanupPostponed brains h
liftIO $ Cache.purgeExpired expire
liftIO $ Cache.purgeExpired sizes
liftIO $ Cache.purgeExpired expire
liftIO $ Cache.purgeExpired sizes
del <- liftIO $ atomically $ flushTQueue (_brainsDelDownload brains)
for_ del $ \d -> do
delDownload @e brains (HashRef d)
del <- liftIO $ atomically $ flushTQueue _brainsDelDownload
for_ del $ \d -> do
delDownload @e brains (HashRef d)

View File

@ -807,6 +807,10 @@ runPeer opts = respawnOnError opts $ runResourceT do
brains <- newBasicBrains @e conf
bProbe <- newSimpleProbe "Brains"
addProbe bProbe
basicBrainsSetProbe brains bProbe
brainsThread <- async $ runBasicBrains conf brains
denv <- newDownloadEnv brains