diff --git a/hbs2-git/hbs2-git.cabal b/hbs2-git/hbs2-git.cabal index cc18aebe..b354fb45 100644 --- a/hbs2-git/hbs2-git.cabal +++ b/hbs2-git/hbs2-git.cabal @@ -79,6 +79,7 @@ common shared-properties , mtl , prettyprinter , prettyprinter-ansi-terminal + , random , resourcet , safe , saltine diff --git a/hbs2-git/lib/HBS2Git/Evolve.hs b/hbs2-git/lib/HBS2Git/Evolve.hs index 27a8a8c7..43da9a8c 100644 --- a/hbs2-git/lib/HBS2Git/Evolve.hs +++ b/hbs2-git/lib/HBS2Git/Evolve.hs @@ -1,19 +1,19 @@ -module HBS2Git.Evolve (evolve) where +module HBS2Git.Evolve (evolve,makePolled) where import HBS2.Prelude.Plated import HBS2.System.Logger.Simple -import HBS2.OrDie +import HBS2.Net.Proto.Service + +import HBS2.Peer.RPC.API.Peer import HBS2Git.Types -import HBS2.Git.Types import HBS2Git.Config import HBS2Git.PrettyStuff import Control.Monad.Trans.Maybe -import Data.Functor import Data.List qualified as List -import Prettyprinter.Render.Terminal import System.Directory +import System.Random import System.FilePath import UnliftIO @@ -37,6 +37,12 @@ evolve = void $ runMaybeT do generateCookie +makePolled :: (MonadIO m, HasRPC m) => RepoRef -> m () +makePolled ref = do + rpc <- getRPC <&> rpcPeer + n <- liftIO $ randomRIO (4,7) + void $ callService @RpcPollAdd rpc (fromRefLogKey ref, "reflog", n) + generateCookie :: MonadIO m => m () generateCookie = void $ runMaybeT do file <- cookieFile diff --git a/hbs2-git/lib/HBS2Git/Import.hs b/hbs2-git/lib/HBS2Git/Import.hs index 8b721865..404673f1 100644 --- a/hbs2-git/lib/HBS2Git/Import.hs +++ b/hbs2-git/lib/HBS2Git/Import.hs @@ -19,6 +19,7 @@ import HBS2Git.GitRepoLog import HBS2Git.App import HBS2Git.Config import HBS2Git.State +import HBS2Git.Evolve import HBS2Git.KeysMetaData import HBS2.Git.Local.CLI @@ -121,6 +122,7 @@ importRefLogNew :: ( MonadIO m , MonadCatch m , MonadMask m , HasStorage m + , HasRPC m , HasEncryptionKeys m , HasImportOpts opts ) @@ -136,6 +138,8 @@ importRefLogNew opts ref = runResourceT do temp <- liftIO getCanonicalTemporaryDirectory (_,dir) <- allocate (createTempDirectory temp myTempDir) removeDirectoryRecursive + lift $ makePolled ref + db <- makeDbPath ref >>= dbEnv do diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 819dfe5c..0b01d3e7 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -1,24 +1,25 @@ {-# Language AllowAmbiguousTypes #-} {-# Language UndecidableInstances #-} {-# Language TemplateHaskell #-} -module Brains where +module Brains + ( module Brains + , module HBS2.Peer.Brains + ) where import HBS2.Prelude.Plated import HBS2.Clock -import HBS2.Data.Types.Refs import HBS2.Net.Proto.RefChan(ForRefChans) import HBS2.Net.Proto import HBS2.Hash import HBS2.Base58 import HBS2.Net.IP.Addr -import HBS2.Net.Auth.Credentials +import HBS2.Peer.Brains import HBS2.System.Logger.Simple import PeerConfig import Crypto.Saltine.Core.Box qualified as Encrypt -import Data.Maybe import Control.Monad import Control.Exception import Control.Concurrent.STM @@ -45,140 +46,6 @@ data PeerBrainsDb instance HasCfgKey PeerBrainsDb (Maybe String) where key = "brains" -class HasBrains e a where - - listPolledRefs :: MonadIO m => a -> String -> m [(PubKey 'Sign (Encryption e), Int)] - listPolledRefs _ _ = pure mempty - - isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool - isPolledRef _ _ = pure False - - onClientTCPConnected :: MonadIO m => a -> PeerAddr e -> Word64 -> m () - onClientTCPConnected _ _ = const none - - getClientTCP :: MonadIO m => a -> m [(PeerAddr e,Word64)] - getClientTCP = const $ pure mempty - - setActiveTCPSessions :: MonadIO m => a -> [(PeerAddr e, Word64)] -> m () - setActiveTCPSessions _ _ = none - - listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e] - listTCPPexCandidates _ = pure mempty - - onKnownPeers :: MonadIO m => a -> [Peer e] -> m () - onKnownPeers _ _ = none - - onBlockSize :: ( MonadIO m - , IsPeerAddr e m - ) - => a - -> Peer e - -> Hash HbSync - -> Integer - -> m () - onBlockSize _ _ _ _ = none - - onBlockDownloadAttempt :: ( MonadIO m - , IsPeerAddr e m - ) - => a - -> Peer e - -> Hash HbSync - -> m () - - onBlockDownloadAttempt _ _ _ = none - - onBlockDownloaded :: MonadIO m - => a - -> Peer e - -> Hash HbSync - -> m () - - onBlockDownloaded _ _ _ = none - - onBlockPostponed :: MonadIO m - => a - -> Hash HbSync - -> m () - - onBlockPostponed _ _ = none - - claimBlockCameFrom :: MonadIO m - => a - -> Hash HbSync - -> Hash HbSync - -> m () - - claimBlockCameFrom _ _ _ = none - - shouldPostponeBlock :: MonadIO m - => a - -> Hash HbSync - -> m Bool - shouldPostponeBlock _ _ = pure False - - - shouldDownloadBlock :: MonadIO m - => a - -> Peer e - -> Hash HbSync - -> m Bool - shouldDownloadBlock _ _ _ = pure False - - advisePeersForBlock :: (MonadIO m, FromStringMaybe (PeerAddr e)) - => a - -> Hash HbSync - -> m [PeerAddr e] - advisePeersForBlock _ _ = pure mempty - - blockSize :: forall m . MonadIO m - => a - -> Peer e - -> Hash HbSync - -> m (Maybe Integer) - - blockSize _ _ _ = pure Nothing - - isReflogProcessed :: (MonadIO m) - => a - -> Hash HbSync - -> m Bool - - isReflogProcessed _ _ = pure False - - setReflogProcessed :: (MonadIO m) - => a - -> Hash HbSync - -> m () - - setReflogProcessed _ _ = pure () - - -type NoBrains = () - -instance Pretty (Peer e) => HasBrains e NoBrains where - -data SomeBrains e = forall a . HasBrains e a => SomeBrains a - -instance HasBrains e (SomeBrains e) where - listPolledRefs (SomeBrains a) = listPolledRefs @e a - isPolledRef (SomeBrains a) = isPolledRef @e a - onClientTCPConnected (SomeBrains a) = onClientTCPConnected @e a - getClientTCP (SomeBrains a) = getClientTCP @e a - setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a - listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a - onKnownPeers (SomeBrains a) = onKnownPeers a - onBlockSize (SomeBrains a) = onBlockSize a - onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a - onBlockDownloaded (SomeBrains a) = onBlockDownloaded a - onBlockPostponed (SomeBrains a) = onBlockPostponed @e a - claimBlockCameFrom (SomeBrains a) = claimBlockCameFrom @e a - shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a - shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a - advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a - blockSize (SomeBrains a) = blockSize @e a - isReflogProcessed (SomeBrains a) = isReflogProcessed @e a - setReflogProcessed (SomeBrains a) = setReflogProcessed @e a newtype CommitCmd = CommitCmd { onCommited :: IO () } @@ -296,18 +163,45 @@ instance ( Hashable (Peer e) setReflogProcessed b h = do updateOP b $ insertReflogProcessed b h - listPolledRefs brains tp = do + addPolledRef brains r s i = do + + updateOP brains $ do + let conn = view brainsDb brains + liftIO $ execute conn sql (show $ pretty (AsBase58 r), s, i) + where + sql = [qc| + insert into statedb.poll (ref,type,interval) + values (?,?,?) + on conflict do update set interval = excluded.interval + |] + + delPolledRef brains r = do + updateOP brains $ do + let conn = view brainsDb brains + liftIO $ execute conn sql (Only (show $ pretty (AsBase58 r))) + where + sql = [qc| + delete from statedb.poll + where ref = ? + |] + + listPolledRefs brains mtp = do liftIO $ do let conn = view brainsDb brains - query conn [qc|select ref, interval from poll where type = ?|] (Only tp) - <&> fmap (\(r,i) -> (,i) <$> fromStringMay r ) - <&> catMaybes + case mtp of + Nothing -> postprocess <$> + query_ conn [qc|select ref, type, interval from statedb.poll|] + + Just tp -> postprocess <$> + query conn [qc|select ref, type, interval from statedb.poll where type = ?|] (Only tp) + where + postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r ) isPolledRef brains ref = do liftIO do let conn = view brainsDb brains query @_ @(Only Int) conn [qc| - select 1 from poll + select 1 from statedb.poll where ref = ? limit 1 |] ( Only ( show $ pretty (AsBase58 ref) ) ) @@ -707,6 +601,8 @@ newBasicBrains cfg = liftIO do conn <- open brains + debug $ "BRAINS:" <+> "state" <+> pretty stateDb + execute_ conn [qc|ATTACH DATABASE '{stateDb}' as statedb|] execute_ conn [qc| @@ -775,15 +671,14 @@ newBasicBrains cfg = liftIO do |] execute_ conn [qc| - create table if not exists poll + create table if not exists statedb.poll ( ref text not null , type text not null , interval int not null - , primary key (ref,type) + , primary key (ref) ) |] - execute_ conn [qc| create table if not exists peer_asymmkey ( peer text not null @@ -852,7 +747,7 @@ runBasicBrains cfg brains = do updateOP brains $ do let conn = view brainsDb brains liftIO $ execute conn [qc| - insert into poll (ref,type,interval) + insert into statedb.poll (ref,type,interval) values (?,?,?) on conflict do update set interval = excluded.interval |] (show $ pretty (AsBase58 x), show $ pretty t, mi) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index f4f78c49..828accb3 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -77,6 +77,7 @@ import HBS2.Peer.RPC.API.RefChan import RPC2(RPC2Context(..)) import Codec.Serialise as Serialise +import Control.Concurrent (myThreadId) import Control.Concurrent.STM import Control.Exception as Exception import Control.Monad.Reader @@ -86,7 +87,6 @@ import Crypto.Saltine (sodiumInit) import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.Cache qualified as Cache -import Data.Function import Data.List qualified as L import Data.Map (Map) import Data.Map qualified as Map @@ -104,6 +104,7 @@ import System.Mem import System.Metrics import System.Posix.Process import System.Environment +import Prettyprinter.Render.Terminal import UnliftIO.Exception qualified as U -- import UnliftIO.STM @@ -163,7 +164,7 @@ data PeerOpts = , _listenRpc :: Maybe String , _peerCredFile :: Maybe FilePath , _peerConfig :: Maybe FilePath - , _peerRespawn :: Maybe Bool + , _peerRespawn :: Bool } deriving stock (Data) @@ -225,6 +226,7 @@ runCLI = do <> command "refchan" (info pRefChan (progDesc "refchan commands")) <> command "peers" (info pPeers (progDesc "show known peers")) <> command "pexinfo" (info pPexInfo (progDesc "show pex")) + <> command "poll" (info pPoll (progDesc "polling management")) <> command "log" (info pLog (progDesc "set logging level")) -- FIXME: bring-back-dialogue-over-separate-socket -- <> command "dial" (info pDialog (progDesc "dialog commands")) @@ -250,8 +252,10 @@ runCLI = do c <- optional confOpt - resp <- optional $ flag' True ( long "respawn" <> short 'R' <> help "respawn process") + resp <- (optional $ flag' True ( long "no-respawn" <> short 'R' <> help "NO respawn")) + <&> isNothing + -- NOTE: respawn-by-default-now pure $ PeerOpts pref l r k c resp withOpts m g = do @@ -389,6 +393,52 @@ runCLI = do Right Nothing -> exitFailure Right (Just h) -> print (pretty h) >> exitSuccess + + pPoll = hsubparser ( command "list" (info pPollList (progDesc "list current pollers" )) + <> command "add" (info pPollAdd (progDesc "add poller" )) + <> command "del" (info pPollDel (progDesc "del poller" )) + ) + + + pPollAdd = do + rpc <- pRpcCommon + r <- argument refP (metavar "REF") + t <- strArgument (metavar "TYPE") + i <- argument auto (metavar "INTERVAL") + pure $ withMyRPC @PeerAPI rpc $ \caller -> do + callService @RpcPollAdd caller (r, t, i) >>= \case + Left e -> die (show e) + _ -> liftIO do + hPutDoc stdout $ "added poller for" <+> pretty (AsBase58 r) + exitSuccess + + pPollDel = do + rpc <- pRpcCommon + r <- argument refP (metavar "REF") + pure $ withMyRPC @PeerAPI rpc $ \caller -> do + callService @RpcPollDel caller r >>= \case + Left e -> die (show e) + _ -> liftIO do + hPutDoc stdout $ "deleted poller for" <+> pretty (AsBase58 r) + exitSuccess + + pPollList = do + rpc <- pRpcCommon + -- ref <- strArgument ( metavar "REFLOG-KEY" ) + pure $ withMyRPC @PeerAPI rpc $ \caller -> do + void $ runMaybeT do + polls <- toMPlus =<< callService @RpcPollList caller () + forM_ polls $ \(r,what,t) -> do + liftIO $ hPutDoc stdout $ fill 44 (pretty (AsBase58 r)) + -- TODO: align-right + <+> fill 3 (pretty t) + <+> pretty what + <> line + + refP :: ReadM (PubKey 'Sign HBS2Basic) + refP = maybeReader fromStringMay + + myException :: SomeException -> IO () myException e = err ( show e ) @@ -448,10 +498,11 @@ instance ( Monad m response = lift . response - respawn :: PeerOpts -> IO () -respawn opts = case view peerRespawn opts of - Just True -> do +respawn opts = + if not (view peerRespawn opts) then do + exitFailure + else do let secs = 5 notice $ "RESPAWNING in" <+> viaShow secs <> "s" pause @'Seconds secs @@ -460,19 +511,19 @@ respawn opts = case view peerRespawn opts of print (self, args) executeFile self False args Nothing - _ -> exitFailure - runPeer :: forall e s . ( e ~ L4Proto , FromStringMaybe (PeerAddr e) , s ~ Encryption e , HasStorage (PeerM e IO) )=> PeerOpts -> IO () -runPeer opts = Exception.handle (\e -> myException e +runPeer opts = U.handle (\e -> myException e >> performGC >> respawn opts ) $ runResourceT do + threadSelf <- liftIO myThreadId + metrics <- liftIO newStore xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString @@ -847,10 +898,14 @@ runPeer opts = Exception.handle (\e -> myException e let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do withPeerM env mx - `U.withException` \e -> case (fromException e) of + `U.withException` \e -> case fromException e of Just (e' :: AsyncCancelled) -> pure () - Nothing -> err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) + Nothing -> do + err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) + throwM e -- threadSelf (SomeException e) + debug $ "peerThread Finished:" <+> t + workers <- W.execWriterT do peerThread "local multicast" $ forever $ do @@ -858,8 +913,6 @@ runPeer opts = Exception.handle (\e -> myException e debug "sending local peer announce" request localMulticast (PeerAnnounce @e pnonce) - -- peerThread "tcpWorker" (tcpWorker conf) - peerThread "httpWorker" (httpWorker conf peerMeta denv) peerThread "checkMetrics" (checkMetrics metrics) @@ -891,7 +944,7 @@ runPeer opts = Exception.handle (\e -> myException e peerThread "downloadQueue" (downloadQueue conf denv) - peerThread "reflogWorker" (reflogWorker @e conf rwa) + peerThread "reflogWorker" (reflogWorker @e conf (SomeBrains brains) rwa) peerThread "refChanWorker" (refChanWorker @e rce (SomeBrains brains)) @@ -997,6 +1050,7 @@ runPeer opts = Exception.handle (\e -> myException e , rpcPeerEnv = penv , rpcLocalMultiCast = localMulticast , rpcStorage = AnyStorage s + , rpcBrains = SomeBrains brains , rpcDoFetch = liftIO . fetchHash penv denv , rpcDoRefChanHeadPost = refChanHeadPostAction , rpcDoRefChanPropose = refChanProposeAction diff --git a/hbs2-peer/app/RPC2/Peer.hs b/hbs2-peer/app/RPC2/Peer.hs index 2174aa5c..6f962ea7 100644 --- a/hbs2-peer/app/RPC2/Peer.hs +++ b/hbs2-peer/app/RPC2/Peer.hs @@ -1,8 +1,7 @@ module RPC2.Peer ( module RPC2.Peer , module HBS2.Peer.RPC.API.Peer - , module RPC2.LogLevel - -- , SetLogging(..) + -- , module RPC2.LogLevel ) where import HBS2.Peer.RPC.API.Peer @@ -15,6 +14,6 @@ import RPC2.Poke() import RPC2.RefLog() import RPC2.RefChan() import RPC2.Die() -import RPC2.LogLevel --- import RPC2.LogLevel(SetLogging(..)) +import RPC2.LogLevel() +import RPC2.Poll() diff --git a/hbs2-peer/app/RPC2/Poll.hs b/hbs2-peer/app/RPC2/Poll.hs new file mode 100644 index 00000000..839c174e --- /dev/null +++ b/hbs2-peer/app/RPC2/Poll.hs @@ -0,0 +1,35 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language UndecidableInstances #-} +module RPC2.Poll where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Service +import HBS2.Peer.Brains +import HBS2.System.Logger.Simple + +import HBS2.Peer.RPC.API.Peer +import HBS2.Peer.RPC.Internal.Types + +instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollList where + + handleMethod _ = do + brains <- getRpcContext @PeerAPI <&> rpcBrains + debug $ "rpc.pollList" + listPolledRefs @L4Proto brains Nothing + + +instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollAdd where + + handleMethod (r,t,i) = do + brains <- getRpcContext @PeerAPI <&> rpcBrains + debug $ "rpc.pollAdd" + addPolledRef @L4Proto brains r t i + +instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollDel where + + handleMethod r = do + brains <- getRpcContext @PeerAPI <&> rpcBrains + debug $ "rpc.pollDel" + delPolledRef @L4Proto brains r + + diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 4d958e12..177fc499 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -42,7 +42,6 @@ import PeerConfig import BlockDownload import Brains -import Data.Dynamic import Codec.Serialise import Control.Concurrent.STM (flushTQueue) import Control.Exception () @@ -51,19 +50,14 @@ import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.Cache (Cache) import Data.Cache qualified as Cache -import Data.ByteString (ByteString) -import Data.Either import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.HashSet qualified as HashSet import Data.Heap () -import Data.Coerce --- import Data.Heap qualified as Heap import Data.List qualified as List import Data.Maybe import Data.Text qualified as Text import Lens.Micro.Platform -import Data.Generics.Product import UnliftIO import Streaming.Prelude qualified as S @@ -662,7 +656,9 @@ refChanWorker env brains = do refChanPoll = do - let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) ) + let listRefs = listPolledRefs @e brains (Just "refchan") + <&> fmap (\(a,_,b) -> (a,b)) + <&> fmap (over _2 ( (*60) . fromIntegral) ) polling (Polling 5 5) listRefs $ \ref -> do debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref) diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index 30055d32..94174f09 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -21,10 +21,10 @@ import HBS2.Merkle import HBS2.System.Logger.Simple +import Brains import PeerConfig import PeerTypes -import Data.Functor import Data.Function(fix) import Data.Maybe import Data.Foldable(for_) @@ -115,10 +115,11 @@ reflogWorker :: forall e s m . ( MonadIO m, MyPeer e , Pretty (AsBase58 (PubKey 'Sign s)) ) => PeerConfig + -> SomeBrains e -> RefLogWorkerAdapter e -> m () -reflogWorker conf adapter = do +reflogWorker conf brains adapter = do sto <- getStorage @@ -201,34 +202,14 @@ reflogWorker conf adapter = do let (PeerConfig syn) = conf - let mkRef = fromStringMay . Text.unpack :: (Text -> Maybe (PubKey 'Sign s)) + poller <- liftIO $ async do + let listRefs = listPolledRefs @e brains (Just "reflog") + <&> fmap (\(a,_,b) -> (a,b)) + <&> fmap (over _2 ( (*60) . fromIntegral) ) - let defPoll = lastDef 10 [ x - | ListVal @C (Key "poll-default" [SymbolVal "reflog", LitIntVal x]) <- syn - ] - - let polls = HashMap.fromListWith min $ catMaybes ( - [ (,x) <$> mkRef ref - | ListVal @C (Key "poll" [SymbolVal "reflog", LitIntVal x, LitStrVal ref]) <- syn - ] - <> - [ (,defPoll) <$> mkRef ref - | ListVal @C (Key "subscribe" [SymbolVal "reflog", LitStrVal ref]) <- syn - ] ) - - let pollIntervals = HashMap.fromListWith (<>) [ (i, [r]) | (r,i) <- HashMap.toList polls ] - & HashMap.toList - - - pollers' <- liftIO $ async $ do - pause @'Seconds 10 - forM pollIntervals $ \(i,refs) -> liftIO do - async $ forever $ do - for_ refs $ \r -> do - trace $ "POLL REFERENCE" <+> pretty (AsBase58 r) <+> pretty i <> "m" - reflogFetch adapter r - - pause (fromIntegral i :: Timeout 'Minutes) + polling (Polling 5 5) listRefs $ \ref -> do + debug $ "POLLING REFLOG" <+> pretty (AsBase58 ref) + reflogFetch adapter ref w1 <- liftIO $ async $ forever $ replicateConcurrently_ 4 do @@ -275,8 +256,7 @@ reflogWorker conf adapter = do -- trace "I'm a reflog update worker" - pollers <- liftIO $ wait pollers' - void $ liftIO $ waitAnyCatchCancel $ w1 : pollers + void $ liftIO $ waitAnyCatchCancel [w1, poller] where diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 527989c1..16660132 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -38,6 +38,7 @@ common common-deps , network-multicast , optparse-applicative , prettyprinter + , prettyprinter-ansi-terminal , random , random-shuffle , resourcet @@ -132,6 +133,7 @@ library default-language: Haskell2010 exposed-modules: + HBS2.Peer.Brains HBS2.Peer.RPC.Class HBS2.Peer.RPC.API.Peer HBS2.Peer.RPC.API.RefLog @@ -176,6 +178,7 @@ executable hbs2-peer , RPC2.Peers , RPC2.PexInfo , RPC2.Ping + , RPC2.Poll , RPC2.RefLog , RPC2.RefChan , PeerTypes diff --git a/hbs2-peer/lib/HBS2/Peer/Brains.hs b/hbs2-peer/lib/HBS2/Peer/Brains.hs new file mode 100644 index 00000000..9b9e3b03 --- /dev/null +++ b/hbs2-peer/lib/HBS2/Peer/Brains.hs @@ -0,0 +1,162 @@ +{-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} +module HBS2.Peer.Brains where + + +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Hash + +import Data.Word + +-- TODO: rename +class HasBrains e a where + + listPolledRefs :: MonadIO m => a -> Maybe String -> m [(PubKey 'Sign (Encryption e), String, Int)] + listPolledRefs _ _ = pure mempty + + isPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m Bool + isPolledRef _ _ = pure False + + delPolledRef :: MonadIO m => a -> PubKey 'Sign (Encryption e) -> m () + delPolledRef _ _ = pure () + + addPolledRef :: MonadIO m + => a + -> PubKey 'Sign (Encryption e) + -> String + -> Int + -> m () + + addPolledRef _ _ _ _ = pure () + + onClientTCPConnected :: MonadIO m => a -> PeerAddr e -> Word64 -> m () + onClientTCPConnected _ _ = const none + + getClientTCP :: MonadIO m => a -> m [(PeerAddr e,Word64)] + getClientTCP = const $ pure mempty + + setActiveTCPSessions :: MonadIO m => a -> [(PeerAddr e, Word64)] -> m () + setActiveTCPSessions _ _ = none + + listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e] + listTCPPexCandidates _ = pure mempty + + onKnownPeers :: MonadIO m => a -> [Peer e] -> m () + onKnownPeers _ _ = none + + onBlockSize :: ( MonadIO m + , IsPeerAddr e m + ) + => a + -> Peer e + -> Hash HbSync + -> Integer + -> m () + onBlockSize _ _ _ _ = none + + onBlockDownloadAttempt :: ( MonadIO m + , IsPeerAddr e m + ) + => a + -> Peer e + -> Hash HbSync + -> m () + + onBlockDownloadAttempt _ _ _ = none + + onBlockDownloaded :: MonadIO m + => a + -> Peer e + -> Hash HbSync + -> m () + + onBlockDownloaded _ _ _ = none + + onBlockPostponed :: MonadIO m + => a + -> Hash HbSync + -> m () + + onBlockPostponed _ _ = none + + claimBlockCameFrom :: MonadIO m + => a + -> Hash HbSync + -> Hash HbSync + -> m () + + claimBlockCameFrom _ _ _ = none + + shouldPostponeBlock :: MonadIO m + => a + -> Hash HbSync + -> m Bool + shouldPostponeBlock _ _ = pure False + + + shouldDownloadBlock :: MonadIO m + => a + -> Peer e + -> Hash HbSync + -> m Bool + shouldDownloadBlock _ _ _ = pure False + + advisePeersForBlock :: (MonadIO m, FromStringMaybe (PeerAddr e)) + => a + -> Hash HbSync + -> m [PeerAddr e] + advisePeersForBlock _ _ = pure mempty + + blockSize :: forall m . MonadIO m + => a + -> Peer e + -> Hash HbSync + -> m (Maybe Integer) + + blockSize _ _ _ = pure Nothing + + isReflogProcessed :: (MonadIO m) + => a + -> Hash HbSync + -> m Bool + + isReflogProcessed _ _ = pure False + + setReflogProcessed :: (MonadIO m) + => a + -> Hash HbSync + -> m () + + setReflogProcessed _ _ = pure () + + +type NoBrains = () + +instance Pretty (Peer e) => HasBrains e NoBrains where + +data SomeBrains e = forall a . HasBrains e a => SomeBrains a + +instance HasBrains e (SomeBrains e) where + listPolledRefs (SomeBrains a) = listPolledRefs @e a + isPolledRef (SomeBrains a) = isPolledRef @e a + delPolledRef (SomeBrains a) = delPolledRef @e a + addPolledRef (SomeBrains a) = addPolledRef @e a + onClientTCPConnected (SomeBrains a) = onClientTCPConnected @e a + getClientTCP (SomeBrains a) = getClientTCP @e a + setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a + listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a + onKnownPeers (SomeBrains a) = onKnownPeers a + onBlockSize (SomeBrains a) = onBlockSize a + onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a + onBlockDownloaded (SomeBrains a) = onBlockDownloaded a + onBlockPostponed (SomeBrains a) = onBlockPostponed @e a + claimBlockCameFrom (SomeBrains a) = claimBlockCameFrom @e a + shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a + shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a + advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a + blockSize (SomeBrains a) = blockSize @e a + isReflogProcessed (SomeBrains a) = isReflogProcessed @e a + setReflogProcessed (SomeBrains a) = setReflogProcessed @e a + + diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs index 112770ac..438c84aa 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs @@ -22,6 +22,10 @@ data RpcFetch data RpcLogLevel data RpcDie +data RpcPollList +data RpcPollAdd +data RpcPollDel + type PeerAPI = '[ RpcPoke , RpcPing , RpcAnnounce @@ -30,6 +34,9 @@ type PeerAPI = '[ RpcPoke , RpcPexInfo , RpcLogLevel , RpcDie + , RpcPollList + , RpcPollAdd + , RpcPollDel ] instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where @@ -64,6 +71,15 @@ type instance Output RpcPeers = [(PubKey 'Sign HBS2Basic, PeerAddr L4Proto)] type instance Input RpcFetch = HashRef type instance Output RpcFetch = () +type instance Input RpcPollList= () +type instance Output RpcPollList = [(PubKey 'Sign HBS2Basic, String, Int)] + +type instance Input RpcPollAdd = (PubKey 'Sign HBS2Basic, String, Int) +type instance Output RpcPollAdd = () + +type instance Input RpcPollDel = PubKey 'Sign HBS2Basic +type instance Output RpcPollDel = () + type instance Input RpcLogLevel = SetLogging type instance Output RpcLogLevel = () diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs index 2cb8321e..0cf6496f 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs @@ -11,6 +11,7 @@ import HBS2.Data.Types.SignedBox import HBS2.Net.Messaging.Unix import HBS2.Net.Proto.Service import HBS2.Peer.RPC.Class +import HBS2.Peer.Brains import Data.Config.Suckless.Syntax import Data.Config.Suckless.Parse @@ -28,6 +29,7 @@ data RPC2Context = , rpcPeerEnv :: PeerEnv L4Proto , rpcLocalMultiCast :: Peer L4Proto , rpcStorage :: AnyStorage + , rpcBrains :: SomeBrains L4Proto , rpcDoFetch :: HashRef -> IO () , rpcDoRefChanHeadPost :: HashRef -> IO () , rpcDoRefChanPropose :: (PubKey 'Sign HBS2Basic, SignedBox ByteString L4Proto) -> IO ()