Implemented broadcastLRef

This commit is contained in:
Sergey Ivanov 2023-03-13 20:57:04 +04:00
parent 9bded3d3af
commit 8617b91d42
3 changed files with 57 additions and 55 deletions

View File

@ -837,3 +837,17 @@ FIXME: Обработка ошибок в асинхронном приложе
## 2023-03-13
* [x] broadcastLRefI
FIXME: RPC, cli для линейных ссылок
* [ ] lref-new через rpc пира new
* [ ] lref-get через rpc пира get
* [ ] lref-update через rpc пира update
* [ ] lref-list через rpc пира list
* [ ] cli на запрос значений ссылки у всех нод
* [ ] cli на запрос всех ссылок ноды

View File

@ -32,7 +32,7 @@ data LRefI e m =
{ getBlockI :: GetBlockI HbSync m
, tryUpdateLinearRefI :: TryUpdateLinearRefI e HbSync m
, getLRefValI :: GetLRefValI e HbSync m
, announceLRefValI :: AnnounceLRefValI e HbSync m
, broadcastLRefI :: BroadcastLRefI e HbSync m
}
type GetBlockI h m = Hash h -> m (Maybe ByteString)
@ -41,7 +41,7 @@ type TryUpdateLinearRefI e h m = Hash h -> Signed SignatureVerified (MutableRef
type GetLRefValI e h m = Hash h -> m (Maybe (Signed SignaturePresent (MutableRef e 'LinearRef)))
type AnnounceLRefValI e h m = Hash h -> m ()
type BroadcastLRefI e h m = LRefProto e -> m ()
refLinearProto :: forall e m .
( MonadIO m
@ -64,7 +64,9 @@ refLinearProto LRefI{..} = \case
lift $ forM_ (verifyLinearMutableRefSigned (refOwner g) lref) \vlref -> do
r <- tryUpdateLinearRefI h vlref
when r (announceLRefValI h)
when r $ void $ runMaybeT do
slref <- MaybeT (getLRefValI h)
lift $ broadcastLRefI (AnnLRef @e h slref)
LRefGetVal h -> void $ runMaybeT do
slref <- MaybeT (getLRefValI h)

View File

@ -361,35 +361,6 @@ forKnownPeers m = do
pd' <- find (KnownPeerKey p) id
maybe1 pd' (pure ()) (m p)
mkLRefAdapter :: forall e st block m .
( m ~ PeerM e IO
, Signatures e
, Serialise (Signature e)
, Serialise (PubKey 'Sign e)
, Eq (PubKey 'Sign e)
)
=> m (LRefI e (CredentialsM e (ResponseM e m)))
mkLRefAdapter = do
st <- getStorage
let
getBlockI = liftIO . getBlock st
tryUpdateLinearRefI h = liftIO . tryUpdateLinearRef st h
getLRefValI h = (liftIO . runMaybeT) do
refvalraw <- MaybeT $ (readLinkRaw st h) `orLogError` "error reading ref val"
MaybeT $ pure ((either (const Nothing) Just
. deserialiseOrFail @(Signed SignaturePresent (MutableRef e 'LinearRef))) refvalraw)
`orLogError` "can not parse channel ref"
announceLRefValI h = do
-- FIXME: implement announceLRefValI
pure ()
pure LRefI {..}
runPeer :: forall e . e ~ UDP => PeerOpts -> IO ()
runPeer opts = Exception.handle myException $ do
@ -491,11 +462,43 @@ runPeer opts = Exception.handle myException $ do
penv <- newPeerEnv (AnyStorage s) (Fabriq mess) (getOwnPeer mess)
let
broadcastMsgAction msg = runPeerM penv do
env <- ask
broadcastMsgAction' env msg
broadcastMsgAction' :: (MonadIO m) => PeerEnv e -> LRefProto e -> PeerM e m ()
broadcastMsgAction' env msg = do
debug "send multicast msg"
request localMulticast msg
withPeerM env do
forKnownPeers $ \p _ -> do
debug $ "send single-cast msg" <+> pretty p
request @e p msg
let
getLRefValAction :: (MonadIO m)
=> AnyStorage -> Hash HbSync -> m (Maybe (Signed 'SignaturePresent (MutableRef e 'LinearRef)))
getLRefValAction st h = runMaybeT do
refvalraw <- MaybeT $ (liftIO $ readLinkRaw st h) `orLogError` "error reading ref val"
MaybeT $ pure ((either (const Nothing) Just
. deserialiseOrFail @(Signed SignaturePresent (MutableRef e 'LinearRef))) refvalraw)
`orLogError` "can not parse channel ref"
loop <- async do
runPeerM penv $ do
adapter <- mkAdapter
lrefAdapter <- mkLRefAdapter
lrefAdapter <- do
st <- getStorage
let
getBlockI = liftIO . getBlock st
tryUpdateLinearRefI h = liftIO . tryUpdateLinearRef st h
broadcastLRefI = broadcastMsgAction
getLRefValI = getLRefValAction st
pure LRefI {..}
env <- ask
pnonce <- peerNonce @e
@ -518,6 +521,8 @@ runPeer opts = Exception.handle myException $ do
subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent p d) -> do
-- subscribe @e LRefUpdateFromNodeKey $ \(KnownPeerEvent p d) -> do
let thatNonce = view peerOwnNonce d
banned <- peerBanned p d
@ -631,29 +636,10 @@ runPeer opts = Exception.handle myException $ do
ANNLREF h -> do
debug $ "got annlref rpc" <+> pretty h
sto <- getStorage
st <- getStorage
void $ runMaybeT do
refvalraw <- MaybeT $ (liftIO $ readLinkRaw sto h)
`orLogError` "error reading ref val"
slref@(LinearMutableRefSigned _ ref) <- MaybeT $
pure ((either (const Nothing) Just
. deserialiseOrFail @(Signed SignaturePresent (MutableRef e 'LinearRef))) refvalraw)
`orLogError` "can not parse channel ref"
let annlref :: LRefProto UDP
annlref = AnnLRef @e h slref
lift do
debug "send multicast annlref"
request localMulticast annlref
withPeerM env do
forKnownPeers $ \p _ -> do
debug $ "send single-cast annlrefs" <+> pretty p
request @e p annlref
slref <- MaybeT $ getLRefValAction st h
lift $ broadcastMsgAction' env (AnnLRef @e h slref :: LRefProto UDP)
CHECK nonce pa h -> do
pip <- fromPeerAddr @e pa