wip, refchanget proto skeleton, no log merge

This commit is contained in:
Dmitry Zuikov 2023-07-19 08:06:46 +03:00
parent d2141c304b
commit 819cec6402
4 changed files with 104 additions and 11 deletions

View File

@ -139,7 +139,19 @@ instance HasProtocol L4Proto (RefChanUpdate L4Proto) where
-- мы не можем рассылать одинаковые сообщения никогда, -- мы не можем рассылать одинаковые сообщения никогда,
-- ну или хотя бы не чаще, чем раз в 10 минут. -- ну или хотя бы не чаще, чем раз в 10 минут.
requestPeriodLim = ReqLimPerMessage 600 -- но poll у нас в минутах, и с минимальным периодом 1 минута
requestPeriodLim = ReqLimPerMessage 60
instance HasProtocol L4Proto (RefChanRequest L4Proto) where
type instance ProtocolId (RefChanRequest L4Proto) = 11003
type instance Encoded L4Proto = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
-- мы не можем рассылать одинаковые сообщения никогда,
-- ну или хотя бы не чаще, чем раз в 10 минут.
-- но poll у нас в минутах, и с минимальным периодом 1 минута
requestPeriodLim = ReqLimPerMessage 60
instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where
expiresIn _ = Just defCookieTimeoutSec expiresIn _ = Just defCookieTimeoutSec

View File

