diff --git a/examples/refchan-qblf/app/RefChanQBLFMain.hs b/examples/refchan-qblf/app/RefChanQBLFMain.hs index 938920c0..77b28000 100644 --- a/examples/refchan-qblf/app/RefChanQBLFMain.hs +++ b/examples/refchan-qblf/app/RefChanQBLFMain.hs @@ -388,9 +388,6 @@ instance (ForConsensus m, MonadUnliftIO m) => IsQBLF ConsensusQBLF (App m) wher pure new -instance (HasConf (ReaderT Config IO)) where - getConf = ask - instance HasStorage (ReaderT AnyStorage IO) where getStorage = ask diff --git a/flake.lock b/flake.lock index 1188bc8e..cba1b7bc 100644 --- a/flake.lock +++ b/flake.lock @@ -295,11 +295,11 @@ ] }, "locked": { - "lastModified": 1695116151, - "narHash": "sha256-AjjfTL41SRZFy9HjQ6XKvS9kjfplkJKBIkcBvi1mKkc=", + "lastModified": 1696297671, + "narHash": "sha256-jPWuqQlXKRnkU2A19nwtzDHI6bnICzFwDffx2qj/sCM=", "owner": "voidlizard", "repo": "suckless-conf", - "rev": "eef15613402380b9b67c68a0e8a22a71250daa98", + "rev": "a0919addd3f43b7cfddb6c35568495b4a295f1f2", "type": "github" }, "original": { diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 60dff981..1199babf 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -26,6 +26,8 @@ import HBS2.Prelude.Plated import HBS2.Storage import HBS2.System.Logger.Simple +import Data.Config.Suckless.KeyValue (HasConf(..)) + import Control.Applicative import Control.Monad.Trans.Maybe import Control.Concurrent.Async @@ -58,14 +60,6 @@ import Prettyprinter hiding (pipe) data AnyMessage enc e = AnyMessage !Integer !(Encoded e) deriving stock (Generic) -class Monad m => HasOwnPeer e m where - ownPeer :: m (Peer e) - - -data Fabriq e = forall bus . (Messaging bus e (Encoded e)) => Fabriq bus - -class HasFabriq e m where - getFabriq :: m (Fabriq e) class ( Messaging (Fabriq e) e (AnyMessage (Encoded e) e) , Eq (Encoded e) @@ -199,6 +193,9 @@ runResponseM :: forall e m a . (Monad m) runResponseM peer f = runReaderT (fromResponse f) (ResponseEnv peer) +instance HasConf m => HasConf (ResponseM e m) where + getConf = lift getConf + instance Monad m => HasOwnPeer e (PeerM e m) where ownPeer = asks (view envSelf) diff --git a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs index 0b865f95..df63c8f3 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs @@ -4,6 +4,7 @@ module HBS2.Actors.Peer.Types where import HBS2.Prelude import HBS2.Storage import HBS2.Net.Proto.Types +import HBS2.Net.Messaging import HBS2.Hash import Control.Monad.Trans.Class @@ -21,6 +22,8 @@ instance {-# OVERLAPPABLE #-} -- liftIO $ print "LIMIT DOES NOT WORK" -- pure True +-- instance HasConf m => HasConf (ResponseM e m) + instance (IsKey HbSync) => Storage AnyStorage HbSync ByteString IO where putBlock (AnyStorage s) = putBlock s @@ -48,3 +51,13 @@ class (Monad m, HasProtocol e p) => HasGossip e p m where gossip :: p -> m () +class Monad m => HasOwnPeer e m where + ownPeer :: m (Peer e) + + +data Fabriq e = forall bus . (Messaging bus e (Encoded e)) => Fabriq bus + +class HasFabriq e m where + getFabriq :: m (Fabriq e) + + diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index 60ef43ca..52bd973b 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -1,8 +1,10 @@ {-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} module HBS2.Net.Messaging.Unix where import HBS2.Prelude.Plated import HBS2.Net.Proto.Types +import HBS2.Actors.Peer.Types import HBS2.Net.Messaging import HBS2.Clock @@ -10,6 +12,7 @@ import HBS2.System.Logger.Simple import Control.Monad.Trans.Resource import Control.Monad +import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.Function @@ -151,7 +154,6 @@ runMessagingUnix env = do run <- async $ forever $ runResourceT do (so, sa) <- liftIO $ accept sock - -- FIXME: fixing-unix-sockets -- Вот тут: нумеруем клиентов, в PeerAddr ставим -- строку или номер. @@ -163,8 +165,6 @@ runMessagingUnix env = do withSession do - ti <- liftIO myThreadId - let that = msgUnixSelf env & over fromPeerUnix (<> "#" <> show peerNum) void $ allocate ( createQueues env that ) dropQueuesFor @@ -323,9 +323,13 @@ instance Messaging MessagingUnix UNIX ByteString where atomically $ writeTQueue q msg receive bus _ = liftIO do + let q = msgUnixRecv bus atomically $ peekTQueue q >> flushTQueue q +instance (Monad m, Messaging MessagingUnix UNIX (Encoded UNIX)) => HasFabriq UNIX (ReaderT MessagingUnix m) where + getFabriq = asks Fabriq - +instance Monad m => HasOwnPeer UNIX (ReaderT MessagingUnix m) where + ownPeer = asks msgUnixSelf diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 35b9b6c3..0def4283 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -6,6 +6,8 @@ module HBS2.Net.Proto.Definition ) where +-- FIXME: move-module-to-hbs2-peer + import HBS2.Clock import HBS2.Defaults import HBS2.Hash @@ -23,6 +25,7 @@ import HBS2.Net.Proto.PeerExchange import HBS2.Net.Proto.PeerMeta import HBS2.Net.Proto.RefLog import HBS2.Net.Proto.RefChan +import HBS2.Net.Proto.Service import HBS2.Net.Messaging.Unix (UNIX) import HBS2.Prelude.Plated @@ -196,7 +199,6 @@ instance HasProtocol L4Proto (DialResp L4Proto) where decode = dialRespDecode . BSL.toStrict encode = BSL.fromStrict . dialRespEncode - instance Serialise (RefChanValidate UNIX) => HasProtocol UNIX (RefChanValidate UNIX) where type instance ProtocolId (RefChanValidate UNIX) = 0xFFFA0001 type instance Encoded UNIX = ByteString @@ -217,6 +219,7 @@ instance MonadIO m => HasNonces (RefChanValidate UNIX) m where n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) pure $ BS.take 8 n + instance HasTimeLimits UNIX (RefChanValidate UNIX) IO where tryLockForPeriod _ _ = pure True diff --git a/hbs2-core/lib/HBS2/Net/Proto/Service.hs b/hbs2-core/lib/HBS2/Net/Proto/Service.hs index eb04ba03..51d81de8 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Service.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Service.hs @@ -9,6 +9,8 @@ import HBS2.Net.Messaging.Unix import HBS2.Net.Proto import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple + import Codec.Serialise import Control.Monad.Reader import Control.Monad.Trans.Resource @@ -121,16 +123,23 @@ makeRequestR input = do idx = findMethodIndex @method @api +runWithContext :: r -> ReaderT r m a -> m a +runWithContext co m = runReaderT m co + makeServer :: forall api e m . ( MonadIO m , EnumAll api (Int, SomeHandler m) m , Response e (ServiceProto api e) m , HasProtocol e (ServiceProto api e) + , HasDeferred e (ServiceProto api e) m , Pretty (Peer e) ) => ServiceProto api e -> m () -makeServer msg = dispatch @api @e msg >>= response +makeServer msg = do + deferred proxy $ dispatch @api @e msg >>= response + where + proxy = Proxy @(ServiceProto api e) data ServiceCaller api e = ServiceCaller @@ -225,3 +234,7 @@ makeClient :: forall api e m . ( MonadIO m makeClient = notifyServiceCaller + +instance (HasProtocol e (ServiceProto api e)) => HasTimeLimits e (ServiceProto api e) IO where + tryLockForPeriod _ _ = pure True + diff --git a/hbs2-core/test/Main.hs b/hbs2-core/test/Main.hs index 8f6b31ce..832a7606 100644 --- a/hbs2-core/test/Main.hs +++ b/hbs2-core/test/Main.hs @@ -3,9 +3,6 @@ module Main where import TestFakeMessaging import TestActors import DialogSpec -import PrototypeGenericService --- import TestUniqProtoId --- import TestCrypto import Test.Tasty import Test.Tasty.HUnit @@ -18,7 +15,6 @@ main = testCase "testFakeMessaging1" testFakeMessaging1 , testCase "testActorsBasic" testActorsBasic , testDialog - , testCase "protoGenericService" protoGenericService ] diff --git a/hbs2-peer/app/CLI/RefChan.hs b/hbs2-peer/app/CLI/RefChan.hs index 68404e10..5ab4cc26 100644 --- a/hbs2-peer/app/CLI/RefChan.hs +++ b/hbs2-peer/app/CLI/RefChan.hs @@ -6,19 +6,23 @@ import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Definition() import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.Types +import HBS2.Net.Messaging.Unix import HBS2.Data.Types.SignedBox import HBS2.OrDie -import RPC +-- FIXME: to-remove-old-rpc +import RPC2 +import RPC2.API +import RPC2.Service.Unix import Options.Applicative import Data.ByteString qualified as BS import Data.ByteString.Lazy qualified as LBS -import Data.ByteString.Lazy (ByteString) import Lens.Micro.Platform import Codec.Serialise import Data.Maybe +import System.Exit pRefChan :: Parser (IO ()) pRefChan = hsubparser ( command "head" (info pRefChanHead (progDesc "head commands" )) @@ -78,27 +82,29 @@ pRefChanHeadPost :: Parser (IO ()) pRefChanHeadPost = do opts <- pRpcCommon ref <- strArgument (metavar "HEAD-BLOCK-TREE-HASH") - pure $ do + pure $ withRPC2 @UNIX opts $ \caller -> do href <- pure (fromStringMay ref) `orDie` "HEAD-BLOCK-TREE-HASH" - runRpcCommand opts (REFCHANHEADSEND href) + -- FIXME: proper-error-handling + void $ callService @RpcRefChanHeadPost caller href pRefChanHeadFetch :: Parser (IO ()) pRefChanHeadFetch = do opts <- pRpcCommon - ref <- strArgument (metavar "REFCHAH-HEAD-REF") - pure $ do + ref <- strArgument (metavar "REFCHAH-HEAD-KEY") + pure $ withRPC2 @UNIX opts $ \caller -> do href <- pure (fromStringMay ref) `orDie` "invalid REFCHAN-HEAD-REF" - runRpcCommand opts (REFCHANHEADFETCH href) - + void $ callService @RpcRefChanHeadFetch caller href pRefChanHeadGet :: Parser (IO ()) pRefChanHeadGet = do - opts <- pRpcCommon - ref <- strArgument (metavar "REFCHAH-HEAD-REF") - pure do + rpc <- pRpcCommon + ref <- strArgument (metavar "REFCHAH-HEAD-KEY") + pure $ withRPC2 @UNIX rpc $ \caller -> do href <- pure (fromStringMay ref) `orDie` "invalid REFCHAN-HEAD-REF" - runRpcCommand opts (REFCHANHEADGET href) - + callService @RpcRefChanHeadGet caller href >>= \case + Left{} -> exitFailure + Right Nothing -> exitFailure + Right (Just h) -> print (pretty h) >> exitSuccess pRefChanPropose :: Parser (IO ()) pRefChanPropose = do @@ -106,8 +112,8 @@ pRefChanPropose = do kra <- strOption (long "author" <> short 'a' <> help "author credentials") fn <- optional $ strOption (long "file" <> short 'f' <> help "file") dry <- optional (flag' True (long "dry" <> short 'n' <> help "only dump transaction")) <&> fromMaybe False - sref <- strArgument (metavar "REFCHAH-REF") - pure do + sref <- strArgument (metavar "REFCHAH-KEY") + pure $ withRPC2 @UNIX opts $ \caller -> do sc <- BS.readFile kra puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" creds <- pure (parseCredentials @(Encryption L4Proto) (AsCredFile sc)) `orDie` "bad keyring file" @@ -119,7 +125,8 @@ pRefChanPropose = do if dry then do LBS.putStr (serialise box) else do - runRpcCommand opts (REFCHANPROPOSE (puk, serialise box)) + -- FIXME: proper-error-handling + void $ callService @RpcRefChanPropose caller (puk, box) pRefChanNotify :: Parser (IO ()) pRefChanNotify = do @@ -127,32 +134,31 @@ pRefChanNotify = do kra <- strOption (long "author" <> short 'a' <> help "author credentials") fn <- optional $ strOption (long "file" <> short 'f' <> help "file") sref <- strArgument (metavar "REFCHAH-REF") - pure do + pure $ withRPC2 @UNIX opts $ \caller -> do sc <- BS.readFile kra puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" creds <- pure (parseCredentials @(Encryption L4Proto) (AsCredFile sc)) `orDie` "bad keyring file" - lbs <- maybe1 fn LBS.getContents LBS.readFile - let box = makeSignedBox @L4Proto @BS.ByteString (view peerSignPk creds) (view peerSignSk creds) (LBS.toStrict lbs) - - runRpcCommand opts (REFCHANNOTIFY (puk, serialise box)) - + void $ callService @RpcRefChanNotify caller (puk, box) pRefChanGet :: Parser (IO ()) pRefChanGet = do opts <- pRpcCommon - sref <- strArgument (metavar "REFCHAH-REF") - pure do + sref <- strArgument (metavar "REFCHAH-KEY") + pure $ withRPC2 @UNIX opts $ \caller -> do puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" - runRpcCommand opts (REFCHANGET puk) + callService @RpcRefChanGet caller puk >>= \case + Left{} -> exitFailure + Right Nothing -> exitFailure + Right (Just h) -> print (pretty h) >> exitSuccess pRefChanFetch :: Parser (IO ()) pRefChanFetch = do opts <- pRpcCommon - sref <- strArgument (metavar "REFCHAH-REF") - pure do - puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" - runRpcCommand opts (REFCHANFETCH puk) + ref <- strArgument (metavar "REFCHAH-KEY") + pure $ withRPC2 @UNIX opts $ \caller -> do + href <- pure (fromStringMay ref) `orDie` "invalid REFCHAN-HEAD-REF" + void $ callService @RpcRefChanFetch caller href diff --git a/hbs2-peer/app/CheckBlockAnnounce.hs b/hbs2-peer/app/CheckBlockAnnounce.hs new file mode 100644 index 00000000..c2d45d6d --- /dev/null +++ b/hbs2-peer/app/CheckBlockAnnounce.hs @@ -0,0 +1,110 @@ +{-# Language MultiWayIf #-} +module CheckBlockAnnounce where + +import HBS2.Prelude.Plated +import HBS2.Actors.Peer +import HBS2.Base58 +import HBS2.Data.Types.Peer +import HBS2.Hash +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Net.Proto.Types + +import PeerTypes +import PeerConfig +import CheckPeer (peerBanned) +import BlockDownload +import DownloadQ + +import HBS2.System.Logger.Simple + +import Data.Set qualified as Set +import Data.Set (Set) +import Lens.Micro.Platform +import Data.Text qualified as Text +import Data.Maybe + +data PeerAcceptAnnounceKey + +data AcceptAnnounce = AcceptAnnounceAll + | AcceptAnnounceFrom (Set (PubKey 'Sign (Encryption L4Proto))) + +instance Pretty AcceptAnnounce where + pretty = \case + AcceptAnnounceAll -> parens ("accept-announce" <+> "*") + + -- FIXME: better-pretty-for-AcceptAnnounceFrom + AcceptAnnounceFrom xs -> parens ("accept-announce" <+> pretty (fmap AsBase58 (Set.toList xs))) + + + +instance HasCfgKey PeerAcceptAnnounceKey AcceptAnnounce where + key = "accept-block-announce" + +instance HasCfgValue PeerAcceptAnnounceKey AcceptAnnounce where + cfgValue (PeerConfig syn) = fromMaybe (AcceptAnnounceFrom lst) fromAll + where + fromAll = headMay [ AcceptAnnounceAll | ListVal @C (Key s [SymbolVal "*"]) <- syn, s == kk ] + lst = Set.fromList $ + catMaybes [ fromStringMay @(PubKey 'Sign (Encryption L4Proto)) (Text.unpack e) + | ListVal @C (Key s [LitStrVal e]) <- syn, s == kk + ] + kk = key @PeerAcceptAnnounceKey @AcceptAnnounce + + +checkBlockAnnounce :: forall e m . ( e ~ L4Proto + , m ~ PeerM e IO + ) + => PeerConfig + -> DownloadEnv e + -> PeerNonce + -> PeerAddr e + -> Hash HbSync + -> m () + +checkBlockAnnounce conf denv nonce pa h = do + + let accptAnn = cfgValue @PeerAcceptAnnounceKey conf :: AcceptAnnounce + + let acceptAnnounce p pd = do + case accptAnn of + AcceptAnnounceAll -> pure True + AcceptAnnounceFrom s -> pure $ view peerSignKey pd `Set.member` s + + pip <- fromPeerAddr @e pa + + n1 <- peerNonce @e + + unless (nonce == n1) do + + mpde <- find @e (KnownPeerKey pip) id + + debug $ "received announce from" + <+> pretty pip + <+> pretty h + + case mpde of + Nothing -> do + sendPing @e pip + -- TODO: enqueue-announce-from-unknown-peer? + + Just pd -> do + + banned <- peerBanned conf pd + + notAccepted <- acceptAnnounce pip pd <&> not + + if | banned -> do + + notice $ pretty pip <+> "banned" + + | notAccepted -> do + + debug $ pretty pip <+> "announce-not-accepted" + + | otherwise -> do + + downloadLogAppend @e h + withDownload denv $ do + processBlock h + diff --git a/hbs2-peer/app/CheckPeer.hs b/hbs2-peer/app/CheckPeer.hs new file mode 100644 index 00000000..4e6fca4f --- /dev/null +++ b/hbs2-peer/app/CheckPeer.hs @@ -0,0 +1,43 @@ +module CheckPeer where + +import HBS2.Prelude.Plated +import HBS2.Data.Types.Peer +import HBS2.Net.Proto.Types + +import PeerTypes +import PeerConfig + +import Data.Set qualified as Set +import Data.Set (Set) +import Lens.Micro.Platform + + +data PeerBlackListKey +data PeerWhiteListKey + +instance HasCfgKey PeerBlackListKey (Set String) where + key = "blacklist" + +instance HasCfgKey PeerWhiteListKey (Set String) where + key = "whitelist" + +peerBanned :: forall e m . ( Monad m, FromStringMaybe (PubKey 'Sign (Encryption e)) + , Ord (PubKey 'Sign (Encryption e)) + ) + => PeerConfig + -> PeerData e -> m Bool + +peerBanned conf pd = do + + let bls = cfgValue @PeerBlackListKey conf :: Set String + let whs = cfgValue @PeerWhiteListKey conf :: Set String + let blkeys = toKeys bls + let wlkeys = toKeys (whs `Set.difference` bls) + + + let k = view peerSignKey pd + let blacklisted = k `Set.member` blkeys + let whitelisted = Set.null wlkeys || (k `Set.member` wlkeys) + pure $ blacklisted || not whitelisted + + diff --git a/hbs2-peer/app/Fetch.hs b/hbs2-peer/app/Fetch.hs new file mode 100644 index 00000000..c849b3c2 --- /dev/null +++ b/hbs2-peer/app/Fetch.hs @@ -0,0 +1,27 @@ +module Fetch where + +import HBS2.Prelude +import HBS2.Actors.Peer +import HBS2.Data.Types.Refs +import HBS2.Net.Proto.Types + +import HBS2.System.Logger.Simple + +import PeerTypes +import DownloadQ +import BlockDownload + +fetch :: forall e m . (e ~ L4Proto, MonadIO m) + => PeerEnv e + -> DownloadEnv e + -> HashRef + -> m () + +fetch penv denv href = do + debug $ "fetchAction" <+> pretty h + liftIO $ withPeerM penv $ do + downloadLogAppend @e h + withDownload denv (processBlock h) + where + h = fromHashRef href + diff --git a/hbs2-peer/app/Log.hs b/hbs2-peer/app/Log.hs new file mode 100644 index 00000000..650145f0 --- /dev/null +++ b/hbs2-peer/app/Log.hs @@ -0,0 +1,18 @@ +module Log where + +import HBS2.System.Logger.Simple + +tracePrefix :: SetLoggerEntry +tracePrefix = logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = logPrefix "[notice] " diff --git a/hbs2-peer/app/PeerConfig.hs b/hbs2-peer/app/PeerConfig.hs index c4f55ec0..2fa064f3 100644 --- a/hbs2-peer/app/PeerConfig.hs +++ b/hbs2-peer/app/PeerConfig.hs @@ -9,17 +9,15 @@ module PeerConfig import HBS2.Prelude.Plated import HBS2.System.Logger.Simple -import HBS2.Base58 import Data.Config.Suckless.Syntax import Data.Config.Suckless.Parse +import Data.Config.Suckless.KeyValue(HasConf(..)) import Control.Exception -import Data.Either +import Control.Monad.Reader import Data.Functor -import Data.Kind import Data.Maybe -import Prettyprinter import System.Directory import System.FilePath import Data.Set qualified as Set @@ -31,10 +29,12 @@ data FeatureSwitch = FeatureOn | FeatureOff deriving (Eq,Ord,Show,Generic) +-- FIXME: ASAP-switch-to-Suckless-KeyValue-1 class HasCfgKey a b where -- type family CfgValue a :: Type key :: Id +-- FIXME: ASAP-switch-to-Suckless-KeyValue-2 class HasCfgKey a b => HasCfgValue a b where cfgValue :: PeerConfig -> b @@ -49,6 +49,9 @@ data PeerHttpPortKey data PeerTcpProbeWaitKey data PeerUseHttpDownload +instance Monad m => HasConf (ReaderT PeerConfig m) where + getConf = asks (\(PeerConfig syn) -> syn) + instance HasCfgKey PeerListenTCPKey (Maybe String) where key = "listen-tcp" @@ -73,7 +76,7 @@ cfgName :: FilePath cfgName = "config" newtype PeerConfig = - PeerConfig [Syntax C] + PeerConfig { fromPeerConfig :: [Syntax C] } deriving newtype (Monoid, Semigroup, Pretty) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 52a7e9f5..1eb3a30d 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -34,18 +34,24 @@ import HBS2.Net.Proto.PeerMeta import HBS2.Net.Proto.RefLog import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.Sessions +import HBS2.Net.Proto.Service +import HBS2.Net.Messaging.Unix (UNIX,newMessagingUnix,runMessagingUnix) import HBS2.OrDie import HBS2.Storage.Simple import HBS2.Data.Detect import HBS2.System.Logger.Simple hiding (info) -import HBS2.System.Logger.Simple qualified as Log + +-- FIXME: move-to-peer-config-eventually +import Data.Config.Suckless.KeyValue(HasConf(..)) import Brains -import RPC +import RPC2 import PeerTypes import BlockDownload import BlockHttpDownload +import CheckBlockAnnounce (checkBlockAnnounce) +import CheckPeer (peerBanned) import DownloadQ import PeerInfo import PeerConfig @@ -56,22 +62,23 @@ import RefLog qualified import RefLog (reflogWorker) import HttpWorker import ProxyMessaging -import PeerMain.DialogCliCommand -import PeerMain.Dialog.Server -import PeerMain.Dialog.Spec +-- import PeerMain.DialogCliCommand +-- import PeerMain.Dialog.Server import PeerMeta import CLI.RefChan import RefChan +import Log + +import RPC2.Service.Unix as RPC2 +import RPC2.API import Codec.Serialise as Serialise --- import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception as Exception import Control.Monad.Reader import Control.Monad.Trans.Maybe import Control.Monad.Trans.Writer.CPS qualified as W import Crypto.Saltine (sodiumInit) -import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.Cache qualified as Cache @@ -82,12 +89,8 @@ import Data.Map qualified as Map import Data.Maybe import Data.Set qualified as Set import Data.Set (Set) -import Data.Text.Encoding qualified as TE import Data.Text qualified as Text -import Data.Text (Text) import Data.HashSet qualified as HashSet -import GHC.Stats -import GHC.TypeLits import Lens.Micro.Platform as Lens import Network.Socket import Options.Applicative @@ -98,7 +101,6 @@ import System.Mem import System.Metrics import System.Posix.Process import System.Environment -import Text.InterpolatedString.Perl6 (qc) import UnliftIO.Exception qualified as U -- import UnliftIO.STM @@ -106,7 +108,6 @@ import UnliftIO.Async as U import Control.Monad.Trans.Resource import Streaming.Prelude qualified as S -import Streaming qualified as S -- TODO: write-workers-to-config defStorageThreads :: Integral a => a @@ -123,23 +124,15 @@ defLocalMulticast = "239.192.152.145:10153" data PeerListenKey data PeerKeyFileKey -data PeerBlackListKey -data PeerWhiteListKey data PeerStorageKey -data PeerAcceptAnnounceKey +data PeerDebugKey data PeerTraceKey data PeerTrace1Key data PeerProxyFetchKey -data AcceptAnnounce = AcceptAnnounceAll - | AcceptAnnounceFrom (Set (PubKey 'Sign (Encryption L4Proto))) -instance Pretty AcceptAnnounce where - pretty = \case - AcceptAnnounceAll -> parens ("accept-announce" <+> "*") - - -- FIXME: better-pretty-for-AcceptAnnounceFrom - AcceptAnnounceFrom xs -> parens ("accept-announce" <+> pretty (fmap AsBase58 (Set.toList xs))) +instance HasCfgKey PeerDebugKey FeatureSwitch where + key = "debug" instance HasCfgKey PeerTraceKey FeatureSwitch where key = "trace" @@ -156,28 +149,9 @@ instance HasCfgKey PeerKeyFileKey (Maybe String) where instance HasCfgKey PeerStorageKey (Maybe String) where key = "storage" -instance HasCfgKey PeerBlackListKey (Set String) where - key = "blacklist" - -instance HasCfgKey PeerWhiteListKey (Set String) where - key = "whitelist" - instance HasCfgKey PeerProxyFetchKey (Set String) where key = "proxy-fetch-for" -instance HasCfgKey PeerAcceptAnnounceKey AcceptAnnounce where - key = "accept-block-announce" - -instance HasCfgValue PeerAcceptAnnounceKey AcceptAnnounce where - cfgValue (PeerConfig syn) = fromMaybe (AcceptAnnounceFrom lst) fromAll - where - fromAll = headMay [ AcceptAnnounceAll | ListVal @C (Key s [SymbolVal "*"]) <- syn, s == kk ] - lst = Set.fromList $ - catMaybes [ fromStringMay @(PubKey 'Sign (Encryption L4Proto)) (Text.unpack e) - | ListVal @C (Key s [LitStrVal e]) <- syn, s == kk - ] - kk = key @PeerAcceptAnnounceKey @AcceptAnnounce - data PeerOpts = PeerOpts @@ -192,27 +166,12 @@ data PeerOpts = makeLenses 'PeerOpts -tracePrefix :: SetLoggerEntry -tracePrefix = logPrefix "[trace] " - -debugPrefix :: SetLoggerEntry -debugPrefix = logPrefix "[debug] " - -errorPrefix :: SetLoggerEntry -errorPrefix = logPrefix "[error] " - -warnPrefix :: SetLoggerEntry -warnPrefix = logPrefix "[warn] " - -noticePrefix :: SetLoggerEntry -noticePrefix = logPrefix "[notice] " main :: IO () main = do sodiumInit - setLogging @DEBUG debugPrefix setLogging @INFO defLog setLogging @ERROR errorPrefix setLogging @WARN warnPrefix @@ -223,30 +182,50 @@ main = do withSimpleLogger runCLI -runCLI :: IO () -runCLI = join . customExecParser (prefs showHelpOnError) $ - info (helper <*> parser) - ( fullDesc - <> header "hbs2-peer daemon" - <> progDesc "serves HBS2 protocol" - ) - where - parser :: Parser (IO ()) - parser = hsubparser ( command "init" (info pInit (progDesc "creates default config")) - <> command "run" (info pRun (progDesc "run peer")) - <> command "poke" (info pPoke (progDesc "poke peer by rpc")) - <> command "die" (info pDie (progDesc "die cmd")) - <> command "announce" (info pAnnounce (progDesc "announce block")) - <> command "ping" (info pPing (progDesc "ping another peer")) - <> command "fetch" (info pFetch (progDesc "fetch block")) - <> command "reflog" (info pRefLog (progDesc "reflog commands")) - <> command "refchan" (info pRefChan (progDesc "refchan commands")) - <> command "peers" (info pPeers (progDesc "show known peers")) - <> command "pexinfo" (info pPexInfo (progDesc "show pex")) - <> command "log" (info pLog (progDesc "set logging level")) - <> command "dial" (info pDialog (progDesc "dialog commands")) - ) + +data GOpts = + GOpts + { goDebug :: Bool + , goTrace :: Bool + } + +runCLI :: IO () +runCLI = do + + (g, cmd) <- customExecParser (prefs showHelpOnError) $ + info (helper <*> parser) + ( fullDesc + <> header "hbs2-peer daemon" + <> progDesc "serves HBS2 protocol" + ) + + withOpts cmd g + + where + + + parser :: Parser (GOpts,IO ()) + parser = do + + (,) <$> ( GOpts <$> switch (long "debug" <> short 'd' <> help "debug mode on") + <*> switch (long "trace" <> help "trace on" ) + ) + <*> hsubparser ( command "init" (info pInit (progDesc "creates default config")) + <> command "run" (info pRun (progDesc "run peer")) + <> command "poke" (info pPoke (progDesc "poke peer by rpc")) + <> command "die" (info pDie (progDesc "die cmd")) + <> command "announce" (info pAnnounce (progDesc "announce block")) + <> command "ping" (info pPing (progDesc "ping another peer")) + <> command "fetch" (info pFetch (progDesc "fetch block")) + <> command "reflog" (info pRefLog (progDesc "reflog commands")) + <> command "refchan" (info pRefChan (progDesc "refchan commands")) + <> command "peers" (info pPeers (progDesc "show known peers")) + <> command "pexinfo" (info pPexInfo (progDesc "show pex")) + <> command "log" (info pLog (progDesc "set logging level")) + -- FIXME: bring-back-dialogue-over-separate-socket + -- <> command "dial" (info pDialog (progDesc "dialog commands")) + ) confOpt = strOption ( long "config" <> short 'c' <> help "config" ) @@ -272,6 +251,16 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ pure $ PeerOpts pref l r k c resp + withOpts m g = do + + when (goDebug g) do + setLogging @DEBUG ( debugPrefix . toStderr ) + + when (goTrace g) do + setLogging @TRACE ( tracePrefix . toStderr ) + + m + pRun = do runPeer <$> common @@ -281,34 +270,59 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ pDie = do rpc <- pRpcCommon - pure $ runRpcCommand rpc DIE + pure $ withRPC2 @UNIX rpc $ \caller -> do + l <- async $ void $ callService @RpcDie caller () + pause @'Seconds 0.25 + cancel l pPoke = do rpc <- pRpcCommon - pure $ runRpcCommand rpc POKE + pure $ withRPC2 @UNIX rpc $ \caller -> do + r <- callService @RpcPoke caller () + case r of + Left e -> err (viaShow e) + Right txt -> putStrLn txt pAnnounce = do rpc <- pRpcCommon h <- strArgument ( metavar "HASH" ) - pure $ runRpcCommand rpc (ANNOUNCE h) + pure $ withRPC2 @UNIX rpc $ \caller -> do + void $ callService @RpcAnnounce caller h pFetch = do rpc <- pRpcCommon h <- strArgument ( metavar "HASH" ) - pure $ runRpcCommand rpc (FETCH h) + pure $ withRPC2 @UNIX rpc $ \caller -> do + void $ callService @RpcFetch caller h pPing = do rpc <- pRpcCommon h <- strArgument ( metavar "ADDR" ) - pure $ runRpcCommand rpc (PING h Nothing) + pure $ withRPC2 @UNIX rpc $ \caller -> do + callService @RpcPing caller h >>= \case + Left e -> err (viaShow e) + Right True -> putStrLn "pong" + Right False -> putStrLn "pang" pPeers = do rpc <- pRpcCommon - pure $ runRpcCommand rpc PEERS + pure $ withRPC2 @UNIX rpc $ \caller -> do + r <- callService @RpcPeers caller () + case r of + Left e -> err (viaShow e) + Right p -> do + print $ vcat (fmap fmt p) + where + fmt (a, b) = pretty (AsBase58 a) <+> pretty b pPexInfo = do rpc <- pRpcCommon - pure $ runRpcCommand rpc PEXINFO + pure $ withRPC2 @UNIX rpc $ \caller -> do + r <- callService @RpcPexInfo caller () + case r of + Left e -> err (viaShow e) + Right p -> do + print $ vcat (fmap pretty p) onOff l = hsubparser ( command "on" (info (pure (l True) ) (progDesc "on") ) ) @@ -316,11 +330,12 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ pLog = do rpc <- pRpcCommon - setlog <- SETLOG <$> ( hsubparser ( command "trace" (info (onOff TraceOn) (progDesc "set trace") ) ) - <|> - hsubparser ( command "debug" (info (onOff DebugOn) (progDesc "set debug") ) ) - ) - pure $ runRpcCommand rpc setlog + setlog <- hsubparser ( command "trace" (info (onOff TraceOn) (progDesc "set trace") ) ) + <|> + hsubparser ( command "debug" (info (onOff DebugOn) (progDesc "set debug") ) ) + + pure $ withRPC2 @UNIX rpc $ \caller -> do + void $ callService @RpcLogLevel caller setlog pInit = do pref <- optional $ strArgument ( metavar "DIR" ) @@ -335,42 +350,38 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ pRefLogSend = do rpc <- pRpcCommon kr <- strOption (long "keyring" <> short 'k' <> help "reflog keyring" <> metavar "FILE") - pure $ do - setLogging @TRACE tracePrefix - trace "pRefLogSend" + pure $ withRPC2 @UNIX rpc $ \caller -> do s <- BS.readFile kr creds <- pure (parseCredentials @(Encryption L4Proto) (AsCredFile s)) `orDie` "bad keyring file" bs <- BS.take defChunkSize <$> BS.hGetContents stdin let pubk = view peerSignPk creds let privk = view peerSignSk creds - msg <- makeRefLogUpdate @L4Proto pubk privk bs <&> serialise - runRpcCommand rpc (REFLOGUPDATE msg) + msg <- makeRefLogUpdate @L4Proto pubk privk bs + void $ callService @RpcRefLogPost caller msg pRefLogSendRaw = do rpc <- pRpcCommon - pure $ do - setLogging @TRACE tracePrefix - trace "pRefLogSendRaw" + pure $ withRPC2 @UNIX rpc $ \caller -> do bs <- LBS.take defChunkSize <$> LBS.hGetContents stdin - runRpcCommand rpc (REFLOGUPDATE bs) + msg <- pure (deserialiseOrFail @(RefLogUpdate L4Proto) bs) `orDie` "Invalid reflog transaction" + void $ callService @RpcRefLogPost caller msg pRefLogFetch = do rpc <- pRpcCommon ref <- strArgument ( metavar "REFLOG-KEY" ) - pure $ do + pure $ withRPC2 @UNIX rpc $ \caller -> do href <- pure (fromStringMay ref) `orDie` "invalid REFLOG-KEY" - setLogging @TRACE tracePrefix - trace "pRefLogFetch" - runRpcCommand rpc (REFLOGFETCH href) + void $ callService @RpcRefLogFetch caller href pRefLogGet = do rpc <- pRpcCommon ref <- strArgument ( metavar "REFLOG-KEY" ) - pure $ do + pure $ withRPC2 @UNIX rpc $ \caller -> do href <- pure (fromStringMay ref) `orDie` "invalid REFLOG-KEY" - setLogging @TRACE tracePrefix - runRpcCommand rpc (REFLOGGET href) - + callService @RpcRefLogGet caller href >>= \case + Left{} -> exitFailure + Right Nothing -> exitFailure + Right (Just h) -> print (pretty h) >> exitSuccess myException :: SomeException -> IO () myException e = err ( show e ) @@ -466,14 +477,13 @@ runPeer opts = Exception.handle (\e -> myException e liftIO $ print $ pretty conf let listenConf = cfgValue @PeerListenKey conf - let rpcConf = cfgValue @PeerRpcKey conf let keyConf = cfgValue @PeerKeyFileKey conf let storConf = cfgValue @PeerStorageKey conf <&> StoragePrefix let traceConf = cfgValue @PeerTraceKey conf :: FeatureSwitch + let debugConf = cfgValue @PeerDebugKey conf :: FeatureSwitch let trace1Conf = cfgValue @PeerTrace1Key conf :: FeatureSwitch let listenSa = view listenOn opts <|> listenConf <|> Just defListenUDP - let rpcSa = view listenRpc opts <|> rpcConf <|> Just defRpcUDP credFile <- pure (view peerCredFile opts <|> keyConf) `orDie` "credentials not set" let pref = view storage opts <|> storConf <|> Just xdg @@ -485,39 +495,19 @@ runPeer opts = Exception.handle (\e -> myException e when (traceConf == FeatureOn) do setLogging @TRACE tracePrefix + setLogging @DEBUG debugPrefix + + when (debugConf == FeatureOn) do + setLogging @DEBUG debugPrefix when (trace1Conf == FeatureOn) do setLogging @TRACE1 tracePrefix - let bls = cfgValue @PeerBlackListKey conf :: Set String - let whs = cfgValue @PeerWhiteListKey conf :: Set String - let toKeys xs = Set.fromList - $ catMaybes [ fromStringMay x | x <- Set.toList xs - ] - let blkeys = toKeys bls - let wlkeys = toKeys (whs `Set.difference` bls) + let helpFetchKeys = cfgValue @PeerProxyFetchKey conf & toKeys let useHttpDownload = cfgValue @PeerUseHttpDownload conf & (== FeatureOn) - let accptAnn = cfgValue @PeerAcceptAnnounceKey conf :: AcceptAnnounce - - liftIO $ print $ pretty accptAnn - - -- FIXME: move-peerBanned-somewhere - let peerBanned p pd = do - let k = view peerSignKey pd - let blacklisted = k `Set.member` blkeys - let whitelisted = Set.null wlkeys || (k `Set.member` wlkeys) - pure $ blacklisted || not whitelisted - - let acceptAnnounce p pd = do - case accptAnn of - AcceptAnnounceAll -> pure True - AcceptAnnounceFrom s -> pure $ view peerSignKey pd `Set.member` s - - rpcQ <- liftIO $ newTQueueIO @RPCCommand - let ps = mempty pc' <- liftIO $ LBS.readFile credFile @@ -547,11 +537,6 @@ runPeer opts = Exception.handle (\e -> myException e udp <- async $ runMessagingUDP mess - udp1 <- newMessagingUDP False rpcSa - `orDie` "Can't start RPC listener" - - mrpc <- async $ runMessagingUDP udp1 - mcast <- newMessagingUDPMulticast defLocalMulticast `orDie` "Can't start RPC listener" @@ -747,13 +732,13 @@ runPeer opts = Exception.handle (\e -> myException e unless (nonce == pnonce) $ do debug $ "Got peer announce!" <+> pretty pip mpd :: Maybe (PeerData e) <- find (KnownPeerKey pip) id - banned <- maybe (pure False) (peerBanned pip) mpd + banned <- maybe (pure False) (peerBanned conf) mpd let known = isJust mpd && not banned sendPing pip subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do pa <- toPeerAddr p - liftIO $ atomically $ writeTQueue rpcQ (CHECK no pa (view biHash bi)) + checkBlockAnnounce conf denv no pa (view biHash bi) subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent p pd) -> do @@ -768,7 +753,7 @@ runPeer opts = Exception.handle (\e -> myException e liftIO $ atomically $ writeTVar (view peerPingFailed pinfo) 0 liftIO $ atomically $ writeTVar (view peerLastWatched pinfo) now - banned <- peerBanned p pd + banned <- peerBanned conf pd let doAddPeer p = do addPeers pl [p] @@ -904,100 +889,6 @@ runPeer opts = Exception.handle (\e -> myException e peerThread "refChanWorker" (refChanWorker @e rce (SomeBrains brains)) - peerThread "ping pong" $ forever $ do - cmd <- liftIO $ atomically $ readTQueue rpcQ - case cmd of - POKE -> debug "on poke: alive and kicking!" - - PING pa r -> do - debug $ "ping" <+> pretty pa - pip <- fromPeerAddr @e pa - subscribe (ConcretePeerKey pip) $ \(ConcretePeerData _ pde) -> do - - maybe1 r (pure ()) $ \rpcPeer -> do - pinged <- toPeerAddr pip - request rpcPeer (RPCPong @e pinged) - -- case (view peerEncPubKey pde) of - -- Nothing -> unencrypted ping - -- Just pubkey -> encryptengd - - sendPing pip - - ANNOUNCE h -> do - debug $ "got announce rpc" <+> pretty h - sto <- getStorage - mbsize <- liftIO $ hasBlock sto h - - maybe1 mbsize (pure ()) $ \size -> do - debug "send multicast announce" - - no <- peerNonce @e - let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta size h - let announce = BlockAnnounce @e no annInfo - - request localMulticast announce - - liftIO $ withPeerM env do - forKnownPeers $ \p _ -> do - debug $ "send single-cast announces" <+> pretty p - request @e p announce - - CHECK nonce pa h -> do - pip <- fromPeerAddr @e pa - - n1 <- peerNonce @e - - unless (nonce == n1) do - - mpde <- find @e (KnownPeerKey pip) id - - debug $ "received announce from" - <+> pretty pip - <+> pretty h - - case mpde of - Nothing -> do - sendPing @e pip - -- TODO: enqueue-announce-from-unknown-peer? - - Just pd -> do - - banned <- peerBanned pip pd - - notAccepted <- acceptAnnounce pip pd <&> not - - if | banned -> do - - notice $ pretty pip <+> "banned" - - | notAccepted -> do - - debug $ pretty pip <+> "announce-not-accepted" - - | otherwise -> do - - downloadLogAppend @e h - withDownload denv $ do - processBlock h - - REFLOGUPDATE bs -> do - - trace "REFLOGUPDATE" - - let msg' = deserialiseOrFail @(RefLogUpdate L4Proto) bs - & either (const Nothing) Just - - when (isNothing msg') do - warn "unable to parse RefLogUpdate message" - - maybe1 msg' none $ \msg -> do - let pubk = view refLogId msg - emit @e RefLogUpdateEvKey (RefLogUpdateEvData (pubk, msg)) - RefLog.doRefLogBroadCast msg - - _ -> pure () - - peerThread "all protos" do runProto @e [ makeResponse (blockSizeProto blk dontHandle onNoBlock) @@ -1013,100 +904,14 @@ runPeer opts = Exception.handle (\e -> myException e , makeResponse (refChanUpdateProto False pc refChanAdapter) , makeResponse (refChanRequestProto False refChanAdapter) , makeResponse (refChanNotifyProto False refChanAdapter) - -- , makeResponse (dialReqProto dialReqProtoAdapter) ] void $ liftIO $ waitAnyCancel workers - let dieAction _ = do - liftIO $ die "received die command" - - let pokeAction _ = do - who <- thatPeer (Proxy @(RPC e)) - let k = view peerSignPk pc - let rpc = "rpc:" <+> dquotes (pretty (listenAddr udp1)) - let udp = "udp:" <+> dquotes (pretty (listenAddr mess)) - - let http = case cfgValue @PeerHttpPortKey conf :: Maybe Integer of - Nothing -> mempty - Just p -> "http-port:" <+> pretty p - - let answ = show $ vcat [ "peer-key:" <+> dquotes (pretty (AsBase58 k)) - , rpc - , udp - , http - ] - - -- FIXME: to-delete-POKE - liftIO $ atomically $ writeTQueue rpcQ POKE - request who (RPCPokeAnswerFull @e (Text.pack answ)) - - let annAction h = do - liftIO $ atomically $ writeTQueue rpcQ (ANNOUNCE h) - - let pingAction pa = do - that <- thatPeer (Proxy @(RPC e)) - liftIO $ atomically $ writeTQueue rpcQ (PING pa (Just that)) - - let fetchAction h = do - debug $ "fetchAction" <+> pretty h - liftIO $ withPeerM penv $ do - downloadLogAppend @e h - withDownload denv (processBlock h) - - let peersAction _ = do - who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do - forKnownPeers @e $ \p pde -> do - pa <- toPeerAddr p - let k = view peerSignKey pde - request who (RPCPeersAnswer @e pa k) - - let pexInfoAction :: RPC L4Proto -> ResponseM L4Proto (RpcM (ResourceT IO)) () - pexInfoAction _ = do - who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do - -- FIXME: filter-pexinfo-entries - ps <- getAllPex2Peers - request who (RPCPexInfoAnswer @e ps) - - let logLevelAction = \case - DebugOn True -> do - setLogging @DEBUG debugPrefix - debug "DebugOn" - - DebugOn False -> do - debug "DebugOff" - setLoggingOff @DEBUG - - TraceOn True -> do - setLogging @TRACE tracePrefix - trace "TraceOn" - - TraceOn False -> do - trace "TraceOff" - setLoggingOff @TRACE - - let reflogUpdateAction bs = void $ runMaybeT do - liftIO $ atomically $ writeTQueue rpcQ (REFLOGUPDATE bs) - -- trace $ "reflogUpdateAction" - -- - let reflogFetchAction puk = do - trace "reflogFetchAction" - void $ liftIO $ async $ withPeerM penv $ do - broadCastMessage (RefLogRequest @e puk) - - let reflogGetAction puk = do - trace $ "reflogGetAction" <+> pretty (AsBase58 puk) - who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do - sto <- getStorage - h <- liftIO $ getRef sto (RefLogKey @(Encryption e) puk) - request who (RPCRefLogGetAnswer @e h) - - let refChanHeadSendAction h = do - trace $ "refChanHeadSendAction" <+> pretty h - void $ liftIO $ async $ withPeerM penv $ do + let refChanHeadPostAction href = do + void $ liftIO $ withPeerM penv $ do + let h = fromHashRef href + debug $ "rpc2.refChanHeadPost" <+> pretty h me <- ownPeer @e sto <- getStorage @@ -1119,115 +924,28 @@ runPeer opts = Exception.handle (\e -> myException e let box = deserialiseOrFail @(SignedBox (RefChanHeadBlock e) e) (LBS.concat chunks) case box of + -- FIXME: proper-error-handling Left{} -> err $ "can't read head block" <+> pretty h Right (SignedBox k _ _) -> do let msg = RefChanHead k (RefChanHeadBlockTran (HashRef h)) refChanNotifyOnUpdated rce k runResponseM me $ refChanHeadProto @e True refChanAdapter msg - let refChanHeadGetAction puk = do - trace $ "refChanHeadGetAction" <+> pretty (AsBase58 puk) - who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do - sto <- getStorage - h <- liftIO $ getRef sto (RefChanHeadKey @(Encryption e) puk) - request who (RPCRefChanHeadGetAnsw @e h) - - let refChanHeadFetchAction puk = do - trace "reChanFetchAction" - void $ liftIO $ async $ withPeerM penv $ do - broadCastMessage (RefChanGetHead @e puk) - - let refChanProposeAction (puk, lbs) = do - trace "reChanProposeAction" - void $ liftIO $ async $ withPeerM penv $ do + let refChanProposeAction (puk, box) = do + debug $ "rpc2.reChanPropose" <+> pretty (AsBase58 puk) + void $ liftIO $ withPeerM penv $ do me <- ownPeer @e runMaybeT do - box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just proposed <- MaybeT $ makeProposeTran @e pc puk box - - -- debug $ "PROPOSAL:" <+> pretty (LBS.length (serialise proposed)) - -- lift $ broadCastMessage (Propose @e puk proposed) - - -- FIXME: remove-this-debug-stuff - -- или оставить? нода будет сама себе - -- консенсус слать тогда. может, и оставить lift $ runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed) - let refChanNotifyAction (puk, lbs) = do - trace "refChanNotifyAction" - void $ liftIO $ async $ withPeerM penv $ do + -- NOTE: moved-to-rpc2 + let refChanNotifyAction (puk, box) = do + void $ liftIO $ withPeerM penv $ do me <- ownPeer @e runMaybeT do - box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (Notify @e puk box) - let refChanGetAction puk = do - trace $ "refChanGetAction" <+> pretty (AsBase58 puk) - who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do - sto <- getStorage - h <- liftIO $ getRef sto (RefChanLogKey @(Encryption e) puk) - trace $ "refChanGetAction ANSWER IS" <+> pretty h - request who (RPCRefChanGetAnsw @e h) - - let refChanFetchAction puk = do - trace $ "refChanFetchAction" <+> pretty (AsBase58 puk) - void $ liftIO $ async $ withPeerM penv $ do - gossip (RefChanRequest @e puk) - - let arpc = RpcAdapter - { rpcOnPoke = pokeAction - , rpcOnDie = dieAction - , rpcOnPokeAnswer = dontHandle - , rpcOnPokeAnswerFull = dontHandle - , rpcOnAnnounce = annAction - , rpcOnPing = pingAction - , rpcOnPong = dontHandle - , rpcOnFetch = fetchAction - , rpcOnPeers = peersAction - , rpcOnPeersAnswer = dontHandle - , rpcOnPexInfo = pexInfoAction - , rpcOnPexInfoAnswer = dontHandle - , rpcOnLogLevel = logLevelAction - , rpcOnRefLogUpdate = reflogUpdateAction - , rpcOnRefLogFetch = reflogFetchAction - , rpcOnRefLogGet = reflogGetAction - , rpcOnRefLogGetAnsw = dontHandle - - , rpcOnRefChanHeadSend = refChanHeadSendAction - , rpcOnRefChanHeadGet = refChanHeadGetAction - , rpcOnRefChanHeadGetAnsw = dontHandle - , rpcOnRefChanHeadFetch = refChanHeadFetchAction - - , rpcOnRefChanFetch = refChanFetchAction - , rpcOnRefChanGet = refChanGetAction - , rpcOnRefChanGetAnsw = dontHandle -- rpcOnRefChanGetAnsw - - , rpcOnRefChanPropose = refChanProposeAction - , rpcOnRefChanNotify = refChanNotifyAction - } - - dialReqProtoAdapter <- do - let denv = DialEnv - - let dialReqProtoAdapterDApp = drpcFullDApp denv penv - - -- dialReqProtoAdapterNT :: ResponseM L4Proto (RpcM (ResourceT IO)) a -> IO a - dialReqProtoAdapterNT :: Peer e -> forall a . ResponseM L4Proto (RpcM (ResourceT IO)) a -> IO a - dialReqProtoAdapterNT = \peer -> - runResourceT - . runRPC udp1 - . runResponseM peer - - pure DialReqProtoAdapter {..} - - rpc <- async $ runRPC udp1 do - runProto @e - [ makeResponse (rpcHandler arpc) - , makeResponse (dialReqProto dialReqProtoAdapter) - ] - menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) ann <- liftIO $ async $ runPeerM menv $ do @@ -1237,7 +955,7 @@ runPeer opts = Exception.handle (\e -> myException e subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do unless (p == self) do pa <- toPeerAddr p - liftIO $ atomically $ writeTQueue rpcQ (CHECK no pa (view biHash bi)) + checkBlockAnnounce conf denv no pa (view biHash bi) subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent pip nonce) -> do -- debug $ "Got peer announce!" <+> pretty pip @@ -1248,7 +966,34 @@ runPeer opts = Exception.handle (\e -> myException e , makeResponse peerAnnounceProto ] - void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread] + + let k = view peerSignPk pc + + let http = case cfgValue @PeerHttpPortKey conf :: Maybe Integer of + Nothing -> mempty + Just p -> "http-port:" <+> pretty p + + let pokeAnsw = show $ vcat [ "peer-key:" <+> dquotes (pretty (AsBase58 k)) + , "udp:" <+> dquotes (pretty (listenAddr mess)) + , "local-multicast:" <+> dquotes (pretty localMulticast) + , http + ] + + let rpc2ctx = RPC2Context { rpcConfig = fromPeerConfig conf + , rpcPokeAnswer = pokeAnsw + , rpcPeerEnv = penv + , rpcDownloadEnv = denv + , rpcLocalMultiCast = localMulticast + , rpcStorage = AnyStorage s + , rpcDoRefChanHeadPost = refChanHeadPostAction + , rpcDoRefChanPropose = refChanProposeAction + , rpcDoRefChanNotify = refChanNotifyAction + } + + rpc2 <- async (runReaderT RPC2.runService rpc2ctx) + link rpc2 + + void $ waitAnyCancel $ w <> [udp,loop,rpc2,ann,messMcast,brainsThread] liftIO $ simpleStorageStop s diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index f6f6237c..94798283 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -56,10 +56,12 @@ import Data.IntSet (IntSet) import Data.Text qualified as Text import Data.Text.Encoding qualified as TE import Data.Heap qualified as Heap -import Data.Heap (Heap,Entry(..)) +import Data.Heap (Entry(..)) import Data.Time.Clock import Data.Word import Data.List qualified as List +import Data.Set qualified as Set +import Data.Set (Set) import UnliftIO.STM @@ -450,6 +452,31 @@ mkPeerMeta conf penv = do where elem k = W.tell . L.singleton . (k ,) +pingPeerWait :: forall e m . ( MonadIO m + , Request e (PeerHandshake e) m + , Sessions e (PeerHandshake e) m + , HasNonces (PeerHandshake e) m + , EventListener L4Proto (ConcretePeer L4Proto) m + , Pretty (Peer e) + , e ~ L4Proto + ) + => PeerAddr e + -> m Bool + +pingPeerWait pa = do + pip <- fromPeerAddr @e pa + + w <- newTQueueIO + + subscribe (ConcretePeerKey pip) $ \(ConcretePeerData _ _) -> do + atomically $ writeTQueue w () + + sendPing @e pip + + r <- liftIO $ race (pause @'Seconds 1) (void $ atomically $ readTQueue w) + + either (const $ pure False) (const $ pure True) r + -- FIXME: slow-deep-scan-exception-seems-not-working checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool @@ -526,6 +553,12 @@ instance (ForGossip e p (ResponseM e m), HasGossip e p m) => HasGossip e p (Resp request @e pip msg +toKeys :: (Ord a, FromStringMaybe a) => Set String -> Set a +toKeys xs = Set.fromList + $ catMaybes [ fromStringMay x | x <- Set.toList xs + ] + + simpleBlockAnnounce :: forall e m . ( Monad m , HasPeerNonce e m ) diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs deleted file mode 100644 index 12f806f9..00000000 --- a/hbs2-peer/app/RPC.hs +++ /dev/null @@ -1,451 +0,0 @@ -{-# Language TemplateHaskell #-} -{-# Language UndecidableInstances #-} -module RPC where - - -import HBS2.Actors.Peer -import HBS2.Base58 -import HBS2.Clock -import HBS2.Hash -import HBS2.Net.Auth.Credentials -import HBS2.Net.IP.Addr -import HBS2.Net.Messaging.UDP -import HBS2.Net.Proto -import HBS2.Net.Proto.Definition() -import HBS2.OrDie -import HBS2.Prelude.Plated -import HBS2.System.Logger.Simple hiding (info) -import HBS2.System.Logger.Simple qualified as Log - -import PeerConfig - - -import Control.Monad.IO.Unlift -import Codec.Serialise (serialise,deserialiseOrFail) -import Control.Applicative -import Control.Concurrent.STM -import Control.Concurrent.STM.TQueue -import Control.Monad.Reader -import Control.Monad.Trans.Resource -import Data.ByteString.Lazy (ByteString) -import Data.Function -import Data.Functor -import Data.List qualified as L -import Lens.Micro.Platform -import Network.Socket -import System.Exit -import System.IO -import UnliftIO.Async as U -import Control.Concurrent.MVar - -data PeerRpcKey - -instance HasCfgKey PeerRpcKey (Maybe String) where - key = "rpc" - -data SetLogging = - DebugOn Bool - | TraceOn Bool - deriving (Generic,Eq,Show) - -instance Serialise SetLogging - -data RPCCommand = - DIE - | POKE - | ANNOUNCE (Hash HbSync) - | PING (PeerAddr L4Proto) (Maybe (Peer L4Proto)) - | CHECK PeerNonce (PeerAddr L4Proto) (Hash HbSync) - | FETCH (Hash HbSync) - | PEERS - | PEXINFO - | SETLOG SetLogging - | REFLOGUPDATE ByteString - | REFLOGFETCH (PubKey 'Sign (Encryption L4Proto)) - | REFLOGGET (PubKey 'Sign (Encryption L4Proto)) - | REFCHANHEADSEND (Hash HbSync) - | REFCHANHEADGET (PubKey 'Sign (Encryption L4Proto)) - | REFCHANHEADFETCH (PubKey 'Sign (Encryption L4Proto)) - | REFCHANFETCH (PubKey 'Sign (Encryption L4Proto)) - | REFCHANGET (PubKey 'Sign (Encryption L4Proto)) - | REFCHANPROPOSE (PubKey 'Sign (Encryption L4Proto), ByteString) - | REFCHANNOTIFY (PubKey 'Sign (Encryption L4Proto), ByteString) - -data RPC e = - RPCDie - | RPCPoke - | RPCPing (PeerAddr e) - | RPCPong (PeerAddr e) - | RPCPokeAnswer (PubKey 'Sign (Encryption e)) - | RPCPokeAnswerFull Text - | RPCAnnounce (Hash HbSync) - | RPCFetch (Hash HbSync) - | RPCPeers - | RPCPeersAnswer (PeerAddr e) (PubKey 'Sign (Encryption e)) - | RPCPexInfo - | RPCPexInfoAnswer [PeerAddr L4Proto] - | RPCLogLevel SetLogging - | RPCRefLogUpdate ByteString - | RPCRefLogFetch (PubKey 'Sign (Encryption e)) - | RPCRefLogGet (PubKey 'Sign (Encryption e)) - | RPCRefLogGetAnswer (Maybe (Hash HbSync)) - - | RPCRefChanHeadSend (Hash HbSync) - | RPCRefChanHeadGet (PubKey 'Sign (Encryption e)) - | RPCRefChanHeadGetAnsw (Maybe (Hash HbSync)) - | RPCRefChanHeadFetch (PubKey 'Sign (Encryption e)) - - | RPCRefChanFetch (PubKey 'Sign (Encryption e)) - | RPCRefChanGet (PubKey 'Sign (Encryption e)) - | RPCRefChanGetAnsw (Maybe (Hash HbSync)) - - | RPCRefChanPropose (PubKey 'Sign (Encryption e), ByteString) - | RPCRefChanNotify (PubKey 'Sign (Encryption e), ByteString) - - deriving stock (Generic) - -deriving instance - ( Show (PubKey 'Sign (Encryption e)) - , Show (PeerAddr e) - ) => Show (RPC e) - -instance (Serialise (PeerAddr e), Serialise (PubKey 'Sign (Encryption e))) => Serialise (RPC e) - -instance HasProtocol L4Proto (RPC L4Proto) where - type instance ProtocolId (RPC L4Proto) = 0xFFFFFFE0 - type instance Encoded L4Proto = ByteString - decode = either (const Nothing) Just . deserialiseOrFail - encode = serialise - - -data RPCEnv = - RPCEnv - { _rpcSelf :: Peer L4Proto - , _rpcFab :: Fabriq L4Proto - } - -makeLenses 'RPCEnv - -data RpcAdapter e m = - RpcAdapter - { rpcOnPoke :: RPC e -> m () - , rpcOnDie :: RPC e -> m () - , rpcOnPokeAnswer :: PubKey 'Sign (Encryption e) -> m () - , rpcOnPokeAnswerFull :: Text -> m () - , rpcOnAnnounce :: Hash HbSync -> m () - , rpcOnPing :: PeerAddr e -> m () - , rpcOnPong :: PeerAddr e -> m () - , rpcOnFetch :: Hash HbSync -> m () - , rpcOnPeers :: RPC e -> m () - , rpcOnPeersAnswer :: (PeerAddr e, PubKey 'Sign (Encryption e)) -> m () - , rpcOnPexInfo :: RPC e -> m () - , rpcOnPexInfoAnswer :: [PeerAddr L4Proto] -> m () - , rpcOnLogLevel :: SetLogging -> m () - , rpcOnRefLogUpdate :: ByteString -> m () - , rpcOnRefLogFetch :: PubKey 'Sign (Encryption e) -> m () - , rpcOnRefLogGet :: PubKey 'Sign (Encryption e) -> m () - , rpcOnRefLogGetAnsw :: Maybe (Hash HbSync) -> m () - - , rpcOnRefChanHeadSend :: Hash HbSync -> m () - , rpcOnRefChanHeadGet :: PubKey 'Sign (Encryption e) -> m () - , rpcOnRefChanHeadGetAnsw :: Maybe (Hash HbSync) -> m () - , rpcOnRefChanHeadFetch :: PubKey 'Sign (Encryption e) -> m () - - -- refchan commands - , rpcOnRefChanFetch :: PubKey 'Sign (Encryption e) -> m () - , rpcOnRefChanGet :: PubKey 'Sign (Encryption e) -> m () - , rpcOnRefChanGetAnsw :: Maybe (Hash HbSync) -> m () - - , rpcOnRefChanPropose :: (PubKey 'Sign (Encryption e), ByteString) -> m () - , rpcOnRefChanNotify :: (PubKey 'Sign (Encryption e), ByteString) -> m () - } - -newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadIO - , MonadReader RPCEnv - , MonadTrans - , MonadUnliftIO - ) - -runRPC :: ( MonadIO m - , PeerMessaging L4Proto - ) - => MessagingUDP -> RpcM m a -> m a - -runRPC udp m = runReaderT (fromRpcM m) (RPCEnv pip (Fabriq udp)) - where - pip = getOwnPeer udp - -continueWithRPC :: RPCEnv -> RpcM m a -> m a -continueWithRPC e m = runReaderT (fromRpcM m) e - -instance Monad m => HasFabriq L4Proto (RpcM m) where - getFabriq = asks (view rpcFab) - -instance Monad m => HasOwnPeer L4Proto (RpcM m) where - ownPeer = asks (view rpcSelf) - -instance (Monad m, HasProtocol L4Proto p) => HasTimeLimits L4Proto p (RpcM m) where - tryLockForPeriod _ _ = pure True - -rpcHandler :: forall e m . ( MonadIO m - , Response e (RPC e) m - , HasProtocol e (RPC e) - , IsPeerAddr e m - ) - => RpcAdapter e m -> RPC e -> m () - -rpcHandler adapter = \case - p@RPCDie{} -> rpcOnDie adapter p - p@RPCPoke{} -> rpcOnPoke adapter p - (RPCPokeAnswer k) -> rpcOnPokeAnswer adapter k - (RPCPokeAnswerFull k) -> rpcOnPokeAnswerFull adapter k - (RPCAnnounce h) -> rpcOnAnnounce adapter h - (RPCPing pa) -> rpcOnPing adapter pa - (RPCPong pa) -> rpcOnPong adapter pa - (RPCFetch h) -> rpcOnFetch adapter h - p@RPCPeers{} -> rpcOnPeers adapter p - (RPCPeersAnswer pa k) -> rpcOnPeersAnswer adapter (pa,k) - p@RPCPexInfo{} -> rpcOnPexInfo adapter p - (RPCPexInfoAnswer pa) -> rpcOnPexInfoAnswer adapter pa - (RPCLogLevel l) -> rpcOnLogLevel adapter l - (RPCRefLogUpdate bs) -> rpcOnRefLogUpdate adapter bs - (RPCRefLogFetch e) -> rpcOnRefLogFetch adapter e - (RPCRefLogGet e) -> rpcOnRefLogGet adapter e - (RPCRefLogGetAnswer s) -> rpcOnRefLogGetAnsw adapter s - (RPCRefChanHeadSend s) -> rpcOnRefChanHeadSend adapter s - - (RPCRefChanHeadGet s) -> rpcOnRefChanHeadGet adapter s - (RPCRefChanHeadGetAnsw s) -> rpcOnRefChanHeadGetAnsw adapter s - (RPCRefChanHeadFetch s) -> rpcOnRefChanHeadFetch adapter s - - (RPCRefChanGet s) -> rpcOnRefChanGet adapter s - (RPCRefChanGetAnsw s) -> rpcOnRefChanGetAnsw adapter s - (RPCRefChanFetch s) -> rpcOnRefChanFetch adapter s - - (RPCRefChanPropose s) -> rpcOnRefChanPropose adapter s - (RPCRefChanNotify s) -> rpcOnRefChanNotify adapter s - -data RPCOpt = - RPCOpt - { _rpcOptConf :: Maybe FilePath - , _rpcOptAddr :: Maybe String - } - -makeLenses 'RPCOpt - - -runRpcCommand :: FromStringMaybe (IPAddrPort L4Proto) => RPCOpt -> RPCCommand -> IO () -runRpcCommand opt = \case - DIE -> withRPC opt RPCDie - POKE -> withRPC opt RPCPoke - PING s _ -> withRPC opt (RPCPing s) - ANNOUNCE h -> withRPC opt (RPCAnnounce h) - FETCH h -> withRPC opt (RPCFetch h) - PEERS -> withRPC opt RPCPeers - - PEXINFO -> withRPC opt RPCPexInfo - - SETLOG s -> withRPC opt (RPCLogLevel s) - REFLOGUPDATE bs -> withRPC opt (RPCRefLogUpdate bs) - REFLOGFETCH k -> withRPC opt (RPCRefLogFetch k) - REFLOGGET k -> withRPC opt (RPCRefLogGet k) - - REFCHANHEADSEND h -> withRPC opt (RPCRefChanHeadSend h) - REFCHANHEADGET s -> withRPC opt (RPCRefChanHeadGet s) - REFCHANHEADFETCH s -> withRPC opt (RPCRefChanHeadFetch s) - - REFCHANGET s -> withRPC opt (RPCRefChanGet s) - REFCHANFETCH s -> withRPC opt (RPCRefChanFetch s) - - REFCHANPROPOSE s -> withRPC opt (RPCRefChanPropose s) - REFCHANNOTIFY s -> withRPC opt (RPCRefChanNotify s) - - _ -> pure () - - -withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () -withRPC o cmd = rpcClientMain o $ runResourceT do - - liftIO $ hSetBuffering stdout LineBuffering - - conf <- peerConfigRead (view rpcOptConf o) - - let rpcConf = cfgValue @PeerRpcKey conf :: Maybe String - - saddr <- pure (view rpcOptAddr o <|> rpcConf) `orDie` "RPC endpoint not set" - - as <- liftIO $ parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) - let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as - - rpc <- pure rpc' `orDie` "Can't parse RPC endpoint" - - udp1 <- newMessagingUDP False Nothing `orDie` "Can't start RPC" - - mrpc <- async $ runMessagingUDP udp1 - - pingQ <- liftIO newTQueueIO - - pokeQ <- liftIO newTQueueIO - - pokeFQ <- liftIO newTQueueIO - - refQ <- liftIO newTQueueIO - - rchanheadMVar <- liftIO newEmptyMVar - - rchangetMVar <- liftIO newEmptyMVar - - let adapter = RpcAdapter - { rpcOnPoke = dontHandle - , rpcOnDie = dontHandle - , rpcOnPokeAnswer = (liftIO . atomically . writeTQueue pokeQ) - , rpcOnPokeAnswerFull = (liftIO . atomically . writeTQueue pokeFQ) - , rpcOnAnnounce = (const $ liftIO exitSuccess) - , rpcOnPing = (const $ notice "ping?") - , rpcOnPong = (liftIO . atomically . writeTQueue pingQ) - , rpcOnFetch = dontHandle - , rpcOnPeers = dontHandle - , rpcOnPeersAnswer = (\(pa, k) -> Log.info $ pretty (AsBase58 k) <+> pretty pa) - , rpcOnPexInfo = dontHandle - , rpcOnPexInfoAnswer = (\ps -> mapM_ (Log.info . pretty) ps) - , rpcOnLogLevel = dontHandle - , rpcOnRefLogUpdate = dontHandle - , rpcOnRefLogFetch = dontHandle - , rpcOnRefLogGet = dontHandle - , rpcOnRefLogGetAnsw = ( liftIO . atomically . writeTQueue refQ ) - - , rpcOnRefChanHeadSend = dontHandle - , rpcOnRefChanHeadGet = dontHandle - , rpcOnRefChanHeadGetAnsw = (liftIO . putMVar rchanheadMVar) - , rpcOnRefChanHeadFetch = dontHandle - - , rpcOnRefChanFetch = dontHandle - , rpcOnRefChanGet = dontHandle - , rpcOnRefChanGetAnsw = (liftIO . putMVar rchangetMVar) - - , rpcOnRefChanPropose = dontHandle - - , rpcOnRefChanNotify = dontHandle - } - - prpc <- async $ runRPC udp1 do - env <- ask - proto <- liftIO $ async $ continueWithRPC env $ do - runProto @L4Proto - [ makeResponse (rpcHandler adapter) - ] - - request rpc cmd - - case cmd of - RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess - - RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess - - RPCPing{} -> do - void $ liftIO $ void $ race (pause @'Seconds 5 >> exitFailure) do - pa <- liftIO $ atomically $ readTQueue pingQ - Log.info $ "pong from" <+> pretty pa - exitSuccess - - - RPCDie{} -> do - pause @'Seconds 0.25 - liftIO exitSuccess - - RPCPoke{} -> do - let onTimeout = do pause @'Seconds 1.5 - Log.info "no-one-is-here" - exitFailure - - void $ liftIO $ race onTimeout do - k <- liftIO $ atomically $ readTQueue pokeFQ - print (pretty k) - hFlush stdout - exitSuccess - - RPCPeers{} -> liftIO do - pause @'Seconds 1 - exitSuccess - - RPCPexInfo{} -> liftIO do - pause @'Seconds 1 - exitSuccess - - RPCLogLevel{} -> liftIO exitSuccess - - RPCRefLogUpdate{} -> liftIO do - pause @'Seconds 0.1 - exitSuccess - - RPCRefLogFetch {} -> liftIO do - pause @'Seconds 0.5 - exitSuccess - - RPCRefLogGet{} -> liftIO do - void $ liftIO $ race (pause @'Seconds 0.1 >> exitFailure) do - k <- liftIO $ atomically $ readTQueue refQ - case k of - Nothing -> exitFailure - Just re -> do - print $ pretty re - hFlush stdout - exitSuccess - - RPCRefChanHeadSend {} -> liftIO do - pause @'Seconds 0.25 - exitSuccess - - RPCRefChanHeadGet {} -> liftIO do - - r <- race (pause @'Seconds 2) do - withMVar rchanheadMVar $ \v -> do - pure v - - case r of - Right (Just x) -> print (pretty x) >> exitSuccess - - _ -> exitFailure - - RPCRefChanHeadFetch {} -> liftIO do - pause @'Seconds 0.25 - exitSuccess - - RPCRefChanFetch {} -> liftIO do - pause @'Seconds 0.25 - exitSuccess - - RPCRefChanGet {} -> liftIO do - r <- race (pause @'Seconds 2) do - withMVar rchangetMVar $ \v -> do - pure v - - case r of - Right (Just x) -> print (pretty x) >> exitSuccess - - _ -> exitFailure - - RPCRefChanPropose{} -> liftIO do - pause @'Seconds 0.25 - exitSuccess - - RPCRefChanNotify{} -> liftIO do - pause @'Seconds 0.25 - exitSuccess - - _ -> pure () - - void $ liftIO $ waitAnyCancel [proto] - - void $ waitAnyCancel [mrpc, prpc] - - -rpcClientMain :: RPCOpt -> IO () -> IO () -rpcClientMain opt action = do - setLoggingOff @DEBUG - action - diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs new file mode 100644 index 00000000..ca6b4f22 --- /dev/null +++ b/hbs2-peer/app/RPC2.hs @@ -0,0 +1,60 @@ +{-# Language TemplateHaskell #-} +module RPC2 where + +import HBS2.Prelude +import HBS2.Clock +import HBS2.Net.Proto.Service +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto.Types + +import HBS2.System.Logger.Simple + +import Data.Config.Suckless.KeyValue() + +import RPC2.Service.Unix as RPC2 +import RPC2.API +import PeerConfig + +import Data.Maybe +import Control.Applicative +import Lens.Micro.Platform +import Control.Monad.Reader +import UnliftIO + +data RPCOpt = + RPCOpt + { _rpcOptConf :: Maybe FilePath + , _rpcOptAddr :: Maybe String + } + +makeLenses 'RPCOpt + + +withRPC2 :: forall e m . (e ~ UNIX, HasProtocol e (ServiceProto RPC2 e), MonadUnliftIO m) + => RPCOpt + -> ( ServiceCaller RPC2 e -> m () ) + -> m () + +withRPC2 o action = do + conf <- peerConfigRead (view rpcOptConf o) + soConf <- runReaderT RPC2.getSocketName conf + let soOpt = view rpcOptAddr o + let soname = fromJust $ soOpt <|> Just soConf + + debug $ "withRPC2" <+> pretty soname + + client1 <- newMessagingUnix False 1.0 soname + + m1 <- async $ runMessagingUnix client1 + -- link m1 + + caller <- makeServiceCaller @RPC2 @UNIX (fromString soname) + p2 <- liftIO $ async $ runReaderT (runServiceClient @RPC2 @UNIX caller) client1 + + action caller + + pause @'Seconds 0.05 + cancel p2 + + void $ waitAnyCatchCancel [m1, p2] + diff --git a/hbs2-peer/app/RPC2/API.hs b/hbs2-peer/app/RPC2/API.hs new file mode 100644 index 00000000..f402a17f --- /dev/null +++ b/hbs2-peer/app/RPC2/API.hs @@ -0,0 +1,48 @@ +module RPC2.API + ( module RPC2.API + , module RPC2.Poke + , module RPC2.Ping + , module RPC2.Peers + , module RPC2.PexInfo + , module RPC2.Announce + , module RPC2.Fetch + , module RPC2.Die + , module RPC2.LogLevel + , module RPC2.RefLog + , module RPC2.RefChan + , module RPC2.Types + ) where + +import RPC2.Announce +import RPC2.Die +import RPC2.Fetch +import RPC2.Poke +import RPC2.Ping +import RPC2.Peers +import RPC2.PexInfo +import RPC2.LogLevel +import RPC2.RefLog +import RPC2.RefChan +import RPC2.Types + +type RPC2 = '[ RpcPoke + , RpcPing + , RpcAnnounce + , RpcFetch + , RpcPeers + , RpcPexInfo + , RpcRefLogGet + , RpcRefLogFetch + , RpcRefLogPost + , RpcRefChanHeadGet + , RpcRefChanHeadFetch + , RpcRefChanHeadPost + , RpcRefChanGet + , RpcRefChanFetch + , RpcRefChanPropose + , RpcRefChanNotify + , RpcLogLevel + , RpcDie + ] + + diff --git a/hbs2-peer/app/RPC2/Announce.hs b/hbs2-peer/app/RPC2/Announce.hs new file mode 100644 index 00000000..0a383eb9 --- /dev/null +++ b/hbs2-peer/app/RPC2/Announce.hs @@ -0,0 +1,24 @@ +module RPC2.Announce where + +import HBS2.Prelude.Plated +import HBS2.Data.Types.Refs (HashRef(..)) +import HBS2.Net.Proto.Service + +import HBS2.System.Logger.Simple + +import SendBlockAnnounce +import RPC2.Types + +data RpcAnnounce + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcAnnounce where + type instance Input RpcAnnounce = HashRef + type instance Output RpcAnnounce = () + + handleMethod href = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.announce:" <+> pretty href + sendBlockAnnounce (rpcPeerEnv co) (rpcLocalMultiCast co) (fromHashRef href) + + + diff --git a/hbs2-peer/app/RPC2/Die.hs b/hbs2-peer/app/RPC2/Die.hs new file mode 100644 index 00000000..1b228032 --- /dev/null +++ b/hbs2-peer/app/RPC2/Die.hs @@ -0,0 +1,27 @@ +module RPC2.Die where + +import HBS2.Prelude.Plated +import HBS2.Clock +import HBS2.Net.Proto.Service + +import HBS2.System.Logger.Simple +import Data.Config.Suckless.KeyValue + +import RPC2.Types +import System.Exit qualified as Exit +import Control.Concurrent.Async + +data RpcDie + +instance (MonadIO m) => HandleMethod m RpcDie where + type instance Input RpcDie = () + type instance Output RpcDie = () + + handleMethod _ = do + debug $ "rpc2.die: exiting" + void $ liftIO $ do + w <- async $ pause @'Seconds 0.5 >> Exit.exitSuccess + link w + + + diff --git a/hbs2-peer/app/RPC2/Fetch.hs b/hbs2-peer/app/RPC2/Fetch.hs new file mode 100644 index 00000000..9b0276ab --- /dev/null +++ b/hbs2-peer/app/RPC2/Fetch.hs @@ -0,0 +1,24 @@ +module RPC2.Fetch where + +import HBS2.Prelude.Plated +import HBS2.Data.Types.Refs (HashRef(..)) +import HBS2.Net.Proto.Service + +import HBS2.System.Logger.Simple + +import Fetch +import RPC2.Types + +data RpcFetch + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcFetch where + type instance Input RpcFetch = HashRef + type instance Output RpcFetch = () + + handleMethod href = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.fetch:" <+> pretty href + fetch (rpcPeerEnv co) (rpcDownloadEnv co) href + + + diff --git a/hbs2-peer/app/RPC2/LogLevel.hs b/hbs2-peer/app/RPC2/LogLevel.hs new file mode 100644 index 00000000..331a749c --- /dev/null +++ b/hbs2-peer/app/RPC2/LogLevel.hs @@ -0,0 +1,43 @@ +module RPC2.LogLevel where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Service + +import Log + +import HBS2.System.Logger.Simple +import Codec.Serialise + +data RpcLogLevel + +data SetLogging = + DebugOn Bool + | TraceOn Bool + deriving (Generic,Eq,Show) + +instance Serialise SetLogging + + +instance (MonadIO m) => HandleMethod m RpcLogLevel where + type instance Input RpcLogLevel = SetLogging + type instance Output RpcLogLevel = () + + handleMethod = \case + DebugOn True -> do + setLogging @DEBUG debugPrefix + debug "DebugOn" + + DebugOn False -> do + debug "DebugOff" + setLoggingOff @DEBUG + + TraceOn True -> do + setLogging @TRACE tracePrefix + trace "TraceOn" + + TraceOn False -> do + trace "TraceOff" + setLoggingOff @TRACE + + + diff --git a/hbs2-peer/app/RPC2/Peers.hs b/hbs2-peer/app/RPC2/Peers.hs new file mode 100644 index 00000000..31e8ccae --- /dev/null +++ b/hbs2-peer/app/RPC2/Peers.hs @@ -0,0 +1,41 @@ +module RPC2.Peers where + +import HBS2.Actors.Peer +import HBS2.Data.Types.Peer +import HBS2.Net.Proto.Types +import HBS2.Net.Proto.Service +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated + +import HBS2.Net.Proto.Definition() + +import PeerTypes + +import RPC2.Types + +import Control.Monad +import Lens.Micro.Platform +import Data.Maybe + +data RpcPeers + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcPeers where + type instance Input RpcPeers = () + type instance Output RpcPeers = [(PubKey 'Sign HBS2Basic, PeerAddr L4Proto)] + + handleMethod _ = do + co <- getRpcContext @RPC2Context + withPeerM (rpcPeerEnv co) $ do + ps <- getKnownPeers @L4Proto + r <- forM ps $ \p -> do + mpde <- find (KnownPeerKey p) id + maybe1 mpde (pure Nothing) $ \pde -> do + pa <- toPeerAddr p + let k = view peerSignKey pde + pure $ Just (k, pa) + + pure $ catMaybes r + + + diff --git a/hbs2-peer/app/RPC2/PexInfo.hs b/hbs2-peer/app/RPC2/PexInfo.hs new file mode 100644 index 00000000..007c8c83 --- /dev/null +++ b/hbs2-peer/app/RPC2/PexInfo.hs @@ -0,0 +1,24 @@ +module RPC2.PexInfo where + + +import HBS2.Actors.Peer +import HBS2.Net.Proto.Types +import HBS2.Net.Proto.Service +import HBS2.Prelude.Plated + +import HBS2.Net.Proto.PeerExchange + +import RPC2.Types + +data RpcPexInfo + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcPexInfo where + type instance Input RpcPexInfo = () + type instance Output RpcPexInfo = [PeerAddr L4Proto] + + handleMethod _ = do + co <- getRpcContext @RPC2Context + withPeerM (rpcPeerEnv co) getAllPex2Peers + + + diff --git a/hbs2-peer/app/RPC2/Ping.hs b/hbs2-peer/app/RPC2/Ping.hs new file mode 100644 index 00000000..539a2a84 --- /dev/null +++ b/hbs2-peer/app/RPC2/Ping.hs @@ -0,0 +1,25 @@ +module RPC2.Ping where + +import HBS2.Prelude.Plated +import HBS2.Actors.Peer +import HBS2.Net.Proto.Types +import HBS2.Net.Proto.Service + +import HBS2.System.Logger.Simple + +import PeerTypes +import RPC2.Types + +data RpcPing + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcPing where + type instance Input RpcPing = PeerAddr L4Proto + type instance Output RpcPing = Bool + + handleMethod pa = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.ping:" <+> pretty pa + liftIO $ withPeerM (rpcPeerEnv co) $ do + pingPeerWait pa + + diff --git a/hbs2-peer/app/RPC2/Poke.hs b/hbs2-peer/app/RPC2/Poke.hs new file mode 100644 index 00000000..58490f60 --- /dev/null +++ b/hbs2-peer/app/RPC2/Poke.hs @@ -0,0 +1,23 @@ +module RPC2.Poke where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Service + +import HBS2.System.Logger.Simple +import Data.Config.Suckless.KeyValue + +import RPC2.Types + +data RpcPoke + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcPoke where + type instance Input RpcPoke = () + type instance Output RpcPoke = String + + handleMethod n = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.poke: alive and kicking!" <+> pretty n + pure $ rpcPokeAnswer co + + + diff --git a/hbs2-peer/app/RPC2/RefChan.hs b/hbs2-peer/app/RPC2/RefChan.hs new file mode 100644 index 00000000..9617485f --- /dev/null +++ b/hbs2-peer/app/RPC2/RefChan.hs @@ -0,0 +1,137 @@ +module RPC2.RefChan where + + +import HBS2.Prelude.Plated + +import HBS2.Actors.Peer +import HBS2.Hash +import HBS2.Base58 +import HBS2.Data.Detect +import HBS2.Data.Types.Refs (HashRef(..)) +import HBS2.Net.Proto.Definition() +import HBS2.Net.Proto.Service +import HBS2.Net.Proto.Types +import HBS2.Data.Types.SignedBox +import HBS2.Net.Proto.RefChan +import HBS2.Storage + +import HBS2.System.Logger.Simple +import PeerTypes +import RPC2.Types + +import Data.ByteString (ByteString) +import Data.Functor +import Lens.Micro.Platform +import Streaming.Prelude qualified as S +import Data.ByteString.Lazy qualified as LBS +import Codec.Serialise + +-- NOTE: refchan-head-endpoints +data RpcRefChanHeadGet +data RpcRefChanHeadFetch +data RpcRefChanHeadPost + +-- NOTE: refchan-endpoints +data RpcRefChanFetch +data RpcRefChanGet +data RpcRefChanPropose + +data RpcRefChanNotify + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefChanHeadGet where + type instance Input RpcRefChanHeadGet = PubKey 'Sign HBS2Basic + type instance Output RpcRefChanHeadGet = Maybe HashRef + + handleMethod puk = do + co <- getRpcContext @RPC2Context + let penv = rpcPeerEnv co + debug $ "rpc2.refchanHeadGet:" <+> pretty (AsBase58 puk) + liftIO $ withPeerM penv $ do + sto <- getStorage + liftIO $ getRef sto (RefChanHeadKey @HBS2Basic puk) <&> fmap HashRef + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefChanHeadFetch where + type instance Input RpcRefChanHeadFetch = PubKey 'Sign HBS2Basic + type instance Output RpcRefChanHeadFetch = () + + handleMethod puk = do + debug $ "rpc2.refchanHeadFetch:" <+> pretty (AsBase58 puk) + penv <- rpcPeerEnv <$> getRpcContext @RPC2Context + void $ liftIO $ withPeerM penv $ do + broadCastMessage (RefChanGetHead @L4Proto puk) + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefChanFetch where + type instance Input RpcRefChanFetch = PubKey 'Sign HBS2Basic + type instance Output RpcRefChanFetch = () + + handleMethod puk = do + debug $ "rpc2.refchanFetch:" <+> pretty (AsBase58 puk) + penv <- rpcPeerEnv <$> getRpcContext @RPC2Context + void $ liftIO $ withPeerM penv $ do + gossip (RefChanRequest @L4Proto puk) + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefChanGet where + type instance Input RpcRefChanGet = PubKey 'Sign HBS2Basic + type instance Output RpcRefChanGet = Maybe HashRef + + handleMethod puk = do + co <- getRpcContext @RPC2Context + let penv = rpcPeerEnv co + debug $ "rpc2.refchanGet:" <+> pretty (AsBase58 puk) + liftIO $ withPeerM penv $ do + sto <- getStorage + liftIO $ getRef sto (RefChanLogKey @HBS2Basic puk) <&> fmap HashRef + + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefChanPropose where + type instance Input RpcRefChanPropose = (PubKey 'Sign HBS2Basic, SignedBox ByteString L4Proto) + type instance Output RpcRefChanPropose = () + + handleMethod (puk, box) = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.refChanNotifyAction" <+> pretty (AsBase58 puk) + liftIO $ rpcDoRefChanPropose co (puk, box) + + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefChanNotify where + type instance Input RpcRefChanNotify = (PubKey 'Sign HBS2Basic, SignedBox ByteString L4Proto) + type instance Output RpcRefChanNotify = () + + handleMethod (puk, box) = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.refChanNotifyAction" <+> pretty (AsBase58 puk) + liftIO $ rpcDoRefChanNotify co (puk, box) + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefChanHeadPost where + type instance Input RpcRefChanHeadPost = HashRef + type instance Output RpcRefChanHeadPost = () + + handleMethod href = do + co <- getRpcContext @RPC2Context + liftIO $ rpcDoRefChanHeadPost co href + +-- instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefLogFetch where +-- type instance Input RpcRefLogFetch = PubKey 'Sign HBS2Basic +-- type instance Output RpcRefLogFetch = () + +-- handleMethod pk = do +-- co <- getRpcContext @RPC2Context +-- debug $ "rpc2.reflogFetch:" <+> pretty (AsBase58 pk) + +-- liftIO $ withPeerM (rpcPeerEnv co) $ do +-- broadCastMessage (RefLogRequest @L4Proto pk) + +-- instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefLogPost where +-- type instance Input RpcRefLogPost = RefLogUpdate L4Proto +-- type instance Output RpcRefLogPost = () + +-- handleMethod msg = do +-- co <- getRpcContext @RPC2Context +-- let pk = view refLogId msg +-- debug $ "rpc2.reflogPost:" <+> pretty (AsBase58 pk) + +-- liftIO $ withPeerM (rpcPeerEnv co) $ do +-- emit @L4Proto RefLogUpdateEvKey (RefLogUpdateEvData (pk, msg)) +-- doRefLogBroadCast msg + + diff --git a/hbs2-peer/app/RPC2/RefLog.hs b/hbs2-peer/app/RPC2/RefLog.hs new file mode 100644 index 00000000..d29cb5a6 --- /dev/null +++ b/hbs2-peer/app/RPC2/RefLog.hs @@ -0,0 +1,66 @@ +module RPC2.RefLog where + +import HBS2.Prelude.Plated + +import HBS2.Actors.Peer +import HBS2.Hash +import HBS2.Base58 +import HBS2.Data.Types.Refs (HashRef(..)) +import HBS2.Events +import HBS2.Net.Proto.Definition() +import HBS2.Net.Proto.RefLog +import HBS2.Net.Proto.Service +import HBS2.Net.Proto.Types +import HBS2.Storage + +import HBS2.System.Logger.Simple +import PeerTypes +import RefLog (doRefLogBroadCast) +import RPC2.Types + +import Data.Functor +import Lens.Micro.Platform + +data RpcRefLogGet +data RpcRefLogFetch +data RpcRefLogPost + + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefLogGet where + type instance Input RpcRefLogGet = PubKey 'Sign HBS2Basic + type instance Output RpcRefLogGet = Maybe HashRef + + handleMethod pk = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.reflogGet:" <+> pretty (AsBase58 pk) + <+> pretty (hashObject @HbSync (RefLogKey @HBS2Basic pk)) + + liftIO $ withPeerM (rpcPeerEnv co) $ do + let sto = rpcStorage co + liftIO (getRef sto (RefLogKey @HBS2Basic pk)) <&> fmap HashRef + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefLogFetch where + type instance Input RpcRefLogFetch = PubKey 'Sign HBS2Basic + type instance Output RpcRefLogFetch = () + + handleMethod pk = do + co <- getRpcContext @RPC2Context + debug $ "rpc2.reflogFetch:" <+> pretty (AsBase58 pk) + + liftIO $ withPeerM (rpcPeerEnv co) $ do + broadCastMessage (RefLogRequest @L4Proto pk) + +instance (MonadIO m, HasRpcContext RPC2Context m) => HandleMethod m RpcRefLogPost where + type instance Input RpcRefLogPost = RefLogUpdate L4Proto + type instance Output RpcRefLogPost = () + + handleMethod msg = do + co <- getRpcContext @RPC2Context + let pk = view refLogId msg + debug $ "rpc2.reflogPost:" <+> pretty (AsBase58 pk) + + liftIO $ withPeerM (rpcPeerEnv co) $ do + emit @L4Proto RefLogUpdateEvKey (RefLogUpdateEvData (pk, msg)) + doRefLogBroadCast msg + + diff --git a/hbs2-peer/app/RPC2/Service/Unix.hs b/hbs2-peer/app/RPC2/Service/Unix.hs new file mode 100644 index 00000000..4a2b5c05 --- /dev/null +++ b/hbs2-peer/app/RPC2/Service/Unix.hs @@ -0,0 +1,89 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +module RPC2.Service.Unix + ( module RPC2.Service.Unix + , module HBS2.Net.Proto.Service + ) where + +import HBS2.Prelude.Plated +import HBS2.Actors.Peer +import HBS2.Net.Proto +import HBS2.Net.Proto.Service +import HBS2.Net.Messaging.Unix + +import HBS2.System.Logger.Simple + +import RPC2.API + +import Data.Config.Suckless.Syntax +import Data.Config.Suckless.KeyValue + +import Data.Text qualified as Text +import Control.Monad.Reader +import UnliftIO +import Data.ByteString.Lazy (ByteString) +import Codec.Serialise + +instance HasProtocol UNIX (ServiceProto RPC2 UNIX) where + type instance ProtocolId (ServiceProto RPC2 UNIX) = 0xDA2374610000 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +instance Monad m => HasRpcContext RPC2Context (ReaderT RPC2Context m) where + getRpcContext = ask + +-- FIXME: fix-this-ugly-shit +instance (Monad m, HasRpcContext RPC2Context m) => HasRpcContext RPC2Context (ResponseM UNIX (ReaderT MessagingUnix m)) where + getRpcContext = lift $ lift getRpcContext + +instance MonadUnliftIO m => (HasDeferred UNIX (ServiceProto RPC2 UNIX) (ReaderT RPC2Context m)) where + deferred _ m = void $ async m + +instance (MonadUnliftIO m) => + HasDeferred UNIX (ServiceProto RPC2 UNIX) (ResponseM UNIX m) where + deferred _ m = do + -- FIXME: this-might-be-ok-for-rpc + -- никаких конвейров и прочих модных + -- штук, которые реализованы в PeerM + -- можно прикрутить какой-то глоальный + -- пул процессов? + -- О! Конвейр, буде он понадобится, + -- можно запихнуть прямо в MessagingUnix + void $ async m + +instance Monad m => HasConf (ReaderT RPC2Context m) where + getConf = asks rpcConfig + +sodef :: FilePath +sodef = "/tmp/hbs2-rpc2.socket" + +getSocketName :: HasConf m => m FilePath +getSocketName = do + syn <- getConf + + let soname = lastDef sodef [ Text.unpack n + | ListVal @C (Key "rpc2" [SymbolVal "unix", LitStrVal n]) <- syn + ] + pure soname + +runService :: ( HasConf m + , MonadUnliftIO m + , HasRpcContext RPC2Context m + , HasDeferred UNIX (ServiceProto RPC2 UNIX) m + ) => m () +runService = do + + soname <- getSocketName + + notice $ "RPC2 Service started" <+> pretty soname + + server <- newMessagingUnixOpts [MUFork] True 1.0 soname + m1 <- async $ runMessagingUnix server + link m1 + + flip runReaderT server do + runProto @UNIX + [ makeResponse (makeServer @RPC2) + ] + + diff --git a/hbs2-peer/app/RPC2/Types.hs b/hbs2-peer/app/RPC2/Types.hs new file mode 100644 index 00000000..eb85314e --- /dev/null +++ b/hbs2-peer/app/RPC2/Types.hs @@ -0,0 +1,29 @@ +module RPC2.Types where + +import HBS2.Actors.Peer +import HBS2.Net.Proto.Types +import HBS2.Data.Types.Refs (HashRef) +import HBS2.Data.Types.SignedBox + +import Data.Config.Suckless.Syntax +import PeerTypes (DownloadEnv(..)) +import PeerConfig + +import Data.ByteString ( ByteString ) + +data RPC2Context = + RPC2Context + { rpcConfig :: [Syntax C] + , rpcPokeAnswer :: String + , rpcPeerEnv :: PeerEnv L4Proto + , rpcDownloadEnv :: DownloadEnv L4Proto + , rpcLocalMultiCast :: Peer L4Proto + , rpcStorage :: AnyStorage + , rpcDoRefChanHeadPost :: HashRef -> IO () + , rpcDoRefChanPropose :: (PubKey 'Sign HBS2Basic, SignedBox ByteString L4Proto) -> IO () + , rpcDoRefChanNotify :: (PubKey 'Sign HBS2Basic, SignedBox ByteString L4Proto) -> IO () + } + +class HasRpcContext a m where + getRpcContext :: m a + diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 5c32f61c..4d958e12 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -194,6 +194,7 @@ refChanNotifyRelyFn env chan msg@(Notify _ (SignedBox k box s)) = do forM_ notifiers $ \(RefChanNotifier _ q _) -> do atomically $ writeTQueue q (Notify @UNIX chan (SignedBox k box s)) +refChanNotifyRelyFn _ _ _ = pure () refChanAddDownload :: forall e m . ( m ~ PeerM e IO , MyPeer e diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index 97624708..a022c07c 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -239,6 +239,7 @@ reflogWorker conf adapter = do for_ (HashMap.toList byRef) $ \(r,x) -> do let reflogkey = RefLogKey @s r + h' <- liftIO $! getRef sto (RefLogKey @s r) hashes <- liftIO $ readHashesFromBlock sto h' <&> HashSet.fromList @@ -250,6 +251,7 @@ reflogWorker conf adapter = do let already = newHashes `HashSet.isSubsetOf` hashes + unless already do -- TODO: needs-very-fast-sort-and-dedupe let hashesNew = (hashes <> newHashes) & HashSet.toList @@ -266,7 +268,7 @@ reflogWorker conf adapter = do -- TODO: old-root-to-delete - trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty newRoot + trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty (hashObject @HbSync reflogkey) <+> pretty newRoot -- trace "I'm a reflog update worker" diff --git a/hbs2-peer/app/SendBlockAnnounce.hs b/hbs2-peer/app/SendBlockAnnounce.hs new file mode 100644 index 00000000..c44f76e6 --- /dev/null +++ b/hbs2-peer/app/SendBlockAnnounce.hs @@ -0,0 +1,37 @@ +module SendBlockAnnounce where + +import HBS2.Prelude +import HBS2.Hash +import HBS2.Actors.Peer +import HBS2.Storage(Storage(..)) +import HBS2.Net.Proto.Types +import HBS2.Net.Proto.BlockAnnounce + +import PeerTypes + +import HBS2.System.Logger.Simple + +sendBlockAnnounce :: forall e m . (e ~ L4Proto, MonadIO m) + => PeerEnv e + -> Peer e + -> Hash HbSync + -> m () + +sendBlockAnnounce env mcast h = liftIO $ withPeerM env do + debug $ "got announce rpc" <+> pretty h + sto <- getStorage + mbsize <- liftIO $ hasBlock sto h + + maybe1 mbsize (pure ()) $ \size -> do + debug "send multicast announce" + + no <- peerNonce @e + let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta size h + let announce = BlockAnnounce @e no annInfo + + request mcast announce + + forKnownPeers $ \p _ -> do + debug $ "send single-cast announces" <+> pretty p + request @e p announce + diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 63a2b925..d37fe63b 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -129,18 +129,37 @@ executable hbs2-peer import: common-deps main-is: PeerMain.hs - other-modules: BlockDownload + other-modules: + BlockDownload , BlockHttpDownload , DownloadQ , DownloadMon , EncryptionKeys , Bootstrap , PeerInfo - , PeerMain.DialogCliCommand + -- , PeerMain.DialogCliCommand , PeerMain.Dialog.Server , PeerMain.Dialog.Spec , PeerMeta - , RPC + , SendBlockAnnounce + , CheckBlockAnnounce + , CheckPeer + , Fetch + , Log + , RPC2 + , RPC2.Service.Unix + , RPC2.API + , RPC2.Types + , RPC2.Poke + , RPC2.Announce + , RPC2.Fetch + , RPC2.Die + , RPC2.LogLevel + , RPC2.Peers + , RPC2.PexInfo + , RPC2.Ping + , RPC2.RefLog + , RPC2.RefChan , PeerTypes , PeerConfig , RefLog diff --git a/hbs2-tests/test/PrototypeGenericService.hs b/hbs2-tests/test/PrototypeGenericService.hs index ca6fa30c..c068424a 100644 --- a/hbs2-tests/test/PrototypeGenericService.hs +++ b/hbs2-tests/test/PrototypeGenericService.hs @@ -9,6 +9,7 @@ import HBS2.Clock import HBS2.Net.Messaging.Unix import HBS2.Net.Proto import HBS2.Prelude.Plated +-- import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Service import HBS2.System.Logger.Simple @@ -17,11 +18,12 @@ import Codec.Serialise import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) import System.FilePath.Posix -import System.IO -import System.IO.Temp +-- import System.IO +-- import System.IO.Temp import UnliftIO.Async import Data.List +import UnliftIO import Test.Tasty.HUnit data Method1 @@ -30,11 +32,15 @@ data Method2 type MyServiceMethods1 = '[ Method1, Method2 ] instance HasProtocol UNIX (ServiceProto MyServiceMethods1 UNIX) where - type instance ProtocolId (ServiceProto MyServiceMethods1 UNIX) = 1 + type instance ProtocolId (ServiceProto MyServiceMethods1 UNIX) = 0xd79349a1bffb70c4 type instance Encoded UNIX = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise + +-- instance (MonadIO m, HasProtocol UNIX (ServiceProto MyServiceMethods1 UNIX)) => HasTimeLimits UNIX (ServiceProto MyServiceMethods1 UNIX) m where +-- tryLockForPeriod _ _ = pure True + instance MonadIO m => HandleMethod m Method1 where type instance Input Method1 = String type instance Output Method1 = String @@ -51,15 +57,9 @@ instance MonadIO m => HandleMethod m Method2 where handleMethod _ = pure () -instance Monad m => HasFabriq UNIX (ReaderT MessagingUnix m) where - getFabriq = asks Fabriq - -instance Monad m => HasOwnPeer UNIX (ReaderT MessagingUnix m) where - ownPeer = asks msgUnixSelf - -instance HasProtocol e (ServiceProto api e) => HasTimeLimits e (ServiceProto api e) IO where - tryLockForPeriod _ _ = pure True - +instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m) + => HasDeferred UNIX (ServiceProto api UNIX) m where + deferred _ m = void (async m) main :: IO () main = do @@ -71,9 +71,6 @@ main = do setLogging @NOTICE (logPrefix "[notice] ") setLogging @TRACE (logPrefix "[trace] ") - liftIO $ hSetBuffering stdout LineBuffering - liftIO $ hSetBuffering stderr LineBuffering - withSystemTempDirectory "test-unix-socket" $ \tmp -> do let soname = tmp "unix.socket"