diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 528053d1..a056b140 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -12,7 +12,6 @@ import HBS2.Defaults import HBS2.Events import HBS2.Hash import HBS2.Data.Types.Refs (RefLogKey(..)) -import HBS2.Merkle import HBS2.Net.Auth.Credentials import HBS2.Net.IP.Addr import HBS2.Net.Messaging.UDP @@ -105,7 +104,6 @@ defLocalMulticast :: String defLocalMulticast = "239.192.152.145:10153" data PeerListenKey -data PeerRpcKey data PeerKeyFileKey data PeerBlackListKey data PeerWhiteListKey @@ -130,8 +128,6 @@ instance HasCfgKey PeerTraceKey FeatureSwitch where instance HasCfgKey PeerListenKey (Maybe String) where key = "listen" -instance HasCfgKey PeerRpcKey (Maybe String) where - key = "rpc" instance HasCfgKey PeerKeyFileKey (Maybe String) where key = "key" @@ -161,27 +157,6 @@ instance HasCfgValue PeerAcceptAnnounceKey AcceptAnnounce where ] kk = key @PeerAcceptAnnounceKey @AcceptAnnounce -data RPCOpt = - RPCOpt - { _rpcOptConf :: Maybe FilePath - , _rpcOptAddr :: Maybe String - } - -makeLenses 'RPCOpt - - -data RPCCommand = - DIE - | POKE - | ANNOUNCE (Hash HbSync) - | PING (PeerAddr L4Proto) (Maybe (Peer L4Proto)) - | CHECK PeerNonce (PeerAddr L4Proto) (Hash HbSync) - | FETCH (Hash HbSync) - | PEERS - | SETLOG SetLogging - | REFLOGUPDATE ByteString - | REFLOGFETCH (PubKey 'Sign (Encryption L4Proto)) - | REFLOGGET (PubKey 'Sign (Encryption L4Proto)) data PeerOpts = PeerOpts @@ -982,6 +957,10 @@ runPeer opts = U.handle (\e -> myException e h <- liftIO $ getRef sto (RefLogKey @(Encryption e) puk) request who (RPCRefLogGetAnswer @e h) + let refChanHeadSendAction h = do + trace $ "refChanHeadSendAction" <+> pretty h + pure () + let arpc = RpcAdapter pokeAction dieAction dontHandle @@ -997,6 +976,7 @@ runPeer opts = U.handle (\e -> myException e reflogFetchAction reflogGetAction dontHandle + refChanHeadSendAction -- rpcOnRefChanHeadSend rpc <- async $ runRPC udp1 do runProto @e @@ -1039,138 +1019,4 @@ emitToPeer :: ( MonadIO m emitToPeer env k e = liftIO $ withPeerM env (emit k e) -rpcClientMain :: RPCOpt -> IO () -> IO () -rpcClientMain opt action = do - setLoggingOff @DEBUG - action - -withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () -withRPC o cmd = rpcClientMain o $ runResourceT do - - liftIO $ hSetBuffering stdout LineBuffering - - conf <- peerConfigRead (view rpcOptConf o) - - let rpcConf = cfgValue @PeerRpcKey conf :: Maybe String - - saddr <- pure (view rpcOptAddr o <|> rpcConf) `orDie` "RPC endpoint not set" - - as <- liftIO $ parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) - let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as - - rpc <- pure rpc' `orDie` "Can't parse RPC endpoint" - - udp1 <- newMessagingUDP False Nothing `orDie` "Can't start RPC" - - mrpc <- async $ runMessagingUDP udp1 - - pingQ <- liftIO newTQueueIO - - pokeQ <- liftIO newTQueueIO - - pokeFQ <- liftIO newTQueueIO - - refQ <- liftIO newTQueueIO - - let adapter = - RpcAdapter dontHandle - dontHandle - (liftIO . atomically . writeTQueue pokeQ) - (liftIO . atomically . writeTQueue pokeFQ) - (const $ liftIO exitSuccess) - (const $ notice "ping?") - (liftIO . atomically . writeTQueue pingQ) - dontHandle - dontHandle - - (\(pa, k) -> Log.info $ pretty (AsBase58 k) <+> pretty pa - ) - - dontHandle - dontHandle - dontHandle - dontHandle - - ( liftIO . atomically . writeTQueue refQ ) - - prpc <- async $ runRPC udp1 do - env <- ask - proto <- liftIO $ async $ continueWithRPC env $ do - runProto @L4Proto - [ makeResponse (rpcHandler adapter) - ] - - request rpc cmd - - case cmd of - RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess - - RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess - - RPCPing{} -> do - void $ liftIO $ void $ race (pause @'Seconds 5 >> exitFailure) do - pa <- liftIO $ atomically $ readTQueue pingQ - Log.info $ "pong from" <+> pretty pa - exitSuccess - - - RPCDie{} -> do - pause @'Seconds 0.25 - liftIO exitSuccess - - RPCPoke{} -> do - let onTimeout = do pause @'Seconds 1.5 - Log.info "no-one-is-here" - exitFailure - - void $ liftIO $ race onTimeout do - k <- liftIO $ atomically $ readTQueue pokeFQ - print (pretty k) - hFlush stdout - exitSuccess - - RPCPeers{} -> liftIO do - pause @'Seconds 1 - exitSuccess - - RPCLogLevel{} -> liftIO exitSuccess - - RPCRefLogUpdate{} -> liftIO do - pause @'Seconds 0.1 - exitSuccess - - RPCRefLogFetch {} -> liftIO do - pause @'Seconds 0.5 - exitSuccess - - RPCRefLogGet{} -> liftIO do - void $ liftIO $ race (pause @'Seconds 0.1 >> exitFailure) do - k <- liftIO $ atomically $ readTQueue refQ - case k of - Nothing -> exitFailure - Just re -> do - print $ pretty re - hFlush stdout - exitSuccess - - _ -> pure () - - void $ liftIO $ waitAnyCancel [proto] - - void $ waitAnyCancel [mrpc, prpc] - -runRpcCommand :: FromStringMaybe (IPAddrPort L4Proto) => RPCOpt -> RPCCommand -> IO () -runRpcCommand opt = \case - DIE -> withRPC opt RPCDie - POKE -> withRPC opt RPCPoke - PING s _ -> withRPC opt (RPCPing s) - ANNOUNCE h -> withRPC opt (RPCAnnounce h) - FETCH h -> withRPC opt (RPCFetch h) - PEERS -> withRPC opt RPCPeers - SETLOG s -> withRPC opt (RPCLogLevel s) - REFLOGUPDATE bs -> withRPC opt (RPCRefLogUpdate bs) - REFLOGFETCH k -> withRPC opt (RPCRefLogFetch k) - REFLOGGET k -> withRPC opt (RPCRefLogGet k) - - _ -> pure () diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index 2cbbf0ad..4af7410e 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -2,18 +2,43 @@ {-# Language UndecidableInstances #-} module RPC where -import HBS2.Prelude.Plated -import HBS2.Net.Proto -import HBS2.Net.Messaging.UDP -import HBS2.Hash -import HBS2.Actors.Peer -import HBS2.Net.Auth.Credentials -import HBS2.Net.Proto.Definition() -import Control.Monad.Reader -import Data.ByteString.Lazy (ByteString) +import HBS2.Actors.Peer +import HBS2.Base58 +import HBS2.Clock +import HBS2.Hash +import HBS2.Net.Auth.Credentials +import HBS2.Net.IP.Addr +import HBS2.Net.Messaging.UDP +import HBS2.Net.Proto +import HBS2.Net.Proto.Definition() +import HBS2.OrDie +import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple hiding (info) +import HBS2.System.Logger.Simple qualified as Log + +import PeerConfig + + import Codec.Serialise (serialise,deserialiseOrFail) +import Control.Applicative +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue +import Control.Monad.Reader +import Control.Monad.Trans.Resource +import Data.ByteString.Lazy (ByteString) +import Data.Function +import Data.Functor +import Data.List qualified as L import Lens.Micro.Platform +import Network.Socket +import System.Exit +import System.IO +import UnliftIO.Async as U +data PeerRpcKey + +instance HasCfgKey PeerRpcKey (Maybe String) where + key = "rpc" data SetLogging = DebugOn Bool @@ -22,6 +47,20 @@ data SetLogging = instance Serialise SetLogging +data RPCCommand = + DIE + | POKE + | ANNOUNCE (Hash HbSync) + | PING (PeerAddr L4Proto) (Maybe (Peer L4Proto)) + | CHECK PeerNonce (PeerAddr L4Proto) (Hash HbSync) + | FETCH (Hash HbSync) + | PEERS + | SETLOG SetLogging + | REFLOGUPDATE ByteString + | REFLOGFETCH (PubKey 'Sign (Encryption L4Proto)) + | REFLOGGET (PubKey 'Sign (Encryption L4Proto)) + | REFCHANHEADSEND (Hash HbSync) + data RPC e = RPCDie | RPCPoke @@ -38,6 +77,7 @@ data RPC e = | RPCRefLogFetch (PubKey 'Sign (Encryption e)) | RPCRefLogGet (PubKey 'Sign (Encryption e)) | RPCRefLogGetAnswer (Maybe (Hash HbSync)) + | RPCRefChanHeadSend (Hash HbSync) deriving stock (Generic) instance (Serialise (PeerAddr e), Serialise (PubKey 'Sign (Encryption e))) => Serialise (RPC e) @@ -74,6 +114,7 @@ data RpcAdapter e m = , rpcOnRefLogFetch :: PubKey 'Sign (Encryption e) -> m () , rpcOnRefLogGet :: PubKey 'Sign (Encryption e) -> m () , rpcOnRefLogGetAnsw :: Maybe (Hash HbSync) -> m () + , rpcOnRefChanHeadSend :: Hash HbSync -> m () } newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } @@ -129,4 +170,158 @@ rpcHandler adapter = \case (RPCRefLogFetch e) -> rpcOnRefLogFetch adapter e (RPCRefLogGet e) -> rpcOnRefLogGet adapter e (RPCRefLogGetAnswer s) -> rpcOnRefLogGetAnsw adapter s + (RPCRefChanHeadSend s) -> rpcOnRefChanHeadSend adapter s + +data RPCOpt = + RPCOpt + { _rpcOptConf :: Maybe FilePath + , _rpcOptAddr :: Maybe String + } + +makeLenses 'RPCOpt + + +runRpcCommand :: FromStringMaybe (IPAddrPort L4Proto) => RPCOpt -> RPCCommand -> IO () +runRpcCommand opt = \case + DIE -> withRPC opt RPCDie + POKE -> withRPC opt RPCPoke + PING s _ -> withRPC opt (RPCPing s) + ANNOUNCE h -> withRPC opt (RPCAnnounce h) + FETCH h -> withRPC opt (RPCFetch h) + PEERS -> withRPC opt RPCPeers + SETLOG s -> withRPC opt (RPCLogLevel s) + REFLOGUPDATE bs -> withRPC opt (RPCRefLogUpdate bs) + REFLOGFETCH k -> withRPC opt (RPCRefLogFetch k) + REFLOGGET k -> withRPC opt (RPCRefLogGet k) + REFCHANHEADSEND h -> withRPC opt (RPCRefChanHeadSend h) + + _ -> pure () + + +withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () +withRPC o cmd = rpcClientMain o $ runResourceT do + + liftIO $ hSetBuffering stdout LineBuffering + + conf <- peerConfigRead (view rpcOptConf o) + + let rpcConf = cfgValue @PeerRpcKey conf :: Maybe String + + saddr <- pure (view rpcOptAddr o <|> rpcConf) `orDie` "RPC endpoint not set" + + as <- liftIO $ parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) + let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as + + rpc <- pure rpc' `orDie` "Can't parse RPC endpoint" + + udp1 <- newMessagingUDP False Nothing `orDie` "Can't start RPC" + + mrpc <- async $ runMessagingUDP udp1 + + pingQ <- liftIO newTQueueIO + + pokeQ <- liftIO newTQueueIO + + pokeFQ <- liftIO newTQueueIO + + refQ <- liftIO newTQueueIO + + let adapter = + RpcAdapter dontHandle + dontHandle + (liftIO . atomically . writeTQueue pokeQ) + (liftIO . atomically . writeTQueue pokeFQ) + (const $ liftIO exitSuccess) + (const $ notice "ping?") + (liftIO . atomically . writeTQueue pingQ) + dontHandle + dontHandle + + (\(pa, k) -> Log.info $ pretty (AsBase58 k) <+> pretty pa + ) + + dontHandle + dontHandle + dontHandle + dontHandle + + ( liftIO . atomically . writeTQueue refQ ) + + dontHandle + + prpc <- async $ runRPC udp1 do + env <- ask + proto <- liftIO $ async $ continueWithRPC env $ do + runProto @L4Proto + [ makeResponse (rpcHandler adapter) + ] + + request rpc cmd + + case cmd of + RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess + + RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess + + RPCPing{} -> do + void $ liftIO $ void $ race (pause @'Seconds 5 >> exitFailure) do + pa <- liftIO $ atomically $ readTQueue pingQ + Log.info $ "pong from" <+> pretty pa + exitSuccess + + + RPCDie{} -> do + pause @'Seconds 0.25 + liftIO exitSuccess + + RPCPoke{} -> do + let onTimeout = do pause @'Seconds 1.5 + Log.info "no-one-is-here" + exitFailure + + void $ liftIO $ race onTimeout do + k <- liftIO $ atomically $ readTQueue pokeFQ + print (pretty k) + hFlush stdout + exitSuccess + + RPCPeers{} -> liftIO do + pause @'Seconds 1 + exitSuccess + + RPCLogLevel{} -> liftIO exitSuccess + + RPCRefLogUpdate{} -> liftIO do + pause @'Seconds 0.1 + exitSuccess + + RPCRefLogFetch {} -> liftIO do + pause @'Seconds 0.5 + exitSuccess + + RPCRefLogGet{} -> liftIO do + void $ liftIO $ race (pause @'Seconds 0.1 >> exitFailure) do + k <- liftIO $ atomically $ readTQueue refQ + case k of + Nothing -> exitFailure + Just re -> do + print $ pretty re + hFlush stdout + exitSuccess + + RPCRefChanHeadSend {} -> liftIO do + pause @'Seconds 0.25 + exitSuccess + + _ -> pure () + + void $ liftIO $ waitAnyCancel [proto] + + void $ waitAnyCancel [mrpc, prpc] + + +rpcClientMain :: RPCOpt -> IO () -> IO () +rpcClientMain opt action = do + setLoggingOff @DEBUG + action