This commit is contained in:
Sergey Ivanov 2023-03-10 08:50:23 +04:00
parent afa4bb8247
commit 9bded3d3af
5 changed files with 52 additions and 44 deletions

View File

@ -95,8 +95,8 @@ instance HasProtocol UDP (PeerExchange UDP) where
decode = either (const Nothing) Just . deserialiseOrFail decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise encode = serialise
instance HasProtocol UDP (LRef UDP) where instance HasProtocol UDP (LRefProto UDP) where
type instance ProtocolId (LRef UDP) = 7 type instance ProtocolId (LRefProto UDP) = 7
type instance Encoded UDP = ByteString type instance Encoded UDP = ByteString
decode = either (const Nothing) Just . deserialiseOrFail decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise encode = serialise
@ -122,7 +122,7 @@ instance Expires (SessionKey UDP (PeerHandshake UDP)) where
instance Expires (EventKey UDP (PeerAnnounce UDP)) where instance Expires (EventKey UDP (PeerAnnounce UDP)) where
expiresIn _ = Nothing expiresIn _ = Nothing
instance Expires (EventKey UDP (LRef UDP)) where instance Expires (EventKey UDP (LRefProto UDP)) where
expiresIn _ = Nothing expiresIn _ = Nothing

View File

@ -20,42 +20,41 @@ import Lens.Micro.Platform
import Type.Reflection (someTypeRep) import Type.Reflection (someTypeRep)
newtype AnnLRefNonce = AnnLRefNonce Word64 data LRefProto e
deriving newtype (Num,Enum,Real,Integral)
deriving stock (Ord,Eq,Generic,Show)
instance Serialise AnnLRefNonce
data LRef e
= AnnLRef (Hash HbSync) (Signed SignaturePresent (MutableRef e 'LinearRef)) = AnnLRef (Hash HbSync) (Signed SignaturePresent (MutableRef e 'LinearRef))
| LRefGetVal (Hash HbSync)
deriving stock (Generic) deriving stock (Generic)
instance Serialise (Signature e) => Serialise (LRef e) instance Serialise (Signature e) => Serialise (LRefProto e)
data LRefI e m = data LRefI e m =
LRefI LRefI
{ getBlockI :: GetBlockI HbSync m { getBlockI :: GetBlockI HbSync m
, tryUpdateLinearRefI :: TryUpdateLinearRefI e HbSync m , tryUpdateLinearRefI :: TryUpdateLinearRefI e HbSync m
, getLRefValI :: GetLRefValI e HbSync m
, announceLRefValI :: AnnounceLRefValI e HbSync m
} }
type GetBlockI h m = Hash h -> m (Maybe ByteString) type GetBlockI h m = Hash h -> m (Maybe ByteString)
type TryUpdateLinearRefI e h m = Hash h -> Signed SignatureVerified (MutableRef e 'LinearRef) -> m Bool type TryUpdateLinearRefI e h m = Hash h -> Signed SignatureVerified (MutableRef e 'LinearRef) -> m Bool
type GetLRefValI e h m = Hash h -> m (Maybe (Signed SignaturePresent (MutableRef e 'LinearRef)))
type AnnounceLRefValI e h m = Hash h -> m ()
refLinearProto :: forall e m . refLinearProto :: forall e m .
( MonadIO m ( MonadIO m
, Response e (LRef e) m , Response e (LRefProto e) m
, HasCredentials e m , HasCredentials e m
, Serialise (PubKey 'Sign e) , Serialise (PubKey 'Sign e)
, Signatures e , Signatures e
) )
=> LRefI e m => LRefI e m
-> LRef e -> LRefProto e
-> m () -> m ()
refLinearProto LRefI{..} = \case refLinearProto LRefI{..} = \case
-- Анонс ссылки (уведомление о новом состоянии без запроса)
AnnLRef h (lref@LinearMutableRefSigned{}) -> do AnnLRef h (lref@LinearMutableRefSigned{}) -> do
creds <- getCredentials @e creds <- getCredentials @e
@ -65,22 +64,8 @@ refLinearProto LRefI{..} = \case
lift $ forM_ (verifyLinearMutableRefSigned (refOwner g) lref) \vlref -> do lift $ forM_ (verifyLinearMutableRefSigned (refOwner g) lref) \vlref -> do
r <- tryUpdateLinearRefI h vlref r <- tryUpdateLinearRefI h vlref
when r do when r (announceLRefValI h)
-- FIXME: В случае успеха разослать анонс на другие ноды
pure ()
-- data instance EventKey e (LRef e) = LRefGetVal h -> void $ runMaybeT do
-- AnnLRefInfoKey slref <- MaybeT (getLRefValI h)
-- deriving stock (Typeable, Eq,Generic) lift $ response (AnnLRef @e h slref)
-- data instance Event e (LRef e) =
-- AnnLRefEvent (Peer e) (AnnLRefInfo e) PeerNonce
-- deriving stock (Typeable)
-- instance Typeable (AnnLRefInfo e) => Hashable (EventKey e (LRef e)) where
-- hashWithSalt salt _ = hashWithSalt salt (someTypeRep p)
-- where
-- p = Proxy @(AnnLRefInfo e)
-- instance EventType ( Event e ( LRef e) ) where
-- isPersistent = True

View File

@ -361,7 +361,6 @@ forKnownPeers m = do
pd' <- find (KnownPeerKey p) id pd' <- find (KnownPeerKey p) id
maybe1 pd' (pure ()) (m p) maybe1 pd' (pure ()) (m p)
-- FIXME: implement mkLRefAdapter
mkLRefAdapter :: forall e st block m . mkLRefAdapter :: forall e st block m .
( m ~ PeerM e IO ( m ~ PeerM e IO
, Signatures e , Signatures e
@ -372,11 +371,24 @@ mkLRefAdapter :: forall e st block m .
=> m (LRefI e (CredentialsM e (ResponseM e m))) => m (LRefI e (CredentialsM e (ResponseM e m)))
mkLRefAdapter = do mkLRefAdapter = do
st <- getStorage st <- getStorage
pure $
LRefI let
{ getBlockI = liftIO . getBlock st
, tryUpdateLinearRefI = \h lvref -> liftIO $ tryUpdateLinearRef (st) h lvref getBlockI = liftIO . getBlock st
}
tryUpdateLinearRefI h = liftIO . tryUpdateLinearRef st h
getLRefValI h = (liftIO . runMaybeT) do
refvalraw <- MaybeT $ (readLinkRaw st h) `orLogError` "error reading ref val"
MaybeT $ pure ((either (const Nothing) Just
. deserialiseOrFail @(Signed SignaturePresent (MutableRef e 'LinearRef))) refvalraw)
`orLogError` "can not parse channel ref"
announceLRefValI h = do
-- FIXME: implement announceLRefValI
pure ()
pure LRefI {..}
runPeer :: forall e . e ~ UDP => PeerOpts -> IO () runPeer :: forall e . e ~ UDP => PeerOpts -> IO ()
runPeer opts = Exception.handle myException $ do runPeer opts = Exception.handle myException $ do
@ -484,8 +496,6 @@ runPeer opts = Exception.handle myException $ do
runPeerM penv $ do runPeerM penv $ do
adapter <- mkAdapter adapter <- mkAdapter
lrefAdapter <- mkLRefAdapter lrefAdapter <- mkLRefAdapter
-- lrefAdapter :: LRefI UDP (CredentialsM UDP (ResponseM UDP (PeerM UDP IO)))
-- <- undefined :: (PeerM UDP IO) (LRefI UDP (CredentialsM UDP (ResponseM UDP (PeerM UDP IO))))
env <- ask env <- ask
pnonce <- peerNonce @e pnonce <- peerNonce @e
@ -632,7 +642,7 @@ runPeer opts = Exception.handle myException $ do
. deserialiseOrFail @(Signed SignaturePresent (MutableRef e 'LinearRef))) refvalraw) . deserialiseOrFail @(Signed SignaturePresent (MutableRef e 'LinearRef))) refvalraw)
`orLogError` "can not parse channel ref" `orLogError` "can not parse channel ref"
let annlref :: LRef UDP let annlref :: LRefProto UDP
annlref = AnnLRef @e h slref annlref = AnnLRef @e h slref
lift do lift do
@ -708,6 +718,9 @@ runPeer opts = Exception.handle myException $ do
let annAction h = do let annAction h = do
liftIO $ atomically $ writeTQueue rpcQ (ANNOUNCE h) liftIO $ atomically $ writeTQueue rpcQ (ANNOUNCE h)
let annLRefAction h = do
liftIO $ atomically $ writeTQueue rpcQ (ANNLREF h)
let pingAction pa = do let pingAction pa = do
that <- thatPeer (Proxy @(RPC e)) that <- thatPeer (Proxy @(RPC e))
liftIO $ atomically $ writeTQueue rpcQ (PING pa (Just that)) liftIO $ atomically $ writeTQueue rpcQ (PING pa (Just that))
@ -747,6 +760,7 @@ runPeer opts = Exception.handle myException $ do
let arpc = RpcAdapter pokeAction let arpc = RpcAdapter pokeAction
dontHandle dontHandle
annAction annAction
annLRefAction
pingAction pingAction
dontHandle dontHandle
fetchAction fetchAction
@ -833,6 +847,8 @@ withRPC o cmd = do
case cmd of case cmd of
RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCAnnLRef{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCPing{} -> do RPCPing{} -> do
@ -865,9 +881,11 @@ withRPC o cmd = do
void $ waitAnyCatchCancel [mrpc, prpc] void $ waitAnyCatchCancel [mrpc, prpc]
where where
adapter q pq = RpcAdapter dontHandle adapter q pq = RpcAdapter
dontHandle
(liftIO . atomically . writeTQueue pq) (liftIO . atomically . writeTQueue pq)
(const $ liftIO exitSuccess) (const $ liftIO exitSuccess)
(const $ liftIO exitSuccess)
(const $ notice "ping?") (const $ notice "ping?")
(liftIO . atomically . writeTQueue q) (liftIO . atomically . writeTQueue q)
dontHandle dontHandle
@ -883,6 +901,7 @@ runRpcCommand opt = \case
POKE -> withRPC opt RPCPoke POKE -> withRPC opt RPCPoke
PING s _ -> withRPC opt (RPCPing s) PING s _ -> withRPC opt (RPCPing s)
ANNOUNCE h -> withRPC opt (RPCAnnounce h) ANNOUNCE h -> withRPC opt (RPCAnnounce h)
ANNLREF h -> withRPC opt (RPCAnnLRef h)
FETCH h -> withRPC opt (RPCFetch h) FETCH h -> withRPC opt (RPCFetch h)
PEERS -> withRPC opt RPCPeers PEERS -> withRPC opt RPCPeers
SETLOG s -> withRPC opt (RPCLogLevel s) SETLOG s -> withRPC opt (RPCLogLevel s)

View File

@ -28,6 +28,7 @@ data RPC e =
| RPCPong (PeerAddr e) | RPCPong (PeerAddr e)
| RPCPokeAnswer (PubKey 'Sign e) | RPCPokeAnswer (PubKey 'Sign e)
| RPCAnnounce (Hash HbSync) | RPCAnnounce (Hash HbSync)
| RPCAnnLRef (Hash HbSync)
| RPCFetch (Hash HbSync) | RPCFetch (Hash HbSync)
| RPCPeers | RPCPeers
| RPCPeersAnswer (PeerAddr e) (PubKey 'Sign e) | RPCPeersAnswer (PeerAddr e) (PubKey 'Sign e)
@ -57,6 +58,7 @@ data RpcAdapter e m =
{ rpcOnPoke :: RPC e -> m () { rpcOnPoke :: RPC e -> m ()
, rpcOnPokeAnswer :: PubKey 'Sign e -> m () , rpcOnPokeAnswer :: PubKey 'Sign e -> m ()
, rpcOnAnnounce :: Hash HbSync -> m () , rpcOnAnnounce :: Hash HbSync -> m ()
, rpcOnAnnLRef :: Hash HbSync -> m ()
, rpcOnPing :: PeerAddr e -> m () , rpcOnPing :: PeerAddr e -> m ()
, rpcOnPong :: PeerAddr e -> m () , rpcOnPong :: PeerAddr e -> m ()
, rpcOnFetch :: Hash HbSync -> m () , rpcOnFetch :: Hash HbSync -> m ()
@ -106,6 +108,7 @@ rpcHandler adapter = \case
p@RPCPoke{} -> rpcOnPoke adapter p p@RPCPoke{} -> rpcOnPoke adapter p
(RPCPokeAnswer k) -> rpcOnPokeAnswer adapter k (RPCPokeAnswer k) -> rpcOnPokeAnswer adapter k
(RPCAnnounce h) -> rpcOnAnnounce adapter h (RPCAnnounce h) -> rpcOnAnnounce adapter h
(RPCAnnLRef h) -> rpcOnAnnLRef adapter h
(RPCPing pa) -> rpcOnPing adapter pa (RPCPing pa) -> rpcOnPing adapter pa
(RPCPong pa) -> rpcOnPong adapter pa (RPCPong pa) -> rpcOnPong adapter pa
(RPCFetch h) -> rpcOnFetch adapter h (RPCFetch h) -> rpcOnFetch adapter h

View File

@ -93,6 +93,7 @@ common shared-properties
, MultiParamTypeClasses , MultiParamTypeClasses
, OverloadedStrings , OverloadedStrings
, QuasiQuotes , QuasiQuotes
, RecordWildCards
, ScopedTypeVariables , ScopedTypeVariables
, StandaloneDeriving , StandaloneDeriving
, TupleSections , TupleSections