@ -206,11 +206,17 @@ instance Expires (EventKey e (RefChanRound e)) where
data RefChanUpdate e = data RefChanUpdate e =
Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира
| Accept (RefChanId e) (SignedBox (AcceptTran e) e) -- подписано ключом пира | Accept (RefChanId e) (SignedBox (AcceptTran e) e) -- подписано ключом пира
deriving stock (Generic) deriving stock (Generic)
instance ForRefChans e => Serialise (RefChanUpdate e) instance ForRefChans e => Serialise (RefChanUpdate e)
data RefChanRequest e =
RefChanRequest (RefChanId e)
| RefChanResponse (RefChanId e) HashRef
deriving stock (Generic)
instance ForRefChans e => Serialise (RefChanRequest e)
type instance SessionData e (RefChanHeadBlock e) = RefChanHeadBlock e type instance SessionData e (RefChanHeadBlock e) = RefChanHeadBlock e
newtype instance SessionKey e (RefChanHeadBlock e) = newtype instance SessionKey e (RefChanHeadBlock e) =
@ -234,11 +240,19 @@ data RefChanAdapter e m =
, refChanWriteTran :: HashRef -> m () , refChanWriteTran :: HashRef -> m ()
} }
refChanUpdateChan :: RefChanUpdate e -> RefChanId e class HasRefChanId e p | p -> e where
refChanUpdateChan = \case getRefChanId :: p -> RefChanId e
instance HasRefChanId e (RefChanUpdate e) where
getRefChanId = \case
Propose c _ -> c Propose c _ -> c
Accept c _ -> c Accept c _ -> c
instance HasRefChanId e (RefChanRequest e) where
getRefChanId = \case
RefChanRequest c -> c
RefChanResponse c _ -> c
refChanHeadProto :: forall e s m . ( MonadIO m refChanHeadProto :: forall e s m . ( MonadIO m
, Request e (RefChanHead e) m , Request e (RefChanHead e) m
, Request e (BlockAnnounce e) m , Request e (BlockAnnounce e) m
@ -348,7 +362,7 @@ refChanUpdateProto self pc adapter msg = do
-- "блок". -- "блок".
-- так-то и количество proposers можно ограничить -- так-то и количество proposers можно ограничить
guard =<< lift (refChanSubscribed adapter (refChanUpdateChan msg)) guard =<< lift (refChanSubscribed adapter (getRefChanId msg))
let h0 = hashObject @HbSync (serialise msg) let h0 = hashObject @HbSync (serialise msg)
guard =<< liftIO (hasBlock sto h0 <&> isNothing) guard =<< liftIO (hasBlock sto h0 <&> isNothing)
@ -356,9 +370,6 @@ refChanUpdateProto self pc adapter msg = do
case msg of case msg of
Propose chan box -> do Propose chan box -> do
let h0 = hashObject @HbSync (serialise msg)
guard =<< liftIO (hasBlock sto h0 <&> isNothing)
debug $ "RefChanUpdate/Propose" <+> pretty h0 debug $ "RefChanUpdate/Propose" <+> pretty h0
deferred proto do deferred proto do
@ -534,6 +545,62 @@ refChanUpdateProto self pc adapter msg = do
-- Пишем в итоговый лог только такие -- Пишем в итоговый лог только такие
-- propose + accept у которых больше quorum accept -- propose + accept у которых больше quorum accept
-- каждую транзу обрабатываем только один раз -- каждую транзу обрабатываем только один раз
--
refChanRequestProto :: forall e s m . ( MonadIO m
, Request e (RefChanRequest e) m
, Response e (RefChanRequest e) m
, HasDeferred e (RefChanRequest e) m
, IsPeerAddr e m
, Pretty (Peer e)
, Sessions e (KnownPeer e) m
, Sessions e (RefChanHeadBlock e) m
, HasStorage m
, Signatures s
, IsRefPubKey s
, Pretty (AsBase58 (PubKey 'Sign s))
-- , Serialise (Signature s)
, ForRefChans e
, s ~ Encryption e
)
=> Bool
-> RefChanAdapter e m
-> RefChanRequest e
-> m ()
refChanRequestProto self adapter msg = do
peer <- thatPeer proto
auth' <- find (KnownPeerKey peer) id
sto <- getStorage
void $ runMaybeT do
guard (self || isJust auth')
auth <- MaybeT $ pure auth'
guard =<< lift (refChanSubscribed adapter (getRefChanId @e msg))
case msg of
RefChanRequest chan -> do
rv <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan)
lift $ response @e (RefChanResponse @e chan (HashRef rv))
RefChanResponse chan val -> do
hd <- MaybeT $ getActualRefChanHead @e (RefChanHeadKey @s chan)
let ppk = view peerSignKey auth
guard $ ppk `HashMap.member` view refChanHeadPeers hd
debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val
where
proto = Proxy @(RefChanRequest e)
getActualRefChanHead :: forall e s m . ( MonadIO m getActualRefChanHead :: forall e s m . ( MonadIO m
, Sessions e (RefChanHeadBlock e) m , Sessions e (RefChanHeadBlock e) m

View File

@ -22,6 +22,7 @@ import Data.Maybe
pRefChan :: Parser (IO ()) pRefChan :: Parser (IO ())
pRefChan = hsubparser ( command "head" (info pRefChanHead (progDesc "head commands" )) pRefChan = hsubparser ( command "head" (info pRefChanHead (progDesc "head commands" ))
<> command "propose" (info pRefChanPropose (progDesc "post propose transaction")) <> command "propose" (info pRefChanPropose (progDesc "post propose transaction"))
<> command "fetch" (info pRefChanFetch (progDesc "fetch and sync refchan value"))
<> command "get" (info pRefChanGet (progDesc "get refchan value")) <> command "get" (info pRefChanGet (progDesc "get refchan value"))
) )
@ -127,5 +128,12 @@ pRefChanGet = do
puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key"
runRpcCommand opts (REFCHANGET puk) runRpcCommand opts (REFCHANGET puk)
pRefChanFetch :: Parser (IO ())
pRefChanFetch = do
opts <- pRpcCommon
sref <- strArgument (metavar "REFCHAH-REF")
pure do
puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key"
runRpcCommand opts (REFCHANFETCH puk)

View File

@ -893,6 +893,7 @@ runPeer opts = U.handle (\e -> myException e
, makeResponse (peerMetaProto (mkPeerMeta conf)) , makeResponse (peerMetaProto (mkPeerMeta conf))
, makeResponse (refChanHeadProto False refChanAdapter) , makeResponse (refChanHeadProto False refChanAdapter)
, makeResponse (refChanUpdateProto False pc refChanAdapter) , makeResponse (refChanUpdateProto False pc refChanAdapter)
, makeResponse (refChanRequestProto False refChanAdapter)
] ]
void $ liftIO $ waitAnyCancel workers void $ liftIO $ waitAnyCancel workers
@ -1034,6 +1035,11 @@ runPeer opts = U.handle (\e -> myException e
trace $ "refChanGetAction ANSWER IS" <+> pretty h trace $ "refChanGetAction ANSWER IS" <+> pretty h
request who (RPCRefChanGetAnsw @e h) request who (RPCRefChanGetAnsw @e h)
let refChanFetchAction puk = do
trace $ "refChanFetchAction" <+> pretty (AsBase58 puk)
void $ liftIO $ async $ withPeerM penv $ do
gossip (RefChanRequest @e puk)
let arpc = RpcAdapter pokeAction let arpc = RpcAdapter pokeAction
dieAction dieAction
dontHandle dontHandle
@ -1054,7 +1060,7 @@ runPeer opts = U.handle (\e -> myException e
dontHandle dontHandle
refChanHeadFetchAction refChanHeadFetchAction
dontHandle -- rpcOnRefChanFetch refChanFetchAction
refChanGetAction refChanGetAction
dontHandle -- rpcOnRefChanGetAnsw dontHandle -- rpcOnRefChanGetAnsw