wip lref rpc, peer-cli

This commit is contained in:
Sergey Ivanov 2023-03-14 21:18:52 +04:00
parent 8617b91d42
commit fa9edc0146
3 changed files with 70 additions and 26 deletions

View File

@ -114,6 +114,9 @@ data instance Signed SignaturePresent (MutableRef e 'LinearRef)
instance Serialise (Signature e) => instance Serialise (Signature e) =>
Serialise (Signed 'SignaturePresent (MutableRef e 'LinearRef)) Serialise (Signed 'SignaturePresent (MutableRef e 'LinearRef))
instance Show (Signature e) =>
Show (Signed 'SignaturePresent (MutableRef e 'LinearRef))
data instance Signed 'SignatureVerified (MutableRef e 'LinearRef) data instance Signed 'SignatureVerified (MutableRef e 'LinearRef)
= LinearMutableRefSignatureVerified = LinearMutableRefSignatureVerified
{ lmrefVSignature :: Signature e { lmrefVSignature :: Signature e

View File

@ -145,12 +145,13 @@ makeLenses 'RPCOpt
data RPCCommand = data RPCCommand =
POKE POKE
| ANNOUNCE (Hash HbSync) | ANNOUNCE (Hash HbSync)
| ANNLREF (Hash HbSync)
| PING (PeerAddr UDP) (Maybe (Peer UDP)) | PING (PeerAddr UDP) (Maybe (Peer UDP))
| CHECK PeerNonce (PeerAddr UDP) (Hash HbSync) | CHECK PeerNonce (PeerAddr UDP) (Hash HbSync)
| FETCH (Hash HbSync) | FETCH (Hash HbSync)
| PEERS | PEERS
| SETLOG SetLogging | SETLOG SetLogging
| LREFANN (Hash HbSync)
| LREFGET (Hash HbSync)
data PeerOpts = data PeerOpts =
PeerOpts PeerOpts
@ -209,11 +210,15 @@ runCLI = join . customExecParser (prefs showHelpOnError) $
<> command "run" (info pRun (progDesc "run peer")) <> command "run" (info pRun (progDesc "run peer"))
<> command "poke" (info pPoke (progDesc "poke peer by rpc")) <> command "poke" (info pPoke (progDesc "poke peer by rpc"))
<> command "announce" (info pAnnounce (progDesc "announce block")) <> command "announce" (info pAnnounce (progDesc "announce block"))
<> command "annlref" (info pAnnLRef (progDesc "announce linear ref"))
<> command "ping" (info pPing (progDesc "ping another peer")) <> command "ping" (info pPing (progDesc "ping another peer"))
<> command "fetch" (info pFetch (progDesc "fetch block")) <> command "fetch" (info pFetch (progDesc "fetch block"))
<> command "peers" (info pPeers (progDesc "show known peers")) <> command "peers" (info pPeers (progDesc "show known peers"))
<> command "log" (info pLog (progDesc "set logging level")) <> command "log" (info pLog (progDesc "set logging level"))
<> command "lref-ann" (info pLRefAnn (progDesc "announce linear ref"))
-- <> command "lref-new" (info pNewLRef (progDesc "generates a new linear ref"))
-- <> command "lref-list" (info pListLRef (progDesc "list node linear refs"))
<> command "lref-get" (info pLRefGet (progDesc "get a linear ref"))
-- <> command "lref-update" (info pUpdateLRef (progDesc "updates a linear ref"))
) )
confOpt = strOption ( long "config" <> short 'c' <> help "config" ) confOpt = strOption ( long "config" <> short 'c' <> help "config" )
@ -254,11 +259,6 @@ runCLI = join . customExecParser (prefs showHelpOnError) $
h <- strArgument ( metavar "HASH" ) h <- strArgument ( metavar "HASH" )
pure $ runRpcCommand rpc (ANNOUNCE h) pure $ runRpcCommand rpc (ANNOUNCE h)
pAnnLRef = do
rpc <- pRpcCommon
h <- strArgument ( metavar "HASH" )
pure $ runRpcCommand rpc (ANNLREF h)
pFetch = do pFetch = do
rpc <- pRpcCommon rpc <- pRpcCommon
h <- strArgument ( metavar "HASH" ) h <- strArgument ( metavar "HASH" )
@ -289,6 +289,16 @@ runCLI = join . customExecParser (prefs showHelpOnError) $
pref <- optional $ strArgument ( metavar "DIR" ) pref <- optional $ strArgument ( metavar "DIR" )
pure $ peerConfigInit pref pure $ peerConfigInit pref
pLRefAnn = do
rpc <- pRpcCommon
h <- strArgument ( metavar "HASH" )
pure $ runRpcCommand rpc (LREFANN h)
pLRefGet = do
rpc <- pRpcCommon
h <- strArgument ( metavar "REF-ID" )
pure $ runRpcCommand rpc (LREFGET h)
myException :: SomeException -> IO () myException :: SomeException -> IO ()
myException e = die ( show e ) >> exitFailure myException e = die ( show e ) >> exitFailure
@ -634,13 +644,6 @@ runPeer opts = Exception.handle myException $ do
debug $ "send single-cast announces" <+> pretty p debug $ "send single-cast announces" <+> pretty p
request @e p announce request @e p announce
ANNLREF h -> do
debug $ "got annlref rpc" <+> pretty h
st <- getStorage
void $ runMaybeT do
slref <- MaybeT $ getLRefValAction st h
lift $ broadcastMsgAction' env (AnnLRef @e h slref :: LRefProto UDP)
CHECK nonce pa h -> do CHECK nonce pa h -> do
pip <- fromPeerAddr @e pa pip <- fromPeerAddr @e pa
@ -679,6 +682,16 @@ runPeer opts = Exception.handle myException $ do
withDownload denv $ do withDownload denv $ do
processBlock h processBlock h
LREFANN h -> do
debug $ "got lrefann rpc" <+> pretty h
st <- getStorage
void $ runMaybeT do
slref <- MaybeT $ getLRefValAction st h
lift $ broadcastMsgAction' env (AnnLRef @e h slref :: LRefProto UDP)
LREFGET h -> do
debug $ "got lrefget rpc" <+> pretty h
_ -> pure () _ -> pure ()
@ -704,9 +717,6 @@ runPeer opts = Exception.handle myException $ do
let annAction h = do let annAction h = do
liftIO $ atomically $ writeTQueue rpcQ (ANNOUNCE h) liftIO $ atomically $ writeTQueue rpcQ (ANNOUNCE h)
let annLRefAction h = do
liftIO $ atomically $ writeTQueue rpcQ (ANNLREF h)
let pingAction pa = do let pingAction pa = do
that <- thatPeer (Proxy @(RPC e)) that <- thatPeer (Proxy @(RPC e))
liftIO $ atomically $ writeTQueue rpcQ (PING pa (Just that)) liftIO $ atomically $ writeTQueue rpcQ (PING pa (Just that))
@ -743,16 +753,31 @@ runPeer opts = Exception.handle myException $ do
trace "TraceOff" trace "TraceOff"
setLoggingOff @TRACE setLoggingOff @TRACE
let lrefAnnAction h = do
liftIO $ atomically $ writeTQueue rpcQ (LREFANN h)
let lrefGetAction h = do
debug $ "lrefGetAction" <+> pretty h
who <- thatPeer (Proxy @(RPC e))
void $ liftIO $ async $ withPeerM penv $ do
st <- getStorage
mhval <- getLRefValAction st h
forM_ mhval \hval ->
request who (RPCLRefGetAnswer @e h hval)
let arpc = RpcAdapter pokeAction let arpc = RpcAdapter pokeAction
dontHandle dontHandle
annAction annAction
annLRefAction
pingAction pingAction
dontHandle dontHandle
fetchAction fetchAction
peersAction peersAction
dontHandle dontHandle
logLevelAction logLevelAction
lrefAnnAction
lrefGetAction
(\h hval -> pure ())
rpc <- async $ runRPC udp1 do rpc <- async $ runRPC udp1 do
runProto @e runProto @e
@ -833,8 +858,6 @@ withRPC o cmd = do
case cmd of case cmd of
RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCAnnLRef{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCPing{} -> do RPCPing{} -> do
@ -860,6 +883,10 @@ withRPC o cmd = do
RPCLogLevel{} -> liftIO exitSuccess RPCLogLevel{} -> liftIO exitSuccess
RPCLRefAnn{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCLRefGet{} -> pause @'Seconds 1 >> liftIO exitSuccess
_ -> pure () _ -> pure ()
void $ liftIO $ waitAnyCatchCancel [proto] void $ liftIO $ waitAnyCatchCancel [proto]
@ -871,7 +898,6 @@ withRPC o cmd = do
dontHandle dontHandle
(liftIO . atomically . writeTQueue pq) (liftIO . atomically . writeTQueue pq)
(const $ liftIO exitSuccess) (const $ liftIO exitSuccess)
(const $ liftIO exitSuccess)
(const $ notice "ping?") (const $ notice "ping?")
(liftIO . atomically . writeTQueue q) (liftIO . atomically . writeTQueue q)
dontHandle dontHandle
@ -882,15 +908,20 @@ withRPC o cmd = do
dontHandle dontHandle
(const $ liftIO exitSuccess)
(const $ liftIO exitSuccess)
(\h hval -> Log.info $ pretty h <+> viaShow hval)
runRpcCommand :: RPCOpt -> RPCCommand -> IO () runRpcCommand :: RPCOpt -> RPCCommand -> IO ()
runRpcCommand opt = \case runRpcCommand opt = \case
POKE -> withRPC opt RPCPoke POKE -> withRPC opt RPCPoke
PING s _ -> withRPC opt (RPCPing s) PING s _ -> withRPC opt (RPCPing s)
ANNOUNCE h -> withRPC opt (RPCAnnounce h) ANNOUNCE h -> withRPC opt (RPCAnnounce h)
ANNLREF h -> withRPC opt (RPCAnnLRef h)
FETCH h -> withRPC opt (RPCFetch h) FETCH h -> withRPC opt (RPCFetch h)
PEERS -> withRPC opt RPCPeers PEERS -> withRPC opt RPCPeers
SETLOG s -> withRPC opt (RPCLogLevel s) SETLOG s -> withRPC opt (RPCLogLevel s)
LREFANN h -> withRPC opt (RPCLRefAnn h)
LREFGET h -> withRPC opt (RPCLRefGet h)
_ -> pure () _ -> pure ()

View File

@ -2,6 +2,7 @@
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
module RPC where module RPC where
import HBS2.Data.Types.Refs
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Hash import HBS2.Hash
@ -28,15 +29,20 @@ data RPC e =
| RPCPong (PeerAddr e) | RPCPong (PeerAddr e)
| RPCPokeAnswer (PubKey 'Sign e) | RPCPokeAnswer (PubKey 'Sign e)
| RPCAnnounce (Hash HbSync) | RPCAnnounce (Hash HbSync)
| RPCAnnLRef (Hash HbSync)
| RPCFetch (Hash HbSync) | RPCFetch (Hash HbSync)
| RPCPeers | RPCPeers
| RPCPeersAnswer (PeerAddr e) (PubKey 'Sign e) | RPCPeersAnswer (PeerAddr e) (PubKey 'Sign e)
| RPCLogLevel SetLogging | RPCLogLevel SetLogging
| RPCLRefAnn (Hash HbSync)
| RPCLRefGet (Hash HbSync)
| RPCLRefGetAnswer (Hash HbSync) (Signed SignaturePresent (MutableRef e 'LinearRef))
deriving stock (Generic) deriving stock (Generic)
instance Serialise (PeerAddr e) => Serialise (RPC e) instance (
Serialise (PeerAddr e)
, Serialise (Signature e)
) => Serialise (RPC e)
instance HasProtocol UDP (RPC UDP) where instance HasProtocol UDP (RPC UDP) where
type instance ProtocolId (RPC UDP) = 0xFFFFFFE0 type instance ProtocolId (RPC UDP) = 0xFFFFFFE0
@ -58,13 +64,15 @@ data RpcAdapter e m =
{ rpcOnPoke :: RPC e -> m () { rpcOnPoke :: RPC e -> m ()
, rpcOnPokeAnswer :: PubKey 'Sign e -> m () , rpcOnPokeAnswer :: PubKey 'Sign e -> m ()
, rpcOnAnnounce :: Hash HbSync -> m () , rpcOnAnnounce :: Hash HbSync -> m ()
, rpcOnAnnLRef :: Hash HbSync -> m ()
, rpcOnPing :: PeerAddr e -> m () , rpcOnPing :: PeerAddr e -> m ()
, rpcOnPong :: PeerAddr e -> m () , rpcOnPong :: PeerAddr e -> m ()
, rpcOnFetch :: Hash HbSync -> m () , rpcOnFetch :: Hash HbSync -> m ()
, rpcOnPeers :: RPC e -> m () , rpcOnPeers :: RPC e -> m ()
, rpcOnPeersAnswer :: (PeerAddr e, PubKey 'Sign e) -> m () , rpcOnPeersAnswer :: (PeerAddr e, PubKey 'Sign e) -> m ()
, rpcOnLogLevel :: SetLogging -> m () , rpcOnLogLevel :: SetLogging -> m ()
, rpcOnLRefAnn :: Hash HbSync -> m ()
, rpcOnLRefGet :: Hash HbSync -> m ()
, rpcOnLRefGetAnswer :: Hash HbSync -> Signed SignaturePresent (MutableRef e 'LinearRef) -> m ()
} }
newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a }
@ -108,11 +116,13 @@ rpcHandler adapter = \case
p@RPCPoke{} -> rpcOnPoke adapter p p@RPCPoke{} -> rpcOnPoke adapter p
(RPCPokeAnswer k) -> rpcOnPokeAnswer adapter k (RPCPokeAnswer k) -> rpcOnPokeAnswer adapter k
(RPCAnnounce h) -> rpcOnAnnounce adapter h (RPCAnnounce h) -> rpcOnAnnounce adapter h
(RPCAnnLRef h) -> rpcOnAnnLRef adapter h
(RPCPing pa) -> rpcOnPing adapter pa (RPCPing pa) -> rpcOnPing adapter pa
(RPCPong pa) -> rpcOnPong adapter pa (RPCPong pa) -> rpcOnPong adapter pa
(RPCFetch h) -> rpcOnFetch adapter h (RPCFetch h) -> rpcOnFetch adapter h
p@RPCPeers{} -> rpcOnPeers adapter p p@RPCPeers{} -> rpcOnPeers adapter p
(RPCPeersAnswer pa k) -> rpcOnPeersAnswer adapter (pa,k) (RPCPeersAnswer pa k) -> rpcOnPeersAnswer adapter (pa,k)
(RPCLogLevel l) -> rpcOnLogLevel adapter l (RPCLogLevel l) -> rpcOnLogLevel adapter l
(RPCLRefAnn h) -> rpcOnLRefAnn adapter h
(RPCLRefGet h) -> rpcOnLRefGet adapter h
(RPCLRefGetAnswer h hval) -> rpcOnLRefGetAnswer adapter h hval