dynamic polling management

This commit is contained in:
Dmitry Zuikov 2023-10-13 10:06:17 +03:00
parent a6e8ee4fc9
commit 64400a425e
13 changed files with 361 additions and 208 deletions

View File

@ -79,6 +79,7 @@ common shared-properties
, mtl , mtl
, prettyprinter , prettyprinter
, prettyprinter-ansi-terminal , prettyprinter-ansi-terminal
, random
, resourcet , resourcet
, safe , safe
, saltine , saltine

View File

@ -1,19 +1,19 @@
module HBS2Git.Evolve (evolve) where module HBS2Git.Evolve (evolve,makePolled) where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import HBS2.OrDie import HBS2.Net.Proto.Service
import HBS2.Peer.RPC.API.Peer
import HBS2Git.Types import HBS2Git.Types
import HBS2.Git.Types
import HBS2Git.Config import HBS2Git.Config
import HBS2Git.PrettyStuff import HBS2Git.PrettyStuff
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.Functor
import Data.List qualified as List import Data.List qualified as List
import Prettyprinter.Render.Terminal
import System.Directory import System.Directory
import System.Random
import System.FilePath import System.FilePath
import UnliftIO import UnliftIO
@ -37,6 +37,12 @@ evolve = void $ runMaybeT do
generateCookie generateCookie
makePolled :: (MonadIO m, HasRPC m) => RepoRef -> m ()
makePolled ref = do
rpc <- getRPC <&> rpcPeer
n <- liftIO $ randomRIO (4,7)
void $ callService @RpcPollAdd rpc (fromRefLogKey ref, "reflog", n)
generateCookie :: MonadIO m => m () generateCookie :: MonadIO m => m ()
generateCookie = void $ runMaybeT do generateCookie = void $ runMaybeT do
file <- cookieFile file <- cookieFile

View File

@ -19,6 +19,7 @@ import HBS2Git.GitRepoLog
import HBS2Git.App import HBS2Git.App
import HBS2Git.Config import HBS2Git.Config
import HBS2Git.State import HBS2Git.State
import HBS2Git.Evolve
import HBS2Git.KeysMetaData import HBS2Git.KeysMetaData
import HBS2.Git.Local.CLI import HBS2.Git.Local.CLI
@ -121,6 +122,7 @@ importRefLogNew :: ( MonadIO m
, MonadCatch m , MonadCatch m
, MonadMask m , MonadMask m
, HasStorage m , HasStorage m
, HasRPC m
, HasEncryptionKeys m , HasEncryptionKeys m
, HasImportOpts opts , HasImportOpts opts
) )
@ -136,6 +138,8 @@ importRefLogNew opts ref = runResourceT do
temp <- liftIO getCanonicalTemporaryDirectory temp <- liftIO getCanonicalTemporaryDirectory
(_,dir) <- allocate (createTempDirectory temp myTempDir) removeDirectoryRecursive (_,dir) <- allocate (createTempDirectory temp myTempDir) removeDirectoryRecursive
lift $ makePolled ref
db <- makeDbPath ref >>= dbEnv db <- makeDbPath ref >>= dbEnv
do do

View File

@ -1,24 +1,25 @@
{-# Language AllowAmbiguousTypes #-} {-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
module Brains where module Brains
( module Brains
, module HBS2.Peer.Brains
) where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Clock import HBS2.Clock
import HBS2.Data.Types.Refs
import HBS2.Net.Proto.RefChan(ForRefChans) import HBS2.Net.Proto.RefChan(ForRefChans)
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Hash import HBS2.Hash
import HBS2.Base58 import HBS2.Base58
import HBS2.Net.IP.Addr import HBS2.Net.IP.Addr
import HBS2.Net.Auth.Credentials
import HBS2.Peer.Brains
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import PeerConfig import PeerConfig
import Crypto.Saltine.Core.Box qualified as Encrypt import Crypto.Saltine.Core.Box qualified as Encrypt
import Data.Maybe
import Control.Monad import Control.Monad
import Control.Exception import Control.Exception
import Control.Concurrent.STM import Control.Concurrent.STM
@ -45,140 +46,6 @@ data PeerBrainsDb
instance HasCfgKey PeerBrainsDb (Maybe String) where instance HasCfgKey PeerBrainsDb (Maybe String) where
key = "brains" key = "brains"
class HasBrains e a where
listPolledRefs :: MonadIO m => a -> String -> m [(PubKey 'Sign (Encryption e), Int)]
listPolledRefs _ _ = pure mempty
isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool
isPolledRef _ _ = pure False
onClientTCPConnected :: MonadIO m => a -> PeerAddr e -> Word64 -> m ()
onClientTCPConnected _ _ = const none
getClientTCP :: MonadIO m => a -> m [(PeerAddr e,Word64)]
getClientTCP = const $ pure mempty
setActiveTCPSessions :: MonadIO m => a -> [(PeerAddr e, Word64)] -> m ()
setActiveTCPSessions _ _ = none
listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e]
listTCPPexCandidates _ = pure mempty
onKnownPeers :: MonadIO m => a -> [Peer e] -> m ()
onKnownPeers _ _ = none
onBlockSize :: ( MonadIO m
, IsPeerAddr e m
)
=> a
-> Peer e
-> Hash HbSync
-> Integer
-> m ()
onBlockSize _ _ _ _ = none
onBlockDownloadAttempt :: ( MonadIO m
, IsPeerAddr e m
)
=> a
-> Peer e
-> Hash HbSync
-> m ()
onBlockDownloadAttempt _ _ _ = none
onBlockDownloaded :: MonadIO m
=> a
-> Peer e
-> Hash HbSync
-> m ()
onBlockDownloaded _ _ _ = none
onBlockPostponed :: MonadIO m
=> a
-> Hash HbSync
-> m ()
onBlockPostponed _ _ = none
claimBlockCameFrom :: MonadIO m
=> a
-> Hash HbSync
-> Hash HbSync
-> m ()
claimBlockCameFrom _ _ _ = none
shouldPostponeBlock :: MonadIO m
=> a
-> Hash HbSync
-> m Bool
shouldPostponeBlock _ _ = pure False
shouldDownloadBlock :: MonadIO m
=> a
-> Peer e
-> Hash HbSync
-> m Bool
shouldDownloadBlock _ _ _ = pure False
advisePeersForBlock :: (MonadIO m, FromStringMaybe (PeerAddr e))
=> a
-> Hash HbSync
-> m [PeerAddr e]
advisePeersForBlock _ _ = pure mempty
blockSize :: forall m . MonadIO m
=> a
-> Peer e
-> Hash HbSync
-> m (Maybe Integer)
blockSize _ _ _ = pure Nothing
isReflogProcessed :: (MonadIO m)
=> a
-> Hash HbSync
-> m Bool
isReflogProcessed _ _ = pure False
setReflogProcessed :: (MonadIO m)
=> a
-> Hash HbSync
-> m ()
setReflogProcessed _ _ = pure ()
type NoBrains = ()
instance Pretty (Peer e) => HasBrains e NoBrains where
data SomeBrains e = forall a . HasBrains e a => SomeBrains a
instance HasBrains e (SomeBrains e) where
listPolledRefs (SomeBrains a) = listPolledRefs @e a
isPolledRef (SomeBrains a) = isPolledRef @e a
onClientTCPConnected (SomeBrains a) = onClientTCPConnected @e a
getClientTCP (SomeBrains a) = getClientTCP @e a
setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a
listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a
onKnownPeers (SomeBrains a) = onKnownPeers a
onBlockSize (SomeBrains a) = onBlockSize a
onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a
onBlockDownloaded (SomeBrains a) = onBlockDownloaded a
onBlockPostponed (SomeBrains a) = onBlockPostponed @e a
claimBlockCameFrom (SomeBrains a) = claimBlockCameFrom @e a
shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a
shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a
advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a
blockSize (SomeBrains a) = blockSize @e a
isReflogProcessed (SomeBrains a) = isReflogProcessed @e a
setReflogProcessed (SomeBrains a) = setReflogProcessed @e a
newtype CommitCmd = CommitCmd { onCommited :: IO () } newtype CommitCmd = CommitCmd { onCommited :: IO () }
@ -296,18 +163,45 @@ instance ( Hashable (Peer e)
setReflogProcessed b h = do setReflogProcessed b h = do
updateOP b $ insertReflogProcessed b h updateOP b $ insertReflogProcessed b h
listPolledRefs brains tp = do addPolledRef brains r s i = do
updateOP brains $ do
let conn = view brainsDb brains
liftIO $ execute conn sql (show $ pretty (AsBase58 r), s, i)
where
sql = [qc|
insert into statedb.poll (ref,type,interval)
values (?,?,?)
on conflict do update set interval = excluded.interval
|]
delPolledRef brains r = do
updateOP brains $ do
let conn = view brainsDb brains
liftIO $ execute conn sql (Only (show $ pretty (AsBase58 r)))
where
sql = [qc|
delete from statedb.poll
where ref = ?
|]
listPolledRefs brains mtp = do
liftIO $ do liftIO $ do
let conn = view brainsDb brains let conn = view brainsDb brains
query conn [qc|select ref, interval from poll where type = ?|] (Only tp) case mtp of
<&> fmap (\(r,i) -> (,i) <$> fromStringMay r ) Nothing -> postprocess <$>
<&> catMaybes query_ conn [qc|select ref, type, interval from statedb.poll|]
Just tp -> postprocess <$>
query conn [qc|select ref, type, interval from statedb.poll where type = ?|] (Only tp)
where
postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r )
isPolledRef brains ref = do isPolledRef brains ref = do
liftIO do liftIO do
let conn = view brainsDb brains let conn = view brainsDb brains
query @_ @(Only Int) conn [qc| query @_ @(Only Int) conn [qc|
select 1 from poll select 1 from statedb.poll
where ref = ? where ref = ?
limit 1 limit 1
|] ( Only ( show $ pretty (AsBase58 ref) ) ) |] ( Only ( show $ pretty (AsBase58 ref) ) )
@ -707,6 +601,8 @@ newBasicBrains cfg = liftIO do
conn <- open brains conn <- open brains
debug $ "BRAINS:" <+> "state" <+> pretty stateDb
execute_ conn [qc|ATTACH DATABASE '{stateDb}' as statedb|] execute_ conn [qc|ATTACH DATABASE '{stateDb}' as statedb|]
execute_ conn [qc| execute_ conn [qc|
@ -775,15 +671,14 @@ newBasicBrains cfg = liftIO do
|] |]
execute_ conn [qc| execute_ conn [qc|
create table if not exists poll create table if not exists statedb.poll
( ref text not null ( ref text not null
, type text not null , type text not null
, interval int not null , interval int not null
, primary key (ref,type) , primary key (ref)
) )
|] |]
execute_ conn [qc| execute_ conn [qc|
create table if not exists peer_asymmkey create table if not exists peer_asymmkey
( peer text not null ( peer text not null
@ -852,7 +747,7 @@ runBasicBrains cfg brains = do
updateOP brains $ do updateOP brains $ do
let conn = view brainsDb brains let conn = view brainsDb brains
liftIO $ execute conn [qc| liftIO $ execute conn [qc|
insert into poll (ref,type,interval) insert into statedb.poll (ref,type,interval)
values (?,?,?) values (?,?,?)
on conflict do update set interval = excluded.interval on conflict do update set interval = excluded.interval
|] (show $ pretty (AsBase58 x), show $ pretty t, mi) |] (show $ pretty (AsBase58 x), show $ pretty t, mi)

View File

@ -77,6 +77,7 @@ import HBS2.Peer.RPC.API.RefChan
import RPC2(RPC2Context(..)) import RPC2(RPC2Context(..))
import Codec.Serialise as Serialise import Codec.Serialise as Serialise
import Control.Concurrent (myThreadId)
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception as Exception import Control.Exception as Exception
import Control.Monad.Reader import Control.Monad.Reader
@ -86,7 +87,6 @@ import Crypto.Saltine (sodiumInit)
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS import Data.ByteString qualified as BS
import Data.Cache qualified as Cache import Data.Cache qualified as Cache
import Data.Function
import Data.List qualified as L import Data.List qualified as L
import Data.Map (Map) import Data.Map (Map)
import Data.Map qualified as Map import Data.Map qualified as Map
@ -104,6 +104,7 @@ import System.Mem
import System.Metrics import System.Metrics
import System.Posix.Process import System.Posix.Process
import System.Environment import System.Environment
import Prettyprinter.Render.Terminal
import UnliftIO.Exception qualified as U import UnliftIO.Exception qualified as U
-- import UnliftIO.STM -- import UnliftIO.STM
@ -163,7 +164,7 @@ data PeerOpts =
, _listenRpc :: Maybe String , _listenRpc :: Maybe String
, _peerCredFile :: Maybe FilePath , _peerCredFile :: Maybe FilePath
, _peerConfig :: Maybe FilePath , _peerConfig :: Maybe FilePath
, _peerRespawn :: Maybe Bool , _peerRespawn :: Bool
} }
deriving stock (Data) deriving stock (Data)
@ -225,6 +226,7 @@ runCLI = do
<> command "refchan" (info pRefChan (progDesc "refchan commands")) <> command "refchan" (info pRefChan (progDesc "refchan commands"))
<> command "peers" (info pPeers (progDesc "show known peers")) <> command "peers" (info pPeers (progDesc "show known peers"))
<> command "pexinfo" (info pPexInfo (progDesc "show pex")) <> command "pexinfo" (info pPexInfo (progDesc "show pex"))
<> command "poll" (info pPoll (progDesc "polling management"))
<> command "log" (info pLog (progDesc "set logging level")) <> command "log" (info pLog (progDesc "set logging level"))
-- FIXME: bring-back-dialogue-over-separate-socket -- FIXME: bring-back-dialogue-over-separate-socket
-- <> command "dial" (info pDialog (progDesc "dialog commands")) -- <> command "dial" (info pDialog (progDesc "dialog commands"))
@ -250,8 +252,10 @@ runCLI = do
c <- optional confOpt c <- optional confOpt
resp <- optional $ flag' True ( long "respawn" <> short 'R' <> help "respawn process") resp <- (optional $ flag' True ( long "no-respawn" <> short 'R' <> help "NO respawn"))
<&> isNothing
-- NOTE: respawn-by-default-now
pure $ PeerOpts pref l r k c resp pure $ PeerOpts pref l r k c resp
withOpts m g = do withOpts m g = do
@ -389,6 +393,52 @@ runCLI = do
Right Nothing -> exitFailure Right Nothing -> exitFailure
Right (Just h) -> print (pretty h) >> exitSuccess Right (Just h) -> print (pretty h) >> exitSuccess
pPoll = hsubparser ( command "list" (info pPollList (progDesc "list current pollers" ))
<> command "add" (info pPollAdd (progDesc "add poller" ))
<> command "del" (info pPollDel (progDesc "del poller" ))
)
pPollAdd = do
rpc <- pRpcCommon
r <- argument refP (metavar "REF")
t <- strArgument (metavar "TYPE")
i <- argument auto (metavar "INTERVAL")
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
callService @RpcPollAdd caller (r, t, i) >>= \case
Left e -> die (show e)
_ -> liftIO do
hPutDoc stdout $ "added poller for" <+> pretty (AsBase58 r)
exitSuccess
pPollDel = do
rpc <- pRpcCommon
r <- argument refP (metavar "REF")
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
callService @RpcPollDel caller r >>= \case
Left e -> die (show e)
_ -> liftIO do
hPutDoc stdout $ "deleted poller for" <+> pretty (AsBase58 r)
exitSuccess
pPollList = do
rpc <- pRpcCommon
-- ref <- strArgument ( metavar "REFLOG-KEY" )
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
void $ runMaybeT do
polls <- toMPlus =<< callService @RpcPollList caller ()
forM_ polls $ \(r,what,t) -> do
liftIO $ hPutDoc stdout $ fill 44 (pretty (AsBase58 r))
-- TODO: align-right
<+> fill 3 (pretty t)
<+> pretty what
<> line
refP :: ReadM (PubKey 'Sign HBS2Basic)
refP = maybeReader fromStringMay
myException :: SomeException -> IO () myException :: SomeException -> IO ()
myException e = err ( show e ) myException e = err ( show e )
@ -448,10 +498,11 @@ instance ( Monad m
response = lift . response response = lift . response
respawn :: PeerOpts -> IO () respawn :: PeerOpts -> IO ()
respawn opts = case view peerRespawn opts of respawn opts =
Just True -> do if not (view peerRespawn opts) then do
exitFailure
else do
let secs = 5 let secs = 5
notice $ "RESPAWNING in" <+> viaShow secs <> "s" notice $ "RESPAWNING in" <+> viaShow secs <> "s"
pause @'Seconds secs pause @'Seconds secs
@ -460,19 +511,19 @@ respawn opts = case view peerRespawn opts of
print (self, args) print (self, args)
executeFile self False args Nothing executeFile self False args Nothing
_ -> exitFailure
runPeer :: forall e s . ( e ~ L4Proto runPeer :: forall e s . ( e ~ L4Proto
, FromStringMaybe (PeerAddr e) , FromStringMaybe (PeerAddr e)
, s ~ Encryption e , s ~ Encryption e
, HasStorage (PeerM e IO) , HasStorage (PeerM e IO)
)=> PeerOpts -> IO () )=> PeerOpts -> IO ()
runPeer opts = Exception.handle (\e -> myException e runPeer opts = U.handle (\e -> myException e
>> performGC >> performGC
>> respawn opts >> respawn opts
) $ runResourceT do ) $ runResourceT do
threadSelf <- liftIO myThreadId
metrics <- liftIO newStore metrics <- liftIO newStore
xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString
@ -847,10 +898,14 @@ runPeer opts = Exception.handle (\e -> myException e
let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do
withPeerM env mx withPeerM env mx
`U.withException` \e -> case (fromException e) of `U.withException` \e -> case fromException e of
Just (e' :: AsyncCancelled) -> pure () Just (e' :: AsyncCancelled) -> pure ()
Nothing -> err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) Nothing -> do
err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e)
throwM e -- threadSelf (SomeException e)
debug $ "peerThread Finished:" <+> t debug $ "peerThread Finished:" <+> t
workers <- W.execWriterT do workers <- W.execWriterT do
peerThread "local multicast" $ forever $ do peerThread "local multicast" $ forever $ do
@ -858,8 +913,6 @@ runPeer opts = Exception.handle (\e -> myException e
debug "sending local peer announce" debug "sending local peer announce"
request localMulticast (PeerAnnounce @e pnonce) request localMulticast (PeerAnnounce @e pnonce)
-- peerThread "tcpWorker" (tcpWorker conf)
peerThread "httpWorker" (httpWorker conf peerMeta denv) peerThread "httpWorker" (httpWorker conf peerMeta denv)
peerThread "checkMetrics" (checkMetrics metrics) peerThread "checkMetrics" (checkMetrics metrics)
@ -891,7 +944,7 @@ runPeer opts = Exception.handle (\e -> myException e
peerThread "downloadQueue" (downloadQueue conf denv) peerThread "downloadQueue" (downloadQueue conf denv)
peerThread "reflogWorker" (reflogWorker @e conf rwa) peerThread "reflogWorker" (reflogWorker @e conf (SomeBrains brains) rwa)
peerThread "refChanWorker" (refChanWorker @e rce (SomeBrains brains)) peerThread "refChanWorker" (refChanWorker @e rce (SomeBrains brains))
@ -997,6 +1050,7 @@ runPeer opts = Exception.handle (\e -> myException e
, rpcPeerEnv = penv , rpcPeerEnv = penv
, rpcLocalMultiCast = localMulticast , rpcLocalMultiCast = localMulticast
, rpcStorage = AnyStorage s , rpcStorage = AnyStorage s
, rpcBrains = SomeBrains brains
, rpcDoFetch = liftIO . fetchHash penv denv , rpcDoFetch = liftIO . fetchHash penv denv
, rpcDoRefChanHeadPost = refChanHeadPostAction , rpcDoRefChanHeadPost = refChanHeadPostAction
, rpcDoRefChanPropose = refChanProposeAction , rpcDoRefChanPropose = refChanProposeAction

View File

@ -1,8 +1,7 @@
module RPC2.Peer module RPC2.Peer
( module RPC2.Peer ( module RPC2.Peer
, module HBS2.Peer.RPC.API.Peer , module HBS2.Peer.RPC.API.Peer
, module RPC2.LogLevel -- , module RPC2.LogLevel
-- , SetLogging(..)
) where ) where
import HBS2.Peer.RPC.API.Peer import HBS2.Peer.RPC.API.Peer
@ -15,6 +14,6 @@ import RPC2.Poke()
import RPC2.RefLog() import RPC2.RefLog()
import RPC2.RefChan() import RPC2.RefChan()
import RPC2.Die() import RPC2.Die()
import RPC2.LogLevel import RPC2.LogLevel()
-- import RPC2.LogLevel(SetLogging(..)) import RPC2.Poll()

View File

@ -0,0 +1,35 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
module RPC2.Poll where
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Service
import HBS2.Peer.Brains
import HBS2.System.Logger.Simple
import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.Internal.Types
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollList where
handleMethod _ = do
brains <- getRpcContext @PeerAPI <&> rpcBrains
debug $ "rpc.pollList"
listPolledRefs @L4Proto brains Nothing
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollAdd where
handleMethod (r,t,i) = do
brains <- getRpcContext @PeerAPI <&> rpcBrains
debug $ "rpc.pollAdd"
addPolledRef @L4Proto brains r t i
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollDel where
handleMethod r = do
brains <- getRpcContext @PeerAPI <&> rpcBrains
debug $ "rpc.pollDel"
delPolledRef @L4Proto brains r

View File

@ -42,7 +42,6 @@ import PeerConfig
import BlockDownload import BlockDownload
import Brains import Brains
import Data.Dynamic
import Codec.Serialise import Codec.Serialise
import Control.Concurrent.STM (flushTQueue) import Control.Concurrent.STM (flushTQueue)
import Control.Exception () import Control.Exception ()
@ -51,19 +50,14 @@ import Control.Monad.Reader
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.Cache (Cache) import Data.Cache (Cache)
import Data.Cache qualified as Cache import Data.Cache qualified as Cache
import Data.ByteString (ByteString)
import Data.Either
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet import Data.HashSet qualified as HashSet
import Data.Heap () import Data.Heap ()
import Data.Coerce
-- import Data.Heap qualified as Heap
import Data.List qualified as List import Data.List qualified as List
import Data.Maybe import Data.Maybe
import Data.Text qualified as Text import Data.Text qualified as Text
import Lens.Micro.Platform import Lens.Micro.Platform
import Data.Generics.Product
import UnliftIO import UnliftIO
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
@ -662,7 +656,9 @@ refChanWorker env brains = do
refChanPoll = do refChanPoll = do
let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) ) let listRefs = listPolledRefs @e brains (Just "refchan")
<&> fmap (\(a,_,b) -> (a,b))
<&> fmap (over _2 ( (*60) . fromIntegral) )
polling (Polling 5 5) listRefs $ \ref -> do polling (Polling 5 5) listRefs $ \ref -> do
debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref) debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref)

View File

@ -21,10 +21,10 @@ import HBS2.Merkle
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import Brains
import PeerConfig import PeerConfig
import PeerTypes import PeerTypes
import Data.Functor
import Data.Function(fix) import Data.Function(fix)
import Data.Maybe import Data.Maybe
import Data.Foldable(for_) import Data.Foldable(for_)
@ -115,10 +115,11 @@ reflogWorker :: forall e s m . ( MonadIO m, MyPeer e
, Pretty (AsBase58 (PubKey 'Sign s)) , Pretty (AsBase58 (PubKey 'Sign s))
) )
=> PeerConfig => PeerConfig
-> SomeBrains e
-> RefLogWorkerAdapter e -> RefLogWorkerAdapter e
-> m () -> m ()
reflogWorker conf adapter = do reflogWorker conf brains adapter = do
sto <- getStorage sto <- getStorage
@ -201,34 +202,14 @@ reflogWorker conf adapter = do
let (PeerConfig syn) = conf let (PeerConfig syn) = conf
let mkRef = fromStringMay . Text.unpack :: (Text -> Maybe (PubKey 'Sign s)) poller <- liftIO $ async do
let listRefs = listPolledRefs @e brains (Just "reflog")
<&> fmap (\(a,_,b) -> (a,b))
<&> fmap (over _2 ( (*60) . fromIntegral) )
let defPoll = lastDef 10 [ x polling (Polling 5 5) listRefs $ \ref -> do
| ListVal @C (Key "poll-default" [SymbolVal "reflog", LitIntVal x]) <- syn debug $ "POLLING REFLOG" <+> pretty (AsBase58 ref)
] reflogFetch adapter ref
let polls = HashMap.fromListWith min $ catMaybes (
[ (,x) <$> mkRef ref
| ListVal @C (Key "poll" [SymbolVal "reflog", LitIntVal x, LitStrVal ref]) <- syn
]
<>
[ (,defPoll) <$> mkRef ref
| ListVal @C (Key "subscribe" [SymbolVal "reflog", LitStrVal ref]) <- syn
] )
let pollIntervals = HashMap.fromListWith (<>) [ (i, [r]) | (r,i) <- HashMap.toList polls ]
& HashMap.toList
pollers' <- liftIO $ async $ do
pause @'Seconds 10
forM pollIntervals $ \(i,refs) -> liftIO do
async $ forever $ do
for_ refs $ \r -> do
trace $ "POLL REFERENCE" <+> pretty (AsBase58 r) <+> pretty i <> "m"
reflogFetch adapter r
pause (fromIntegral i :: Timeout 'Minutes)
w1 <- liftIO $ async $ forever $ replicateConcurrently_ 4 do w1 <- liftIO $ async $ forever $ replicateConcurrently_ 4 do
@ -275,8 +256,7 @@ reflogWorker conf adapter = do
-- trace "I'm a reflog update worker" -- trace "I'm a reflog update worker"
pollers <- liftIO $ wait pollers' void $ liftIO $ waitAnyCatchCancel [w1, poller]
void $ liftIO $ waitAnyCatchCancel $ w1 : pollers
where where

View File

@ -38,6 +38,7 @@ common common-deps
, network-multicast , network-multicast
, optparse-applicative , optparse-applicative
, prettyprinter , prettyprinter
, prettyprinter-ansi-terminal
, random , random
, random-shuffle , random-shuffle
, resourcet , resourcet
@ -132,6 +133,7 @@ library
default-language: Haskell2010 default-language: Haskell2010
exposed-modules: exposed-modules:
HBS2.Peer.Brains
HBS2.Peer.RPC.Class HBS2.Peer.RPC.Class
HBS2.Peer.RPC.API.Peer HBS2.Peer.RPC.API.Peer
HBS2.Peer.RPC.API.RefLog HBS2.Peer.RPC.API.RefLog
@ -176,6 +178,7 @@ executable hbs2-peer
, RPC2.Peers , RPC2.Peers
, RPC2.PexInfo , RPC2.PexInfo
, RPC2.Ping , RPC2.Ping
, RPC2.Poll
, RPC2.RefLog , RPC2.RefLog
, RPC2.RefChan , RPC2.RefChan
, PeerTypes , PeerTypes

View File

@ -0,0 +1,162 @@
{-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-}
module HBS2.Peer.Brains where
import HBS2.Prelude.Plated
import HBS2.Net.Proto
import HBS2.Hash
import Data.Word
-- TODO: rename
class HasBrains e a where
listPolledRefs :: MonadIO m => a -> Maybe String -> m [(PubKey 'Sign (Encryption e), String, Int)]
listPolledRefs _ _ = pure mempty
isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool
isPolledRef _ _ = pure False
delPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m ()
delPolledRef _ _ = pure ()
addPolledRef :: MonadIO m
=> a
-> PubKey 'Sign (Encryption e)
-> String
-> Int
-> m ()
addPolledRef _ _ _ _ = pure ()
onClientTCPConnected :: MonadIO m => a -> PeerAddr e -> Word64 -> m ()
onClientTCPConnected _ _ = const none
getClientTCP :: MonadIO m => a -> m [(PeerAddr e,Word64)]
getClientTCP = const $ pure mempty
setActiveTCPSessions :: MonadIO m => a -> [(PeerAddr e, Word64)] -> m ()
setActiveTCPSessions _ _ = none
listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e]
listTCPPexCandidates _ = pure mempty
onKnownPeers :: MonadIO m => a -> [Peer e] -> m ()
onKnownPeers _ _ = none
onBlockSize :: ( MonadIO m
, IsPeerAddr e m
)
=> a
-> Peer e
-> Hash HbSync
-> Integer
-> m ()
onBlockSize _ _ _ _ = none
onBlockDownloadAttempt :: ( MonadIO m
, IsPeerAddr e m
)
=> a
-> Peer e
-> Hash HbSync
-> m ()
onBlockDownloadAttempt _ _ _ = none
onBlockDownloaded :: MonadIO m
=> a
-> Peer e
-> Hash HbSync
-> m ()
onBlockDownloaded _ _ _ = none
onBlockPostponed :: MonadIO m
=> a
-> Hash HbSync
-> m ()
onBlockPostponed _ _ = none
claimBlockCameFrom :: MonadIO m
=> a
-> Hash HbSync
-> Hash HbSync
-> m ()
claimBlockCameFrom _ _ _ = none
shouldPostponeBlock :: MonadIO m
=> a
-> Hash HbSync
-> m Bool
shouldPostponeBlock _ _ = pure False
shouldDownloadBlock :: MonadIO m
=> a
-> Peer e
-> Hash HbSync
-> m Bool
shouldDownloadBlock _ _ _ = pure False
advisePeersForBlock :: (MonadIO m, FromStringMaybe (PeerAddr e))
=> a
-> Hash HbSync
-> m [PeerAddr e]
advisePeersForBlock _ _ = pure mempty
blockSize :: forall m . MonadIO m
=> a
-> Peer e
-> Hash HbSync
-> m (Maybe Integer)
blockSize _ _ _ = pure Nothing
isReflogProcessed :: (MonadIO m)
=> a
-> Hash HbSync
-> m Bool
isReflogProcessed _ _ = pure False
setReflogProcessed :: (MonadIO m)
=> a
-> Hash HbSync
-> m ()
setReflogProcessed _ _ = pure ()
type NoBrains = ()
instance Pretty (Peer e) => HasBrains e NoBrains where
data SomeBrains e = forall a . HasBrains e a => SomeBrains a
instance HasBrains e (SomeBrains e) where
listPolledRefs (SomeBrains a) = listPolledRefs @e a
isPolledRef (SomeBrains a) = isPolledRef @e a
delPolledRef (SomeBrains a) = delPolledRef @e a
addPolledRef (SomeBrains a) = addPolledRef @e a
onClientTCPConnected (SomeBrains a) = onClientTCPConnected @e a
getClientTCP (SomeBrains a) = getClientTCP @e a
setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a
listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a
onKnownPeers (SomeBrains a) = onKnownPeers a
onBlockSize (SomeBrains a) = onBlockSize a
onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a
onBlockDownloaded (SomeBrains a) = onBlockDownloaded a
onBlockPostponed (SomeBrains a) = onBlockPostponed @e a
claimBlockCameFrom (SomeBrains a) = claimBlockCameFrom @e a
shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a
shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a
advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a
blockSize (SomeBrains a) = blockSize @e a
isReflogProcessed (SomeBrains a) = isReflogProcessed @e a
setReflogProcessed (SomeBrains a) = setReflogProcessed @e a

View File

@ -22,6 +22,10 @@ data RpcFetch
data RpcLogLevel data RpcLogLevel
data RpcDie data RpcDie
data RpcPollList
data RpcPollAdd
data RpcPollDel
type PeerAPI = '[ RpcPoke type PeerAPI = '[ RpcPoke
, RpcPing , RpcPing
, RpcAnnounce , RpcAnnounce
@ -30,6 +34,9 @@ type PeerAPI = '[ RpcPoke
, RpcPexInfo , RpcPexInfo
, RpcLogLevel , RpcLogLevel
, RpcDie , RpcDie
, RpcPollList
, RpcPollAdd
, RpcPollDel
] ]
instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where
@ -64,6 +71,15 @@ type instance Output RpcPeers = [(PubKey 'Sign HBS2Basic, PeerAddr L4Proto)]
type instance Input RpcFetch = HashRef type instance Input RpcFetch = HashRef
type instance Output RpcFetch = () type instance Output RpcFetch = ()
type instance Input RpcPollList= ()
type instance Output RpcPollList = [(PubKey 'Sign HBS2Basic, String, Int)]
type instance Input RpcPollAdd = (PubKey 'Sign HBS2Basic, String, Int)
type instance Output RpcPollAdd = ()
type instance Input RpcPollDel = PubKey 'Sign HBS2Basic
type instance Output RpcPollDel = ()
type instance Input RpcLogLevel = SetLogging type instance Input RpcLogLevel = SetLogging
type instance Output RpcLogLevel = () type instance Output RpcLogLevel = ()

View File

@ -11,6 +11,7 @@ import HBS2.Data.Types.SignedBox
import HBS2.Net.Messaging.Unix import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto.Service import HBS2.Net.Proto.Service
import HBS2.Peer.RPC.Class import HBS2.Peer.RPC.Class
import HBS2.Peer.Brains
import Data.Config.Suckless.Syntax import Data.Config.Suckless.Syntax
import Data.Config.Suckless.Parse import Data.Config.Suckless.Parse
@ -28,6 +29,7 @@ data RPC2Context =
, rpcPeerEnv :: PeerEnv L4Proto , rpcPeerEnv :: PeerEnv L4Proto
, rpcLocalMultiCast :: Peer L4Proto , rpcLocalMultiCast :: Peer L4Proto
, rpcStorage :: AnyStorage , rpcStorage :: AnyStorage
, rpcBrains :: SomeBrains L4Proto
, rpcDoFetch :: HashRef -> IO () , rpcDoFetch :: HashRef -> IO ()
, rpcDoRefChanHeadPost :: HashRef -> IO () , rpcDoRefChanHeadPost :: HashRef -> IO ()
, rpcDoRefChanPropose :: (PubKey 'Sign HBS2Basic, SignedBox ByteString L4Proto) -> IO () , rpcDoRefChanPropose :: (PubKey 'Sign HBS2Basic, SignedBox ByteString L4Proto) -> IO ()