mirror of https://github.com/voidlizard/hbs2
refchan refactoring
This commit is contained in:
parent
5820b808c5
commit
835a0322e0
|
@ -0,0 +1 @@
|
|||
#
|
|
@ -127,6 +127,10 @@ library
|
|||
, HBS2.Net.Proto.Sessions
|
||||
, HBS2.Net.Proto.RefLog
|
||||
, HBS2.Net.Proto.RefChan
|
||||
, HBS2.Net.Proto.RefChan.Types
|
||||
, HBS2.Net.Proto.RefChan.RefChanHead
|
||||
, HBS2.Net.Proto.RefChan.RefChanNotify
|
||||
, HBS2.Net.Proto.RefChan.RefChanUpdate
|
||||
, HBS2.Net.Proto.AnyRef
|
||||
, HBS2.Net.Proto.Types
|
||||
, HBS2.OrDie
|
||||
|
|
|
@ -452,8 +452,8 @@ runProto hh = do
|
|||
}) -> maybe (pure ()) (runResponseM pip . h) (decoder msg)
|
||||
|
||||
|
||||
instance (Monad m, HasProtocol e p) => HasThatPeer e p (ResponseM e m) where
|
||||
thatPeer _ = asks (view answTo)
|
||||
instance (Monad m, HasProtocol e p) => HasThatPeer p e (ResponseM e m) where
|
||||
thatPeer = asks (view answTo)
|
||||
|
||||
instance HasProtocol e p => HasDeferred p e (ResponseM e (PeerM e IO)) where
|
||||
deferred action = do
|
||||
|
@ -465,6 +465,7 @@ instance HasProtocol e p => HasDeferred p e (ResponseM e (PeerM e IO)) where
|
|||
|
||||
instance ( HasProtocol e p
|
||||
, MonadTrans (ResponseM e)
|
||||
, HasThatPeer p e (ResponseM e m)
|
||||
, HasStorage (PeerM e IO)
|
||||
, Pretty (Peer e)
|
||||
, PeerMessaging e
|
||||
|
@ -475,7 +476,7 @@ instance ( HasProtocol e p
|
|||
|
||||
response msg = do
|
||||
let proto = protoId @e @p (Proxy @p)
|
||||
who <- thatPeer (Proxy @p)
|
||||
who <- thatPeer @p
|
||||
self <- lift $ ownPeer @e
|
||||
fab <- lift $ getFabriq @e
|
||||
sendTo fab (To who) (From self) (AnyMessage @(Encoded e) @e proto (encode msg))
|
||||
|
|
|
@ -14,6 +14,7 @@ import Data.ByteString qualified as BS
|
|||
import Lens.Micro.Platform
|
||||
import UnliftIO
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.HashSet qualified as HashSet
|
||||
|
||||
|
||||
splitPattern :: FilePath -> (FilePath, FilePath)
|
||||
|
@ -64,22 +65,26 @@ findKeyRing fp kr = do
|
|||
|
||||
pure (catMaybes kf)
|
||||
|
||||
findKeyRingEntry :: forall s m . ( MonadUnliftIO m
|
||||
findKeyRingEntries :: forall s m . ( MonadUnliftIO m
|
||||
, SerialisedCredentials s
|
||||
, ForHBS2Basic s
|
||||
, Hashable (PubKey 'Encrypt s)
|
||||
-- , ForHBS2Basic s
|
||||
)
|
||||
=> [FilePattern]
|
||||
-> PubKey 'Encrypt s
|
||||
-> m (Maybe (KeyringEntry s))
|
||||
-> [PubKey 'Encrypt s]
|
||||
-> m [KeyringEntry s]
|
||||
|
||||
findKeyRingEntries fp pkl = do
|
||||
|
||||
let pks = HashSet.fromList pkl
|
||||
|
||||
findKeyRingEntry fp pk0 = do
|
||||
fs <- findFilesBy fp
|
||||
|
||||
w <- for fs $ \f -> runMaybeT do
|
||||
bs <- liftIO (try @_ @IOException (BS.readFile f))
|
||||
>>= toMPlus
|
||||
krf <- parseCredentials (AsCredFile bs) & toMPlus
|
||||
MaybeT $ pure $ headMay [ e | e@(KeyringEntry pk _ _) <- _peerKeyring krf, pk == pk0 ]
|
||||
MaybeT $ pure $ headMay [ e | e@(KeyringEntry pk _ _) <- _peerKeyring krf, pk `HashSet.member` pks ]
|
||||
|
||||
pure $ headMay (catMaybes w)
|
||||
pure $ catMaybes w
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ data KeyringEntry e =
|
|||
|
||||
pattern KeyringKeys :: forall {s} . PubKey 'Encrypt s -> PrivKey 'Encrypt s -> KeyringEntry s
|
||||
pattern KeyringKeys a b <- KeyringEntry a b _
|
||||
{-# COMPLETE KeyringKeys #-}
|
||||
|
||||
deriving stock instance (Eq (PubKey 'Encrypt e), Eq (PrivKey 'Encrypt e))
|
||||
=> Eq (KeyringEntry e)
|
||||
|
@ -139,8 +140,8 @@ delKeyPair (AsBase58 pks) cred = do
|
|||
pure $ cred & set peerKeyring rest
|
||||
|
||||
|
||||
parseCredentials :: forall s . ( ForHBS2Basic s
|
||||
, SerialisedCredentials s
|
||||
parseCredentials :: forall s . ( -- ForHBS2Basic s
|
||||
SerialisedCredentials s
|
||||
)
|
||||
=> AsCredFile ByteString -> Maybe (PeerCredentials s)
|
||||
parseCredentials (AsCredFile bs) = parseSerialisableFromBase58 bs
|
||||
|
|
|
@ -57,7 +57,7 @@ blockAnnounceProto :: forall e m . ( MonadIO m
|
|||
blockAnnounceProto =
|
||||
\case
|
||||
BlockAnnounce n info -> do
|
||||
that <- thatPeer (Proxy @(BlockAnnounce e))
|
||||
that <- thatPeer @(BlockAnnounce e)
|
||||
emit @e BlockAnnounceInfoKey (BlockAnnounceEvent that info n)
|
||||
|
||||
data instance EventKey e (BlockAnnounce e) =
|
||||
|
|
|
@ -90,7 +90,7 @@ blockChunksProto :: forall e m proto . ( MonadIO m
|
|||
|
||||
blockChunksProto adapter (BlockChunks c p) = do
|
||||
|
||||
peer <- thatPeer (Proxy @(BlockChunks e))
|
||||
peer <- thatPeer @proto
|
||||
auth <- find (KnownPeerKey peer) id <&> isJust
|
||||
|
||||
case p of
|
||||
|
@ -122,7 +122,7 @@ blockChunksProto adapter (BlockChunks c p) = do
|
|||
maybe (pure ()) (response_ . BlockChunk @e i) chunk
|
||||
|
||||
BlockChunk n bs | auth -> deferred @(BlockChunks e) do
|
||||
who <- thatPeer proto
|
||||
who <- thatPeer @proto
|
||||
h <- blkGetHash adapter (who, c)
|
||||
|
||||
maybe1 h (response_ (BlockLost @e)) $ \hh -> do
|
||||
|
@ -140,7 +140,6 @@ blockChunksProto adapter (BlockChunks c p) = do
|
|||
pure ()
|
||||
|
||||
where
|
||||
proto = Proxy @(BlockChunks e)
|
||||
response_ pt = response (BlockChunks c pt)
|
||||
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ blockSizeProto getBlockSize evHasBlock onNoBlock =
|
|||
\case
|
||||
GetBlockSize h -> do
|
||||
-- liftIO $ print "GetBlockSize"
|
||||
p <- thatPeer (Proxy @(BlockInfo e))
|
||||
p <- thatPeer @proto
|
||||
auth <- find (KnownPeerKey p) id <&> isJust
|
||||
when auth do
|
||||
deferred @proto $ do
|
||||
|
@ -48,12 +48,12 @@ blockSizeProto getBlockSize evHasBlock onNoBlock =
|
|||
response (NoBlock @e h)
|
||||
|
||||
NoBlock h -> do
|
||||
that <- thatPeer (Proxy @(BlockInfo e))
|
||||
that <- thatPeer @proto
|
||||
emit @e (BlockSizeEventKey h) (NoBlockEvent that)
|
||||
evHasBlock ( that, h, Nothing )
|
||||
|
||||
BlockSize h sz -> do
|
||||
that <- thatPeer (Proxy @(BlockInfo e))
|
||||
that <- thatPeer @proto
|
||||
emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz))
|
||||
evHasBlock ( that, h, Just sz )
|
||||
|
||||
|
|
|
@ -191,6 +191,7 @@ instance HasProtocol L4Proto (RefChanNotify L4Proto) where
|
|||
-- возьмем пока 10 секунд
|
||||
requestPeriodLim = NoLimit
|
||||
|
||||
|
||||
instance HasProtocol L4Proto (DialReq L4Proto) where
|
||||
type instance ProtocolId (DialReq L4Proto) = 96000
|
||||
type instance Encoded L4Proto = ByteString
|
||||
|
|
|
@ -84,7 +84,7 @@ dialReqProto :: forall e s m .
|
|||
-> DialReq e
|
||||
-> m ()
|
||||
dialReqProto adapter = unDialReq >>> \frames -> do
|
||||
peer <- thatPeer dialReqProtoProxy
|
||||
peer <- thatPeer @(DialReq e)
|
||||
|
||||
-- let dialReqEnv :: DialogRequestEnv m (Peer e) (Maybe (PeerData e))
|
||||
-- dialReqEnv = DialogRequestEnv
|
||||
|
@ -100,8 +100,6 @@ dialReqProto adapter = unDialReq >>> \frames -> do
|
|||
|
||||
liftIO $ (dialReqProtoAdapterDApp adapter) frames replyToPeerIO
|
||||
|
||||
where
|
||||
dialReqProtoProxy = Proxy @(DialReq e)
|
||||
|
||||
---
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ data EncryptionHandshakeAdapter e m s = EncryptionHandshakeAdapter
|
|||
}
|
||||
|
||||
|
||||
encryptionHandshakeProto :: forall e s m .
|
||||
encryptionHandshakeProto :: forall e s m proto .
|
||||
( MonadIO m
|
||||
, Response e (EncryptionHandshake e) m
|
||||
, Request e (EncryptionHandshake e) m
|
||||
|
@ -91,6 +91,7 @@ encryptionHandshakeProto :: forall e s m .
|
|||
, PubKey 'Encrypt s ~ Encrypt.PublicKey
|
||||
, Show (PubKey 'Sign s)
|
||||
, Show (Nonce ())
|
||||
, proto ~ EncryptionHandshake e
|
||||
)
|
||||
=> EncryptionHandshakeAdapter e m s
|
||||
-> EncryptionHandshake e
|
||||
|
@ -99,7 +100,7 @@ encryptionHandshakeProto :: forall e s m .
|
|||
encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case
|
||||
|
||||
ResetEncryptionKeys -> do
|
||||
peer <- thatPeer proto
|
||||
peer <- thatPeer @proto
|
||||
mpeerData <- find (KnownPeerKey peer) id
|
||||
-- TODO: check theirsign
|
||||
trace $ "ENCRYPTION EHSP ResetEncryptionKeys from" <+> viaShow (peer, mpeerData)
|
||||
|
@ -112,7 +113,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case
|
|||
sendBeginEncryptionExchange @e creds ourpubkey peer
|
||||
|
||||
BeginEncryptionExchange theirsign theirpubkey -> do
|
||||
peer <- thatPeer proto
|
||||
peer <- thatPeer @proto
|
||||
mpeerData <- find (KnownPeerKey peer) id
|
||||
-- TODO: check theirsign
|
||||
|
||||
|
@ -137,7 +138,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case
|
|||
encHandshake_considerPeerAsymmKey peer (Just theirpubkey)
|
||||
|
||||
AckEncryptionExchange theirsign theirpubkey -> do
|
||||
peer <- thatPeer proto
|
||||
peer <- thatPeer @proto
|
||||
mpeerData <- find (KnownPeerKey peer) id
|
||||
-- TODO: check theirsign
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ makeNotifyServer (NotifyEnv{..}) what = do
|
|||
|
||||
debug "SERVER: NotifyWant"
|
||||
|
||||
who <- thatPeer (Proxy @(NotifyProto ev e))
|
||||
who <- thatPeer @(NotifyProto ev e)
|
||||
|
||||
hndl <- startNotify @ev @src @m notifySource key $ \ha d -> do
|
||||
atomically $ writeTQueue notifyQ (ha, who, NotifyEvent key d)
|
||||
|
|
|
@ -91,16 +91,16 @@ newtype PeerHandshakeAdapter e m =
|
|||
}
|
||||
|
||||
|
||||
peerHandShakeProto :: forall e s m . ( MonadIO m
|
||||
, Response e (PeerHandshake e) m
|
||||
, Request e (PeerHandshake e) m
|
||||
, Sessions e (PeerHandshake e) m
|
||||
peerHandShakeProto :: forall e s m proto . ( MonadIO m
|
||||
, Response e proto m
|
||||
, Request e proto m
|
||||
, Sessions e proto m
|
||||
, Sessions e (KnownPeer e) m
|
||||
, HasNonces (PeerHandshake e) m
|
||||
, HasNonces proto m
|
||||
, HasPeerNonce e m
|
||||
, Nonce (PeerHandshake e) ~ PingNonce
|
||||
, Nonce proto ~ PingNonce
|
||||
, Pretty (Peer e)
|
||||
, EventEmitter e (PeerHandshake e) m
|
||||
, EventEmitter e proto m
|
||||
, EventEmitter e (ConcretePeer e) m
|
||||
, HasCredentials s m
|
||||
, Asymm s
|
||||
|
@ -108,6 +108,7 @@ peerHandShakeProto :: forall e s m . ( MonadIO m
|
|||
, Serialise (PubKey 'Encrypt (Encryption e))
|
||||
, s ~ Encryption e
|
||||
, e ~ L4Proto
|
||||
, proto ~ PeerHandshake e
|
||||
)
|
||||
=> PeerHandshakeAdapter e m
|
||||
-> PeerEnv e
|
||||
|
@ -117,7 +118,7 @@ peerHandShakeProto :: forall e s m . ( MonadIO m
|
|||
peerHandShakeProto adapter penv =
|
||||
\case
|
||||
PeerPing nonce -> do
|
||||
pip <- thatPeer proto
|
||||
pip <- thatPeer @proto
|
||||
-- взять свои ключи
|
||||
creds <- getCredentials @s
|
||||
|
||||
|
@ -137,7 +138,7 @@ peerHandShakeProto adapter penv =
|
|||
sendPing pip
|
||||
|
||||
PeerPong nonce0 sign d -> do
|
||||
pip <- thatPeer proto
|
||||
pip <- thatPeer @proto
|
||||
|
||||
se' <- find @e (PeerHandshakeKey (nonce0,pip)) id
|
||||
|
||||
|
@ -163,9 +164,6 @@ peerHandShakeProto adapter penv =
|
|||
emit AnyKnownPeerEventKey (KnownPeerEvent pip d)
|
||||
emit (ConcretePeerKey pip) (ConcretePeerData pip d)
|
||||
|
||||
where
|
||||
proto = Proxy @(PeerHandshake e)
|
||||
|
||||
data ConcretePeer e = ConcretePeer
|
||||
|
||||
newtype instance EventKey e (ConcretePeer e) =
|
||||
|
|
|
@ -40,7 +40,7 @@ peerAnnounceProto :: forall e m . ( MonadIO m
|
|||
peerAnnounceProto =
|
||||
\case
|
||||
PeerAnnounce nonce -> do
|
||||
who <- thatPeer (Proxy @(PeerAnnounce e))
|
||||
who <- thatPeer @(PeerAnnounce e)
|
||||
emit @e PeerAnnounceEventKey (PeerAnnounceEvent who nonce)
|
||||
|
||||
|
||||
|
|
|
@ -79,12 +79,11 @@ peerExchangeProto msg = do
|
|||
PeerExchangePeers2 nonce pips -> peerExchangePeers2 nonce pips
|
||||
|
||||
where
|
||||
proto = Proxy @(PeerExchange e)
|
||||
|
||||
fromPEXAddr1 = fromPeerAddr . L4Address UDP
|
||||
|
||||
peerExchangePeers1 nonce pips = do
|
||||
pip <- thatPeer proto
|
||||
pip <- thatPeer @proto
|
||||
|
||||
ok <- find (PeerExchangeKey @e nonce) id <&> isJust
|
||||
|
||||
|
@ -95,7 +94,7 @@ peerExchangeProto msg = do
|
|||
emit @e PeerExchangePeersKey (PeerExchangePeersData sa)
|
||||
|
||||
peerExchangePeers2 nonce pips = do
|
||||
pip <- thatPeer proto
|
||||
pip <- thatPeer @proto
|
||||
|
||||
ok <- find (PeerExchangeKey @e nonce) id <&> isJust
|
||||
|
||||
|
@ -106,7 +105,7 @@ peerExchangeProto msg = do
|
|||
emit @e PeerExchangePeersKey (PeerExchangePeersData sa)
|
||||
|
||||
peerExchangeGet pex n = deferred @proto do
|
||||
that <- thatPeer proto
|
||||
that <- thatPeer @proto
|
||||
|
||||
debug $ "PeerExchangeGet" <+> "from" <+> pretty that
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ peerMetaProto :: forall e m proto . ( MonadIO m
|
|||
peerMetaProto peerMeta =
|
||||
\case
|
||||
GetPeerMeta -> do
|
||||
p <- thatPeer (Proxy @(PeerMetaProto e))
|
||||
p <- thatPeer @proto
|
||||
auth <- find (KnownPeerKey p) id <&> isJust
|
||||
when auth do
|
||||
debug $ "PEER META: ANSWERING" <+> pretty p <+> viaShow peerMeta
|
||||
|
@ -50,7 +50,7 @@ peerMetaProto peerMeta =
|
|||
response (ThePeerMeta @e peerMeta)
|
||||
|
||||
ThePeerMeta meta -> do
|
||||
that <- thatPeer (Proxy @(PeerMetaProto e))
|
||||
that <- thatPeer @proto
|
||||
debug $ "GOT PEER META FROM" <+> pretty that <+> viaShow meta
|
||||
emit @e (PeerMetaEventKey that) (PeerMetaEvent meta)
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,77 @@
|
|||
{-# Language UndecidableInstances #-}
|
||||
{-# Language AllowAmbiguousTypes #-}
|
||||
module HBS2.Net.Proto.RefChan.RefChanHead where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Net.Proto
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Base58
|
||||
import HBS2.Net.Proto.Peer
|
||||
import HBS2.Net.Proto.BlockAnnounce
|
||||
import HBS2.Net.Proto.Sessions
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Storage
|
||||
|
||||
import HBS2.Net.Proto.RefChan.Types
|
||||
|
||||
import HBS2.System.Logger.Simple
|
||||
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.Maybe
|
||||
|
||||
|
||||
refChanHeadProto :: forall e s m proto . ( MonadIO m
|
||||
, Request e proto m
|
||||
, Response e proto m
|
||||
, Request e (BlockAnnounce e) m
|
||||
, HasPeerNonce e m
|
||||
, HasDeferred proto e m
|
||||
, IsPeerAddr e m
|
||||
, Pretty (Peer e)
|
||||
, Sessions e (KnownPeer e) m
|
||||
, HasStorage m
|
||||
, Signatures s
|
||||
, IsRefPubKey s
|
||||
, s ~ Encryption e
|
||||
, proto ~ RefChanHead e
|
||||
)
|
||||
=> Bool
|
||||
-> RefChanAdapter e m
|
||||
-> RefChanHead e
|
||||
-> m ()
|
||||
|
||||
refChanHeadProto self adapter msg = do
|
||||
-- авторизовать пира
|
||||
peer <- thatPeer @proto
|
||||
|
||||
auth <- find (KnownPeerKey peer) id <&> isJust
|
||||
|
||||
no <- peerNonce @e
|
||||
|
||||
void $ runMaybeT do
|
||||
|
||||
guard (auth || self)
|
||||
|
||||
case msg of
|
||||
RefChanHead chan pkt -> do
|
||||
guard =<< lift (refChanSubscribed adapter chan)
|
||||
trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan)
|
||||
-- TODO: notify-others-for-new-head
|
||||
-- нужно ли уведомить остальных, что голова поменялась?
|
||||
-- всех, от кого мы еще не получали данное сообщение
|
||||
-- откуда мы знаем, от кого мы получали данное сообщение?
|
||||
lift $ refChanOnHead adapter chan pkt
|
||||
|
||||
RefChanGetHead chan -> deferred @proto do
|
||||
trace $ "RefChanGetHead" <+> pretty self <+> pretty (AsBase58 chan)
|
||||
|
||||
sto <- getStorage
|
||||
ref <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan)
|
||||
sz <- MaybeT $ liftIO $ hasBlock sto ref
|
||||
|
||||
let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta sz ref
|
||||
let announce = BlockAnnounce @e no annInfo
|
||||
lift $ request peer announce
|
||||
lift $ request peer (RefChanHead @e chan (RefChanHeadBlockTran (HashRef ref)))
|
||||
|
||||
|
|
@ -0,0 +1,102 @@
|
|||
{-# Language UndecidableInstances #-}
|
||||
{-# Language AllowAmbiguousTypes #-}
|
||||
module HBS2.Net.Proto.RefChan.RefChanNotify where
|
||||
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Hash
|
||||
import HBS2.Net.Proto
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Events
|
||||
import HBS2.Net.Proto.Peer
|
||||
import HBS2.Net.Proto.Sessions
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Data.Types.SignedBox
|
||||
import HBS2.Actors.Peer.Types
|
||||
import HBS2.Storage
|
||||
|
||||
import HBS2.Net.Proto.RefChan.Types
|
||||
|
||||
import HBS2.System.Logger.Simple
|
||||
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.Maybe
|
||||
|
||||
refChanNotifyProto :: forall e s m proto . ( MonadIO m
|
||||
, Request e proto m
|
||||
, Response e proto m
|
||||
, HasRefChanId e proto
|
||||
, HasDeferred proto e m
|
||||
, HasGossip e proto m
|
||||
, IsPeerAddr e m
|
||||
, Pretty (Peer e)
|
||||
, Sessions e (RefChanHeadBlock e) m
|
||||
, Sessions e (KnownPeer e) m
|
||||
, EventEmitter e proto m
|
||||
, HasStorage m
|
||||
, Signatures s
|
||||
, IsRefPubKey s
|
||||
, ForRefChans e
|
||||
, proto ~ RefChanNotify e
|
||||
, s ~ Encryption e
|
||||
)
|
||||
=> Bool
|
||||
-> RefChanAdapter e m
|
||||
-> RefChanNotify e
|
||||
-> m ()
|
||||
|
||||
refChanNotifyProto self adapter msg@(ActionRequest rchan a) = do
|
||||
debug $ "RefChanNotify ACTION REQUEST"
|
||||
pure ()
|
||||
|
||||
refChanNotifyProto self adapter msg@(Notify rchan box) = do
|
||||
-- аутентифицируем
|
||||
-- проверяем ACL
|
||||
-- пересылаем всем
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
peer <- thatPeer @proto
|
||||
|
||||
let h0 = hashObject @HbSync (serialise msg)
|
||||
|
||||
auth <- find (KnownPeerKey peer) id <&> isJust
|
||||
|
||||
void $ runMaybeT do
|
||||
|
||||
guard =<< lift (refChanSubscribed adapter rchan)
|
||||
|
||||
guard (self || auth)
|
||||
|
||||
debug $ "&&& refChanNotifyProto" <+> pretty self
|
||||
|
||||
deferred @proto do
|
||||
|
||||
guard =<< liftIO (hasBlock sto h0 <&> isNothing)
|
||||
|
||||
(authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box
|
||||
|
||||
let refchanKey = RefChanHeadKey @s rchan
|
||||
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
|
||||
|
||||
guard $ checkACL headBlock Nothing authorKey
|
||||
|
||||
-- FIXME: garbage-collection-required
|
||||
liftIO $ putBlock sto (serialise msg)
|
||||
|
||||
-- теперь пересылаем по госсипу
|
||||
lift $ gossip msg
|
||||
|
||||
-- FIXME: remove-debug
|
||||
let h1 = hashObject @HbSync (serialise box)
|
||||
debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0 <+> pretty h1
|
||||
|
||||
-- тут надо заслать во внешнее приложение,
|
||||
-- равно как и в остальных refchan-протоколах
|
||||
|
||||
unless self do
|
||||
debug $ "^^^ CALL refChanNotifyRely" <+> pretty h0
|
||||
lift $ refChanNotifyRely adapter rchan msg
|
||||
|
||||
lift $ emit @e (RefChanNotifyEventKey rchan) (RefChanNotifyEvent (HashRef h0) msg)
|
||||
|
|
@ -0,0 +1,591 @@
|
|||
{-# Language UndecidableInstances #-}
|
||||
{-# Language AllowAmbiguousTypes #-}
|
||||
{-# Language TemplateHaskell #-}
|
||||
{-# Language FunctionalDependencies #-}
|
||||
{-# LANGUAGE ImplicitParams #-}
|
||||
{-# LANGUAGE PatternSynonyms, ViewPatterns #-}
|
||||
module HBS2.Net.Proto.RefChan.RefChanUpdate where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Hash
|
||||
import HBS2.Clock
|
||||
import HBS2.Net.Proto
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Base58
|
||||
import HBS2.Events
|
||||
import HBS2.Net.Proto.Peer
|
||||
import HBS2.Net.Proto.Sessions
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Data.Types.SignedBox
|
||||
import HBS2.Actors.Peer.Types
|
||||
import HBS2.Data.Types.Peer
|
||||
import HBS2.Storage
|
||||
|
||||
import HBS2.Net.Proto.RefChan.Types
|
||||
|
||||
import HBS2.System.Logger.Simple
|
||||
|
||||
import Codec.Serialise
|
||||
import Control.Monad.Identity
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HashMap
|
||||
import Data.HashSet (HashSet)
|
||||
import Data.HashSet qualified as HashSet
|
||||
import Data.Maybe
|
||||
import Data.Word
|
||||
import Lens.Micro.Platform
|
||||
import Data.Hashable hiding (Hashed)
|
||||
import Type.Reflection (someTypeRep)
|
||||
import Data.Time.Clock.POSIX (getPOSIXTime)
|
||||
|
||||
import UnliftIO
|
||||
|
||||
data ProposeTran e = ProposeTran HashRef (SignedBox ByteString e) -- произвольная бинарная транзакция,
|
||||
deriving stock (Generic) -- подписанная ключом **АВТОРА**, который её рассылает
|
||||
|
||||
newtype AcceptTime = AcceptTime Word64
|
||||
deriving stock (Eq,Ord,Data,Generic)
|
||||
deriving newtype (Enum,Num,Real,Integral)
|
||||
|
||||
instance Serialise AcceptTime
|
||||
|
||||
data AcceptTran e = AcceptTran1 HashRef HashRef -- ссылка на (ProposTran e)
|
||||
| AcceptTran2 (Maybe AcceptTime) HashRef HashRef
|
||||
deriving stock (Generic)
|
||||
|
||||
acceptTime :: SimpleGetter (AcceptTran e) (Maybe AcceptTime)
|
||||
acceptTime = to getter
|
||||
where
|
||||
getter (AcceptTran1 _ _) = Nothing
|
||||
getter (AcceptTran2 a _ _) = a
|
||||
|
||||
unpackAcceptTran :: AcceptTran e -> (Maybe AcceptTime, HashRef, HashRef)
|
||||
unpackAcceptTran (AcceptTran1 a b) = (Nothing, a, b)
|
||||
unpackAcceptTran (AcceptTran2 t a b) = (t, a, b)
|
||||
|
||||
pattern AcceptTran :: Maybe AcceptTime -> HashRef -> HashRef -> AcceptTran e
|
||||
pattern AcceptTran t a b <- (unpackAcceptTran -> (t, a, b))
|
||||
where
|
||||
AcceptTran Nothing a b = AcceptTran1 a b
|
||||
AcceptTran (Just t) a b = AcceptTran2 (Just t) a b
|
||||
|
||||
instance ForRefChans e => Serialise (ProposeTran e)
|
||||
instance ForRefChans e => Serialise (AcceptTran e)
|
||||
|
||||
data RefChanRound e =
|
||||
RefChanRound
|
||||
{ _refChanRoundKey :: HashRef -- ^ hash of the Propose transaction
|
||||
, _refChanHeadKey :: RefChanHeadKey (Encryption e)
|
||||
, _refChanRoundTTL :: TimeSpec
|
||||
, _refChanRoundClosed :: TVar Bool
|
||||
, _refChanRoundPropose :: TVar (Maybe (ProposeTran e)) -- ^ propose transaction itself
|
||||
, _refChanRoundTrans :: TVar (HashSet HashRef)
|
||||
, _refChanRoundAccepts :: TVar (HashMap (PubKey 'Sign (Encryption e)) ())
|
||||
}
|
||||
deriving stock (Typeable, Generic)
|
||||
|
||||
makeLenses 'RefChanRound
|
||||
|
||||
newtype instance SessionKey e (RefChanRound e) =
|
||||
RefChanRoundKey HashRef
|
||||
deriving stock (Generic, Eq, Typeable)
|
||||
deriving newtype (Pretty)
|
||||
|
||||
deriving newtype instance Hashable (SessionKey e (RefChanRound e))
|
||||
|
||||
type instance SessionData e (RefChanRound e) = RefChanRound e
|
||||
|
||||
instance Expires (SessionKey e (RefChanRound e)) where
|
||||
expiresIn _ = Just 300
|
||||
|
||||
data instance EventKey e (RefChanRound e) =
|
||||
RefChanRoundEventKey
|
||||
deriving (Generic,Typeable,Eq)
|
||||
|
||||
newtype instance Event e (RefChanRound e) =
|
||||
RefChanRoundEvent (SessionKey e (RefChanRound e))
|
||||
deriving (Typeable,Generic)
|
||||
deriving newtype (Pretty)
|
||||
|
||||
instance Typeable (RefChanRound e) => Hashable (EventKey e (RefChanRound e)) where
|
||||
hashWithSalt salt _ = hashWithSalt salt (someTypeRep p)
|
||||
where
|
||||
p = Proxy @(RefChanRound e)
|
||||
|
||||
instance EventType ( Event e (RefChanRound e) ) where
|
||||
isPersistent = True
|
||||
|
||||
instance Expires (EventKey e (RefChanRound e)) where
|
||||
expiresIn = const Nothing
|
||||
|
||||
-- TODO: find-out-sure-transaction-size
|
||||
-- транзакция должна быть маленькая!
|
||||
-- хочешь что-то большое просунуть -- шли хэши.
|
||||
-- черт его знает, какой там останется пайлоад.
|
||||
-- надо посмотреть. байт, небось, 400
|
||||
data RefChanUpdate e =
|
||||
Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира
|
||||
| Accept (RefChanId e) (SignedBox (AcceptTran e) e) -- подписано ключом пира
|
||||
deriving stock (Generic)
|
||||
|
||||
instance ForRefChans e => Serialise (RefChanUpdate e)
|
||||
|
||||
data RefChanRequest e =
|
||||
RefChanRequest (RefChanId e)
|
||||
| RefChanResponse (RefChanId e) HashRef
|
||||
deriving stock (Generic,Typeable)
|
||||
|
||||
instance ForRefChans e => Serialise (RefChanRequest e)
|
||||
|
||||
data instance EventKey e (RefChanRequest e) =
|
||||
RefChanRequestEventKey
|
||||
deriving (Generic,Typeable,Eq)
|
||||
|
||||
data instance Event e (RefChanRequest e) =
|
||||
RefChanRequestEvent (RefChanId e) HashRef
|
||||
deriving (Typeable,Generic)
|
||||
|
||||
instance EventType ( Event e (RefChanRequest e) ) where
|
||||
isPersistent = True
|
||||
|
||||
instance Expires (EventKey e (RefChanRequest e)) where
|
||||
expiresIn = const Nothing
|
||||
|
||||
instance Typeable (RefChanRequest e) => Hashable (EventKey e (RefChanRequest e)) where
|
||||
hashWithSalt salt _ = hashWithSalt salt (someTypeRep p)
|
||||
where
|
||||
p = Proxy @(RefChanRequest e)
|
||||
|
||||
|
||||
|
||||
type RefChanValidateNonce e = Nonce (RefChanValidate e)
|
||||
|
||||
data RefChanValidate e =
|
||||
RefChanValidate
|
||||
{ rcvNonce :: Nonce (RefChanValidate e)
|
||||
, rcvChan :: RefChanId e
|
||||
, rcvData :: RefChanValidateData e
|
||||
}
|
||||
deriving stock (Generic)
|
||||
|
||||
data RefChanValidateData e =
|
||||
Validate HashRef
|
||||
| Accepted HashRef
|
||||
| Rejected HashRef
|
||||
| Poke
|
||||
deriving stock (Generic)
|
||||
|
||||
instance Serialise (RefChanValidateData e)
|
||||
|
||||
instance ( Serialise (PubKey 'Sign (Encryption e))
|
||||
, Serialise (Nonce (RefChanValidate e)) )
|
||||
=> Serialise (RefChanValidate e)
|
||||
|
||||
instance ( ForRefChans e
|
||||
, Pretty (AsBase58 (Nonce (RefChanValidate e)))
|
||||
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
|
||||
) => Pretty (RefChanValidate e) where
|
||||
pretty (RefChanValidate n c d) = case d of
|
||||
Validate r -> pretty "validate" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r
|
||||
Accepted r -> pretty "accepted" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r
|
||||
Rejected r -> pretty "rejected" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r
|
||||
Poke -> pretty "poke" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c)
|
||||
|
||||
|
||||
|
||||
instance HasRefChanId e (RefChanUpdate e) where
|
||||
getRefChanId = \case
|
||||
Propose c _ -> c
|
||||
Accept c _ -> c
|
||||
|
||||
instance HasRefChanId e (RefChanRequest e) where
|
||||
getRefChanId = \case
|
||||
RefChanRequest c -> c
|
||||
RefChanResponse c _ -> c
|
||||
|
||||
|
||||
instance HasRefChanId e (RefChanValidate e) where
|
||||
getRefChanId = rcvChan
|
||||
|
||||
|
||||
refChanUpdateProto :: forall e s m proto . ( MonadUnliftIO m
|
||||
, Request e proto m
|
||||
, Response e proto m
|
||||
, HasDeferred proto e m
|
||||
, IsPeerAddr e m
|
||||
, Pretty (Peer e)
|
||||
, Sessions e (KnownPeer e) m
|
||||
, Sessions e (RefChanHeadBlock e) m
|
||||
, Sessions e (RefChanRound e) m
|
||||
, EventEmitter e (RefChanRound e) m
|
||||
, HasStorage m
|
||||
, HasGossip e proto m
|
||||
, Signatures s
|
||||
, IsRefPubKey s
|
||||
, Pretty (AsBase58 (PubKey 'Sign s))
|
||||
, ForRefChans e
|
||||
, s ~ Encryption e
|
||||
, proto ~ RefChanUpdate e
|
||||
)
|
||||
=> Bool
|
||||
-> PeerCredentials s
|
||||
-> RefChanAdapter e m
|
||||
-> RefChanUpdate e
|
||||
-> m ()
|
||||
|
||||
refChanUpdateProto self pc adapter msg = do
|
||||
-- авторизовать пира
|
||||
peer <- thatPeer @proto
|
||||
|
||||
auth <- find (KnownPeerKey peer) id <&> isJust
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
let pk = view peerSignPk pc
|
||||
let sk = view peerSignSk pc
|
||||
|
||||
void $ runMaybeT do
|
||||
|
||||
guard (auth || self)
|
||||
|
||||
-- TODO: process-each-message-only-once
|
||||
-- где-то тут мы разбираемся, что такое сообщеине
|
||||
-- уже отправляли и больше одного раза не реагируем
|
||||
|
||||
-- У нас тут получается раунд на каждый Propose
|
||||
-- Это может быть и хорошо и похо. Если очень
|
||||
-- много транзакций, это плохо. Если не очень
|
||||
-- то это нормально и можно обойтись без понятия
|
||||
-- "блок".
|
||||
-- так-то и количество proposers можно ограничить
|
||||
|
||||
guard =<< lift (refChanSubscribed adapter (getRefChanId msg))
|
||||
|
||||
let h0 = hashObject @HbSync (serialise msg)
|
||||
guard =<< liftIO (hasBlock sto h0 <&> isNothing)
|
||||
|
||||
case msg of
|
||||
Propose chan box -> do
|
||||
|
||||
debug $ "RefChanUpdate/Propose" <+> pretty h0
|
||||
|
||||
deferred @proto do
|
||||
|
||||
-- проверили подпись пира
|
||||
(peerKey, ProposeTran headRef abox) <- MaybeT $ pure $ unboxSignedBox0 box
|
||||
|
||||
-- проверили подпись автора
|
||||
(authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 abox
|
||||
|
||||
-- итак, сначала достаём голову. как мы достаём голову?
|
||||
|
||||
let refchanKey = RefChanHeadKey @s chan
|
||||
h <- MaybeT $ liftIO $ getRef sto refchanKey
|
||||
-- смотрим, что у нас такая же голова.
|
||||
-- если нет -- значит, кто-то рассинхронизировался.
|
||||
-- может быть, потом попробуем головы запросить
|
||||
guard (HashRef h == headRef)
|
||||
|
||||
debug $ "OMG! Got trans" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey)
|
||||
|
||||
-- теперь достаём голову
|
||||
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
|
||||
|
||||
let pips = view refChanHeadPeers headBlock
|
||||
|
||||
guard $ checkACL headBlock (Just peerKey) authorKey
|
||||
|
||||
debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey)
|
||||
|
||||
-- TODO: validate-transaction
|
||||
-- итак, как нам валидировать транзакцию?
|
||||
-- HTTP ?
|
||||
-- TCP ?
|
||||
-- UDP ? (кстати, годно и быстро)
|
||||
-- CLI ?
|
||||
-- получается, риалтайм: ждём не более X секунд валидации,
|
||||
-- иначе не валидируем.
|
||||
-- по хорошему, не блокироваться бы в запросе.
|
||||
-- тут мы зависим от состояния конвейра, нас можно DDoS-ить
|
||||
-- большим количеством запросов и транзакции будут отклоняться
|
||||
-- при большом потоке.
|
||||
-- но решается это.. тадам! PoW! подбором красивых хэшей
|
||||
-- при увеличении нагрузки.
|
||||
-- тогда, правда, в открытой системе работает паттерн -- DDoS
|
||||
-- всех, кроме своих узлов, а свои узлы всё принимают.
|
||||
|
||||
-- для начала: сделаем хук для валидации, которыйне будет делать ничего
|
||||
|
||||
-- если не смогли сохранить транзу, то и Accept разослать
|
||||
-- не сможем
|
||||
-- это правильно, так как транза содержит ссылку на RefChanId
|
||||
-- следовательно, для другого рефчана будет другая транза
|
||||
|
||||
hash <- MaybeT $ liftIO $ putBlock sto (serialise msg)
|
||||
|
||||
ts <- liftIO getTimeCoarse
|
||||
|
||||
let toWait = TimeoutSec (fromIntegral $ 2 * view refChanHeadWaitAccept headBlock)
|
||||
let ttl = ts + fromNanoSecs (fromIntegral $ toNanoSeconds toWait)
|
||||
|
||||
let rcrk = RefChanRoundKey (HashRef hash)
|
||||
|
||||
rndHere <- lift $ find rcrk id
|
||||
|
||||
defRound <- RefChanRound @e (HashRef hash) refchanKey ttl
|
||||
<$> newTVarIO False
|
||||
<*> newTVarIO Nothing
|
||||
<*> newTVarIO (HashSet.singleton (HashRef hash)) -- save propose
|
||||
<*> newTVarIO (HashMap.singleton peerKey ())
|
||||
|
||||
unless (isJust rndHere) do
|
||||
lift $ update defRound rcrk id
|
||||
lift $ emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk)
|
||||
|
||||
-- не обрабатывать propose, если он уже в процессе
|
||||
guard (isNothing rndHere)
|
||||
|
||||
-- FIXME: fixed-timeout-is-no-good
|
||||
validated <- either id id <$> lift ( race (pause @'Seconds 5 >> pure False)
|
||||
$ refChanValidatePropose adapter chan (HashRef hash)
|
||||
)
|
||||
-- почему так:
|
||||
-- мы можем тормозить в проверке транзакции,
|
||||
-- другие пиры могут работать быстрее и от них
|
||||
-- может прийти accept.
|
||||
-- так что раунд всё равно нужно завести,
|
||||
-- даже если транза не очень.
|
||||
|
||||
unless validated do
|
||||
maybe1 rndHere none $ \rnd -> do
|
||||
atomically $ writeTVar (view refChanRoundClosed rnd) True
|
||||
liftIO $ delBlock sto hash
|
||||
|
||||
guard validated
|
||||
|
||||
debug $ "TRANS VALIDATED" <+> pretty (AsBase58 chan) <+> pretty hash
|
||||
|
||||
lift $ gossip msg
|
||||
|
||||
-- проверить, что мы вообще авторизованы
|
||||
-- рассылать ACCEPT
|
||||
|
||||
guard ( pk `HashMap.member` pips )
|
||||
|
||||
-- если нет - то и всё, просто перешлём
|
||||
-- по госсипу исходную транзу
|
||||
|
||||
ts <- liftIO getPOSIXTime <&> round <&> Just
|
||||
let tran = AcceptTran ts headRef (HashRef hash)
|
||||
|
||||
-- -- генерируем Accept
|
||||
let accept = Accept chan (makeSignedBox @e pk sk tran)
|
||||
|
||||
-- -- и рассылаем всем
|
||||
debug "GOSSIP ACCEPT TRANSACTION"
|
||||
lift $ gossip accept
|
||||
|
||||
-- -- рассылаем ли себе? что бы был хоть один accept
|
||||
lift $ refChanUpdateProto True pc adapter accept
|
||||
|
||||
Accept chan box -> deferred @proto do
|
||||
|
||||
-- что если получили ACCEPT раньше PROPOSE ?
|
||||
-- что если PROPOSE еще обрабатывается?
|
||||
-- надо, короче, блокироваться и ждать тут Propose
|
||||
-- но если блокироваться --- то конвейр вообще
|
||||
-- может встать. что делать?
|
||||
--
|
||||
|
||||
debug $ "RefChanUpdate/ACCEPT" <+> pretty h0
|
||||
|
||||
(peerKey, AcceptTran _ headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
|
||||
|
||||
let refchanKey = RefChanHeadKey @s chan
|
||||
|
||||
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
|
||||
|
||||
h <- MaybeT $ liftIO $ getRef sto refchanKey
|
||||
|
||||
guard (HashRef h == headRef)
|
||||
|
||||
lift $ gossip msg
|
||||
|
||||
-- тут может так случиться, что propose еще нет
|
||||
-- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы
|
||||
-- почти одновременно. ну или не успело записаться. и что делать?
|
||||
|
||||
-- вот прямо тут надо ждать, пока придёт / завершится Propose
|
||||
-- -- или до таймаута
|
||||
|
||||
let afterPropose = runMaybeT do
|
||||
|
||||
here <- fix \next -> do
|
||||
blk <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust
|
||||
if blk then
|
||||
pure blk
|
||||
else do
|
||||
pause @'Seconds 0.25
|
||||
next
|
||||
|
||||
unless here do
|
||||
warn $ "No propose transaction saved yet!" <+> pretty hashRef
|
||||
|
||||
tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef)
|
||||
|
||||
tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just
|
||||
|
||||
|
||||
proposed <- MaybeT $ pure $ case tran of
|
||||
Propose _ pbox -> Just pbox
|
||||
_ -> Nothing
|
||||
|
||||
|
||||
(_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @e proposed
|
||||
|
||||
debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0
|
||||
|
||||
-- compiler bug?
|
||||
let (ProposeTran _ pbox) = ptran
|
||||
|
||||
(authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox
|
||||
|
||||
-- может, и не надо второй раз проверять
|
||||
guard $ checkACL headBlock (Just peerKey) authorKey
|
||||
|
||||
debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef
|
||||
|
||||
rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id
|
||||
|
||||
atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ())
|
||||
|
||||
-- TODO: garbage-collection-strongly-required
|
||||
ha <- MaybeT $ liftIO $ putBlock sto (serialise msg)
|
||||
|
||||
atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha))
|
||||
-- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it?
|
||||
|
||||
accepts <- atomically $ readTVar (view refChanRoundAccepts rcRound) <&> HashMap.size
|
||||
|
||||
-- FIXME: why-accepts-quorum-on-failed-proposal?
|
||||
|
||||
debug $ "ACCEPTS:" <+> pretty accepts
|
||||
|
||||
closed <- readTVarIO (view refChanRoundClosed rcRound)
|
||||
|
||||
-- FIXME: round!
|
||||
when (fromIntegral accepts >= view refChanHeadQuorum headBlock && not closed) do
|
||||
debug $ "ROUND!" <+> pretty accepts <+> pretty hashRef
|
||||
|
||||
trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList
|
||||
|
||||
forM_ trans $ \t -> do
|
||||
lift $ refChanWriteTran adapter t
|
||||
debug $ "WRITING TRANS" <+> pretty t
|
||||
|
||||
let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList
|
||||
votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys
|
||||
|
||||
debug $ "PIPS" <+> pretty (HashSet.toList pips & fmap AsBase58)
|
||||
debug $ "VOTES" <+> pretty (HashSet.toList votes & fmap AsBase58)
|
||||
|
||||
when (pips `HashSet.isSubsetOf` votes) do
|
||||
debug $ "CLOSING ROUND" <+> pretty hashRef <+> pretty (length trans)
|
||||
atomically $ writeTVar (view refChanRoundClosed rcRound) True
|
||||
|
||||
-- мы не можем ждать / поллить в deferred потому,
|
||||
-- что мы так забьем конвейр - там сейчас всего 8
|
||||
-- воркеров, и 8 параллельных ждущих запросов
|
||||
-- все остановят.
|
||||
|
||||
let w = TimeoutSec (realToFrac $ view refChanHeadWaitAccept headBlock)
|
||||
void $ lift $ race ( pause (2 * w) ) afterPropose
|
||||
|
||||
|
||||
-- TODO: refchan-poll-proto
|
||||
-- Запрашиваем refchan у всех.
|
||||
-- Пишем в итоговый лог только такие
|
||||
-- propose + accept у которых больше quorum accept
|
||||
-- каждую транзу обрабатываем только один раз
|
||||
--
|
||||
|
||||
refChanRequestProto :: forall e s m proto . ( MonadIO m
|
||||
, Request e proto m
|
||||
, Response e proto m
|
||||
, HasDeferred proto e m
|
||||
, IsPeerAddr e m
|
||||
, Pretty (Peer e)
|
||||
, Sessions e (KnownPeer e) m
|
||||
, Sessions e (RefChanHeadBlock e) m
|
||||
, EventEmitter e proto m
|
||||
, HasStorage m
|
||||
, Signatures s
|
||||
, IsRefPubKey s
|
||||
, ForRefChans e
|
||||
, s ~ Encryption e
|
||||
, proto ~ RefChanRequest e
|
||||
)
|
||||
=> Bool
|
||||
-> RefChanAdapter e m
|
||||
-> RefChanRequest e
|
||||
-> m ()
|
||||
|
||||
refChanRequestProto self adapter msg = do
|
||||
|
||||
peer <- thatPeer @proto
|
||||
|
||||
auth' <- find (KnownPeerKey peer) id
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
void $ runMaybeT do
|
||||
|
||||
guard (self || isJust auth')
|
||||
|
||||
auth <- MaybeT $ pure auth'
|
||||
|
||||
guard =<< lift (refChanSubscribed adapter (getRefChanId @e msg))
|
||||
|
||||
case msg of
|
||||
|
||||
RefChanRequest chan -> do
|
||||
rv <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan)
|
||||
lift $ response @e (RefChanResponse @e chan (HashRef rv))
|
||||
|
||||
RefChanResponse chan val -> do
|
||||
hd <- MaybeT $ getActualRefChanHead @e (RefChanHeadKey @s chan)
|
||||
let ppk = view peerSignKey auth
|
||||
|
||||
guard $ ppk `HashMap.member` view refChanHeadPeers hd
|
||||
|
||||
lift $ emit RefChanRequestEventKey (RefChanRequestEvent @e chan val)
|
||||
debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val
|
||||
|
||||
|
||||
makeProposeTran :: forall e s m . ( MonadIO m
|
||||
, ForRefChans e
|
||||
, Signatures (Encryption e)
|
||||
, HasStorage m
|
||||
, s ~ Encryption e
|
||||
)
|
||||
=> PeerCredentials s
|
||||
-> RefChanId e
|
||||
-> SignedBox ByteString e
|
||||
-> m (Maybe (SignedBox (ProposeTran e) e))
|
||||
|
||||
makeProposeTran creds chan box1 = do
|
||||
sto <- getStorage
|
||||
runMaybeT do
|
||||
h <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan)
|
||||
let tran = ProposeTran @e (HashRef h) box1
|
||||
let pk = view peerSignPk creds
|
||||
let sk = view peerSignSk creds
|
||||
pure $ makeSignedBox @e pk sk tran
|
||||
|
||||
-- FIXME: reconnect-validator-client-after-restart
|
||||
-- почему-то сейчас если рестартовать пира,
|
||||
-- но не растартовать валидатор --- то не получится
|
||||
-- повторно соединиться с валидатором.
|
||||
|
|
@ -0,0 +1,349 @@
|
|||
{-# Language UndecidableInstances #-}
|
||||
{-# Language AllowAmbiguousTypes #-}
|
||||
{-# Language TemplateHaskell #-}
|
||||
{-# Language FunctionalDependencies #-}
|
||||
{-# LANGUAGE ImplicitParams #-}
|
||||
{-# LANGUAGE PatternSynonyms, ViewPatterns #-}
|
||||
module HBS2.Net.Proto.RefChan.Types where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Hash
|
||||
import HBS2.Data.Detect
|
||||
import HBS2.Clock
|
||||
import HBS2.Net.Proto
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Base58
|
||||
import HBS2.Defaults
|
||||
import HBS2.Events
|
||||
import HBS2.Net.Proto.Peer
|
||||
import HBS2.Net.Proto.Sessions
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Data.Types.SignedBox
|
||||
import HBS2.Storage
|
||||
|
||||
import Data.Config.Suckless
|
||||
|
||||
import HBS2.System.Logger.Simple
|
||||
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.Either
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HashMap
|
||||
import Data.HashSet (HashSet)
|
||||
import Data.HashSet qualified as HashSet
|
||||
import Data.Maybe
|
||||
import Data.Text qualified as Text
|
||||
import Lens.Micro.Platform
|
||||
import Data.Hashable hiding (Hashed)
|
||||
|
||||
|
||||
{- HLINT ignore "Use newtype instead of data" -}
|
||||
|
||||
type RefChanId e = PubKey 'Sign (Encryption e)
|
||||
type RefChanOwner e = PubKey 'Sign (Encryption e)
|
||||
type RefChanAuthor e = PubKey 'Sign (Encryption e)
|
||||
|
||||
type Weight = Integer
|
||||
|
||||
data RefChanHeadBlock e =
|
||||
RefChanHeadBlockSmall
|
||||
{ _refChanHeadVersion :: Integer
|
||||
, _refChanHeadQuorum :: Integer
|
||||
, _refChanHeadWaitAccept :: Integer
|
||||
, _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight
|
||||
, _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e))
|
||||
}
|
||||
| RefChanHeadBlock1
|
||||
{ _refChanHeadVersion :: Integer
|
||||
, _refChanHeadQuorum :: Integer
|
||||
, _refChanHeadWaitAccept :: Integer
|
||||
, _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight
|
||||
, _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e))
|
||||
, _refChanHeadReaders' :: HashSet (PubKey 'Encrypt (Encryption e))
|
||||
, _refChanHeadExt :: ByteString
|
||||
}
|
||||
deriving stock (Generic)
|
||||
|
||||
makeLenses ''RefChanHeadBlock
|
||||
|
||||
data RefChanActionRequest =
|
||||
RefChanAnnounceBlock HashRef
|
||||
| RefChanFetch HashRef
|
||||
deriving stock (Generic)
|
||||
|
||||
instance Serialise RefChanActionRequest
|
||||
|
||||
data RefChanNotify e =
|
||||
Notify (RefChanId e) (SignedBox ByteString e) -- подписано ключом автора
|
||||
-- довольно уместно будет добавить эти команды сюда -
|
||||
-- они постоянно нужны, и это сильно упростит коммуникации
|
||||
| ActionRequest (RefChanId e) RefChanActionRequest
|
||||
|
||||
deriving stock (Generic)
|
||||
|
||||
instance ForRefChans e => Serialise (RefChanNotify e)
|
||||
|
||||
newtype instance EventKey e (RefChanNotify e) =
|
||||
RefChanNotifyEventKey (RefChanId e)
|
||||
|
||||
deriving stock instance ForRefChans e => Typeable (EventKey e (RefChanNotify e))
|
||||
deriving stock instance ForRefChans e => Eq (EventKey e (RefChanNotify e))
|
||||
deriving newtype instance ForRefChans e => Hashable (EventKey e (RefChanNotify e))
|
||||
|
||||
data instance Event e (RefChanNotify e) =
|
||||
RefChanNotifyEvent HashRef (RefChanNotify e)
|
||||
|
||||
-- FIXME: remove-default-instance?
|
||||
instance EventType (Event e (RefChanNotify e)) where
|
||||
isPersistent = True
|
||||
|
||||
instance Expires (EventKey e (RefChanNotify e)) where
|
||||
expiresIn = const Nothing -- (Just defCookieTimeoutSec)
|
||||
|
||||
|
||||
|
||||
type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e))
|
||||
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
|
||||
, FromStringMaybe (PubKey 'Sign (Encryption e))
|
||||
, FromStringMaybe (PubKey 'Encrypt (Encryption e))
|
||||
, Signatures (Encryption e)
|
||||
, Serialise (Signature (Encryption e))
|
||||
, Serialise (PubKey 'Encrypt (Encryption e))
|
||||
, Hashable (PubKey 'Encrypt (Encryption e))
|
||||
, Hashable (PubKey 'Sign (Encryption e))
|
||||
)
|
||||
|
||||
|
||||
|
||||
refChanHeadReaders :: ForRefChans e
|
||||
=> Lens (RefChanHeadBlock e)
|
||||
(RefChanHeadBlock e)
|
||||
(HashSet (PubKey 'Encrypt (Encryption e)))
|
||||
(HashSet (PubKey 'Encrypt (Encryption e)))
|
||||
|
||||
refChanHeadReaders = lens g s
|
||||
where
|
||||
g (RefChanHeadBlockSmall{}) = mempty
|
||||
g (RefChanHeadBlock1{..}) = _refChanHeadReaders'
|
||||
s v@(RefChanHeadBlock1{}) x = v { _refChanHeadReaders' = x }
|
||||
s x _ = x
|
||||
|
||||
instance ForRefChans e => Serialise (RefChanHeadBlock e)
|
||||
|
||||
type instance SessionData e (RefChanHeadBlock e) = RefChanHeadBlock e
|
||||
|
||||
newtype instance SessionKey e (RefChanHeadBlock e) =
|
||||
RefChanHeadBlockKey (RefChanHeadKey (Encryption e))
|
||||
|
||||
deriving newtype instance ForRefChans L4Proto
|
||||
=> Hashable (SessionKey L4Proto (RefChanHeadBlock L4Proto))
|
||||
|
||||
deriving stock instance ForRefChans L4Proto
|
||||
=> Eq (SessionKey L4Proto (RefChanHeadBlock L4Proto))
|
||||
|
||||
-- TODO: define-expiration-time
|
||||
instance Expires (SessionKey L4Proto (RefChanHeadBlock L4Proto)) where
|
||||
expiresIn = const (Just defCookieTimeoutSec)
|
||||
|
||||
newtype RefChanHeadKey s = RefChanHeadKey (PubKey 'Sign s)
|
||||
|
||||
instance RefMetaData (RefChanHeadKey s)
|
||||
|
||||
deriving stock instance IsRefPubKey s => Eq (RefChanHeadKey s)
|
||||
|
||||
instance IsRefPubKey s => Hashable (RefChanHeadKey s) where
|
||||
hashWithSalt s k = hashWithSalt s (hashObject @HbSync k)
|
||||
|
||||
instance IsRefPubKey s => Hashed HbSync (RefChanHeadKey s) where
|
||||
hashObject (RefChanHeadKey pk) = hashObject ("refchanhead|" <> serialise pk)
|
||||
|
||||
instance IsRefPubKey s => FromStringMaybe (RefChanHeadKey s) where
|
||||
fromStringMay s = RefChanHeadKey <$> fromStringMay s
|
||||
|
||||
instance IsRefPubKey s => IsString (RefChanHeadKey s) where
|
||||
fromString s = fromMaybe (error "bad public key base58") (fromStringMay s)
|
||||
|
||||
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanHeadKey s)) where
|
||||
pretty (AsBase58 (RefChanHeadKey k)) = pretty (AsBase58 k)
|
||||
|
||||
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanHeadKey s) where
|
||||
pretty (RefChanHeadKey k) = pretty (AsBase58 k)
|
||||
|
||||
|
||||
newtype RefChanLogKey s = RefChanLogKey { fromRefChanLogKey :: PubKey 'Sign s }
|
||||
|
||||
instance RefMetaData (RefChanLogKey s)
|
||||
|
||||
deriving stock instance IsRefPubKey s => Eq (RefChanLogKey s)
|
||||
|
||||
instance IsRefPubKey s => Hashable (RefChanLogKey s) where
|
||||
hashWithSalt s k = hashWithSalt s (hashObject @HbSync k)
|
||||
|
||||
instance IsRefPubKey s => Hashed HbSync (RefChanLogKey s) where
|
||||
hashObject (RefChanLogKey pk) = hashObject ("refchanlog|" <> serialise pk)
|
||||
|
||||
instance IsRefPubKey s => FromStringMaybe (RefChanLogKey s) where
|
||||
fromStringMay s = RefChanLogKey <$> fromStringMay s
|
||||
|
||||
instance IsRefPubKey s => IsString (RefChanLogKey s) where
|
||||
fromString s = fromMaybe (error "bad public key base58") (fromStringMay s)
|
||||
|
||||
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanLogKey s)) where
|
||||
pretty (AsBase58 (RefChanLogKey k)) = pretty (AsBase58 k)
|
||||
|
||||
instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanLogKey s) where
|
||||
pretty (RefChanLogKey k) = pretty (AsBase58 k)
|
||||
|
||||
instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where
|
||||
|
||||
fromStringMay str =
|
||||
case readers of
|
||||
[] -> RefChanHeadBlockSmall <$> version
|
||||
<*> quorum
|
||||
<*> wait
|
||||
<*> pure (HashMap.fromList peers)
|
||||
<*> pure (HashSet.fromList authors)
|
||||
|
||||
rs -> RefChanHeadBlock1 <$> version
|
||||
<*> quorum
|
||||
<*> wait
|
||||
<*> pure (HashMap.fromList peers)
|
||||
<*> pure (HashSet.fromList authors)
|
||||
<*> pure (HashSet.fromList rs)
|
||||
<*> pure mempty
|
||||
|
||||
where
|
||||
parsed = parseTop str & fromRight mempty
|
||||
version = lastMay [ n | (ListVal [SymbolVal "version", LitIntVal n] ) <- parsed ]
|
||||
quorum = lastMay [ n | (ListVal [SymbolVal "quorum", LitIntVal n] ) <- parsed ]
|
||||
wait = lastMay [ n | (ListVal [SymbolVal "wait", LitIntVal n] ) <- parsed ]
|
||||
|
||||
peers = catMaybes [ (,) <$> fromStringMay (Text.unpack s) <*> pure w
|
||||
| (ListVal [SymbolVal "peer", LitStrVal s, LitIntVal w] ) <- parsed
|
||||
]
|
||||
|
||||
authors = catMaybes [ fromStringMay (Text.unpack s)
|
||||
| (ListVal [SymbolVal "author", LitStrVal s] ) <- parsed
|
||||
]
|
||||
|
||||
readers = catMaybes [ fromStringMay @(PubKey 'Encrypt (Encryption e)) (Text.unpack s)
|
||||
| (ListVal [SymbolVal "reader", LitStrVal s] ) <- parsed
|
||||
]
|
||||
|
||||
instance (ForRefChans e
|
||||
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
|
||||
, Pretty (AsBase58 (PubKey 'Encrypt (Encryption e)))
|
||||
) => Pretty (RefChanHeadBlock e) where
|
||||
pretty blk = parens ("version" <+> pretty (view refChanHeadVersion blk)) <> line
|
||||
<>
|
||||
parens ("quorum" <+> pretty (view refChanHeadQuorum blk)) <> line
|
||||
<>
|
||||
parens ("wait" <+> pretty (view refChanHeadWaitAccept blk)) <> line
|
||||
<>
|
||||
vcat (fmap peer (HashMap.toList $ view refChanHeadPeers blk)) <> line
|
||||
<>
|
||||
vcat (fmap author (HashSet.toList $ view refChanHeadAuthors blk)) <> line
|
||||
<>
|
||||
vcat (fmap reader (HashSet.toList $ view refChanHeadReaders blk)) <> line
|
||||
|
||||
where
|
||||
peer (p,w) = parens ("peer" <+> dquotes (pretty (AsBase58 p)) <+> pretty w)
|
||||
author p = parens ("author" <+> dquotes (pretty (AsBase58 p)))
|
||||
reader p = parens ("reader" <+> dquotes (pretty (AsBase58 p)))
|
||||
|
||||
|
||||
-- блок головы может быть довольно большой.
|
||||
-- поэтому посылаем его, как merkle tree
|
||||
newtype RefChanHeadBlockTran e =
|
||||
RefChanHeadBlockTran { fromRefChanHeadBlockTran :: HashRef }
|
||||
deriving stock (Generic)
|
||||
|
||||
instance Serialise (RefChanHeadBlockTran e)
|
||||
|
||||
data RefChanHead e =
|
||||
RefChanHead (RefChanId e) (RefChanHeadBlockTran e)
|
||||
| RefChanGetHead (RefChanId e)
|
||||
deriving stock (Generic)
|
||||
|
||||
instance ForRefChans e => Serialise (RefChanHead e)
|
||||
|
||||
-- FIXME: rename
|
||||
data RefChanAdapter e m =
|
||||
RefChanAdapter
|
||||
{ refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m ()
|
||||
, refChanSubscribed :: RefChanId e -> m Bool
|
||||
, refChanWriteTran :: HashRef -> m ()
|
||||
, refChanValidatePropose :: RefChanId e -> HashRef -> m Bool
|
||||
, refChanNotifyRely :: RefChanId e -> RefChanNotify e -> m ()
|
||||
}
|
||||
|
||||
class HasRefChanId e p | p -> e where
|
||||
getRefChanId :: p -> RefChanId e
|
||||
|
||||
instance HasRefChanId e (RefChanNotify e) where
|
||||
getRefChanId = \case
|
||||
Notify c _ -> c
|
||||
ActionRequest c _ -> c
|
||||
|
||||
|
||||
getActualRefChanHead :: forall e s m . ( MonadIO m
|
||||
, Sessions e (RefChanHeadBlock e) m
|
||||
, HasStorage m
|
||||
, Signatures s
|
||||
, IsRefPubKey s
|
||||
-- , Pretty (AsBase58 (PubKey 'Sign s))
|
||||
-- , Serialise (Signature s)
|
||||
, ForRefChans e
|
||||
, HasStorage m
|
||||
, s ~ Encryption e
|
||||
)
|
||||
=> RefChanHeadKey s
|
||||
-> m (Maybe (RefChanHeadBlock e))
|
||||
|
||||
getActualRefChanHead key = do
|
||||
sto <- getStorage
|
||||
|
||||
runMaybeT do
|
||||
mbHead <- do
|
||||
lift $ find @e (RefChanHeadBlockKey key) id
|
||||
|
||||
case mbHead of
|
||||
Just hd -> do
|
||||
debug "HEAD DISCOVERED"
|
||||
pure hd
|
||||
|
||||
Nothing -> do
|
||||
headblk <- MaybeT $ getRefChanHead sto key
|
||||
debug "HEAD FOUND"
|
||||
pure headblk
|
||||
|
||||
getRefChanHead :: forall e s m . ( MonadIO m
|
||||
, s ~ Encryption e
|
||||
, ForRefChans e
|
||||
, Signatures s
|
||||
)
|
||||
=> AnyStorage
|
||||
-> RefChanHeadKey s
|
||||
-> m (Maybe (RefChanHeadBlock e))
|
||||
|
||||
getRefChanHead sto k = runMaybeT do
|
||||
h <- MaybeT $ liftIO $ getRef sto k
|
||||
hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h)
|
||||
(_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob
|
||||
pure headblk
|
||||
|
||||
|
||||
checkACL :: forall e s . (Encryption e ~ s, ForRefChans e)
|
||||
=> RefChanHeadBlock e
|
||||
-> Maybe (PubKey 'Sign s)
|
||||
-> PubKey 'Sign s
|
||||
-> Bool
|
||||
|
||||
checkACL theHead mbPeerKey authorKey = match
|
||||
where
|
||||
pips = view refChanHeadPeers theHead
|
||||
aus = view refChanHeadAuthors theHead
|
||||
match = maybe True (`HashMap.member` pips) mbPeerKey
|
||||
&& authorKey `HashSet.member` aus
|
||||
|
|
@ -162,21 +162,21 @@ data RefLogRequestI e m =
|
|||
, isRefLogSubscribed :: PubKey 'Sign (Encryption e) -> m Bool
|
||||
}
|
||||
|
||||
refLogRequestProto :: forall e s m . ( MonadIO m
|
||||
, Request e (RefLogRequest e) m
|
||||
, Response e (RefLogRequest e) m
|
||||
, HasDeferred (RefLogRequest e) e m
|
||||
refLogRequestProto :: forall e s m proto . ( MonadIO m
|
||||
, Request e proto m
|
||||
, Response e proto m
|
||||
, Sessions e (KnownPeer e) m
|
||||
, IsPeerAddr e m
|
||||
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
|
||||
, EventEmitter e (RefLogRequestAnswer e) m
|
||||
, Pretty (Peer e)
|
||||
, s ~ Encryption e
|
||||
, proto ~ RefLogRequest e
|
||||
)
|
||||
=> RefLogRequestI e m -> RefLogRequest e -> m ()
|
||||
|
||||
refLogRequestProto adapter cmd = do
|
||||
p <- thatPeer proto
|
||||
p <- thatPeer @proto
|
||||
|
||||
void $ runMaybeT do
|
||||
|
||||
|
@ -186,20 +186,17 @@ refLogRequestProto adapter cmd = do
|
|||
case cmd of
|
||||
(RefLogRequest pk) -> lift do
|
||||
trace $ "got RefLogUpdateRequest for" <+> pretty (AsBase58 pk)
|
||||
pip <- thatPeer proto
|
||||
pip <- thatPeer @proto
|
||||
answ' <- onRefLogRequest adapter (pip,pk)
|
||||
maybe1 answ' none $ \answ -> do
|
||||
response (RefLogResponse @e pk answ)
|
||||
|
||||
(RefLogResponse pk h) -> lift do
|
||||
trace $ "got RefLogResponse for" <+> pretty (AsBase58 pk) <+> pretty h
|
||||
pip <- thatPeer proto
|
||||
pip <- thatPeer @proto
|
||||
emit RefLogReqAnswerKey (RefLogReqAnswerData @e pk h)
|
||||
onRefLogResponse adapter (pip,pk,h)
|
||||
|
||||
where
|
||||
proto = Proxy @(RefLogRequest e)
|
||||
|
||||
refLogUpdateProto :: forall e s m proto . ( MonadIO m
|
||||
, Request e proto m
|
||||
, Response e proto m
|
||||
|
@ -220,7 +217,7 @@ refLogUpdateProto :: forall e s m proto . ( MonadIO m
|
|||
refLogUpdateProto =
|
||||
\case
|
||||
e@RefLogUpdate{} -> do
|
||||
p <- thatPeer proto
|
||||
p <- thatPeer @proto
|
||||
auth <- find (KnownPeerKey p) id <&> isJust
|
||||
|
||||
when auth do
|
||||
|
@ -237,9 +234,6 @@ refLogUpdateProto =
|
|||
emit @e RefLogUpdateEvKey (RefLogUpdateEvData (pubk, e, Just p))
|
||||
gossip e
|
||||
|
||||
where
|
||||
proto = Proxy @(RefLogUpdate e)
|
||||
|
||||
instance ( Serialise (PubKey 'Sign (Encryption e))
|
||||
, Serialise (Nonce (RefLogUpdate e))
|
||||
, Serialise (Signature (Encryption e))
|
||||
|
|
|
@ -111,8 +111,8 @@ class ( Eq (PeerAddr e)
|
|||
fromPeerAddr :: PeerAddr e -> m (Peer e)
|
||||
|
||||
-- FIXME: type-application-instead-of-proxy
|
||||
class (Monad m, HasProtocol e p) => HasThatPeer e p (m :: Type -> Type) where
|
||||
thatPeer :: Proxy p -> m (Peer e)
|
||||
class (Monad m, HasProtocol e p) => HasThatPeer p e (m :: Type -> Type) where
|
||||
thatPeer :: m (Peer e)
|
||||
|
||||
class (MonadIO m, HasProtocol e p) => HasDeferred p e m | p -> e where
|
||||
deferred :: m () -> m ()
|
||||
|
@ -123,7 +123,7 @@ instance (HasDeferred p e m, Monad m) => HasDeferred p e (MaybeT m) where
|
|||
|
||||
class ( MonadIO m
|
||||
, HasProtocol e p
|
||||
, HasThatPeer e p m
|
||||
, HasThatPeer p e m
|
||||
) => Response e p m | p -> e where
|
||||
|
||||
response :: p -> m ()
|
||||
|
|
|
@ -10,6 +10,7 @@ import PeerConfig
|
|||
|
||||
import HBS2.Peer.RPC.Client.Unix
|
||||
|
||||
import Options.Applicative
|
||||
import Data.Kind
|
||||
import Lens.Micro.Platform
|
||||
import UnliftIO
|
||||
|
@ -45,4 +46,15 @@ withRPCMessaging o action = do
|
|||
pause @'Seconds 0.05
|
||||
cancel m1
|
||||
|
||||
rpcOpt :: Parser String
|
||||
rpcOpt = strOption ( short 'r' <> long "rpc"
|
||||
<> help "addr:port" )
|
||||
|
||||
-- FIXME: options-duped-with-peer-main
|
||||
confOpt :: Parser FilePath
|
||||
confOpt = strOption ( long "config" <> short 'c' <> help "config" )
|
||||
|
||||
pRpcCommon :: Parser RPCOpt
|
||||
pRpcCommon = do
|
||||
RPCOpt <$> optional confOpt
|
||||
<*> optional rpcOpt
|
||||
|
|
|
@ -94,18 +94,7 @@ pRefChanHeadDump= do
|
|||
print $ pretty hdblk
|
||||
|
||||
|
||||
-- FIXME: options-duped-with-peer-main
|
||||
confOpt :: Parser FilePath
|
||||
confOpt = strOption ( long "config" <> short 'c' <> help "config" )
|
||||
|
||||
rpcOpt :: Parser String
|
||||
rpcOpt = strOption ( short 'r' <> long "rpc"
|
||||
<> help "addr:port" )
|
||||
|
||||
pRpcCommon :: Parser RPCOpt
|
||||
pRpcCommon = do
|
||||
RPCOpt <$> optional confOpt
|
||||
<*> optional rpcOpt
|
||||
|
||||
pRefChanHeadPost :: Parser (IO ())
|
||||
pRefChanHeadPost = do
|
||||
|
|
|
@ -244,12 +244,6 @@ runCLI = do
|
|||
-- <> command "dial" (info pDialog (progDesc "dialog commands"))
|
||||
)
|
||||
|
||||
confOpt = strOption ( long "config" <> short 'c' <> help "config" )
|
||||
|
||||
rpcOpt = strOption ( short 'r' <> long "rpc"
|
||||
<> help "addr:port" )
|
||||
|
||||
|
||||
common = do
|
||||
pref <- optional $ strOption ( short 'p' <> long "prefix"
|
||||
<> help "storage prefix" )
|
||||
|
@ -283,10 +277,6 @@ runCLI = do
|
|||
pRun = do
|
||||
runPeer <$> common
|
||||
|
||||
pRpcCommon = do
|
||||
RPCOpt <$> optional confOpt
|
||||
<*> optional rpcOpt
|
||||
|
||||
pDie = do
|
||||
rpc <- pRpcCommon
|
||||
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
|
||||
|
@ -535,8 +525,8 @@ instance (Monad m, s ~ Encryption e) => HasCredentials s (CredentialsM e s m) wh
|
|||
instance (Monad m, s ~ Encryption e) => HasCredentials s (ResponseM e (CredentialsM e s m)) where
|
||||
getCredentials = lift getCredentials
|
||||
|
||||
instance (Monad m, HasThatPeer e p m, s ~ Encryption e) => HasThatPeer e p (CredentialsM e s m) where
|
||||
thatPeer = lift . thatPeer
|
||||
instance (Monad m, HasThatPeer p e m, s ~ Encryption e) => HasThatPeer p e (CredentialsM e s m) where
|
||||
thatPeer = lift (thatPeer @p)
|
||||
|
||||
instance ( EventEmitter e p m
|
||||
) => EventEmitter e p (CredentialsM e s m) where
|
||||
|
@ -768,7 +758,7 @@ runPeer opts = Exception.handle (\e -> myException e
|
|||
, refChanValidatePropose = refChanValidateTranFn @e rce
|
||||
|
||||
, refChanNotifyRely = \r u -> do
|
||||
debug "refChanNotifyRely MOTHERFUCKER!"
|
||||
trace "refChanNotifyRely!"
|
||||
refChanNotifyRelyFn @e rce r u
|
||||
case u of
|
||||
Notify rr s -> do
|
||||
|
@ -893,8 +883,6 @@ runPeer opts = Exception.handle (\e -> myException e
|
|||
$ knownPeers @e pl >>= mapM \pip ->
|
||||
fmap (, pip) <$> find (KnownPeerKey pip) (view peerOwnNonce)
|
||||
|
||||
let proto1 = view sockType p
|
||||
|
||||
case Map.lookup thatNonce pdkv of
|
||||
|
||||
-- TODO: prefer-local-peer-with-same-nonce-over-remote-peer
|
||||
|
@ -948,7 +936,7 @@ runPeer opts = Exception.handle (\e -> myException e
|
|||
let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do
|
||||
withPeerM env mx
|
||||
`U.withException` \e -> case fromException e of
|
||||
Just (e' :: AsyncCancelled) -> pure ()
|
||||
Just (_ :: AsyncCancelled) -> pure ()
|
||||
Nothing -> do
|
||||
err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e)
|
||||
|
||||
|
@ -1018,8 +1006,8 @@ runPeer opts = Exception.handle (\e -> myException e
|
|||
chunks <- S.toList_ $ do
|
||||
deepScan ScanDeep (const none) h (liftIO . getBlock sto) $ \ha -> do
|
||||
unless (ha == h) do
|
||||
blk <- liftIO $ getBlock sto ha
|
||||
maybe1 blk none S.yield
|
||||
blk1 <- liftIO $ getBlock sto ha
|
||||
maybe1 blk1 none S.yield
|
||||
|
||||
let box = deserialiseOrFail @(SignedBox (RefChanHeadBlock e) e) (LBS.concat chunks)
|
||||
|
||||
|
@ -1058,7 +1046,7 @@ runPeer opts = Exception.handle (\e -> myException e
|
|||
pa <- toPeerAddr p
|
||||
checkBlockAnnounce conf denv no pa (view biHash bi)
|
||||
|
||||
subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent pip nonce) -> do
|
||||
subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent{}) -> do
|
||||
-- debug $ "Got peer announce!" <+> pretty pip
|
||||
emitToPeer penv PeerAnnounceEventKey pe
|
||||
|
||||
|
@ -1087,6 +1075,7 @@ runPeer opts = Exception.handle (\e -> myException e
|
|||
let rpcSa = getRpcSocketName conf
|
||||
rpcmsg <- newMessagingUnix True 1.0 rpcSa
|
||||
|
||||
|
||||
let rpcctx = RPC2Context { rpcConfig = fromPeerConfig conf
|
||||
, rpcMessaging = rpcmsg
|
||||
, rpcPokeAnswer = pokeAnsw
|
||||
|
|
|
@ -494,7 +494,7 @@ instance (ForGossip e p (PeerM e IO)) => HasGossip e p (PeerM e IO) where
|
|||
|
||||
instance (ForGossip e p (ResponseM e m), HasGossip e p m) => HasGossip e p (ResponseM e m) where
|
||||
gossip msg = do
|
||||
that <- thatPeer (Proxy @p)
|
||||
that <- thatPeer @p
|
||||
forKnownPeers $ \pip _ -> do
|
||||
unless (that == pip) do
|
||||
request @e pip msg
|
||||
|
|
|
@ -50,7 +50,7 @@ main = do
|
|||
<> header "Raw tx test"
|
||||
)
|
||||
krData <- BS.readFile $ credentialsFile options
|
||||
creds <- pure (parseCredentials (AsCredFile krData)) `orDie` "bad keyring file"
|
||||
creds <- pure (parseCredentials @HBS2Basic (AsCredFile krData)) `orDie` "bad keyring file"
|
||||
let pubk = view peerSignPk creds
|
||||
let privk = view peerSignSk creds
|
||||
bs <- pure (fromBase58 $ BS8.pack $ tx options) `orDie` "transaction is not in Base58 format"
|
||||
|
|
|
@ -75,7 +75,7 @@ pingPongHandler :: forall e m proto . ( MonadIO m
|
|||
|
||||
pingPongHandler _ req = do
|
||||
|
||||
that <- thatPeer (Proxy @proto)
|
||||
that <- thatPeer @proto
|
||||
own <- ownPeer @e
|
||||
|
||||
case req of
|
||||
|
@ -130,7 +130,7 @@ instance HasDeferred (PingPong L4Proto) L4Proto (ResponseM L4Proto (PingPongM L
|
|||
deferred m = do
|
||||
self <- lift $ asks (view ppSelf)
|
||||
bus <- lift $ asks (view ppFab)
|
||||
who <- thatPeer (Proxy @(PingPong L4Proto))
|
||||
who <- thatPeer @(PingPong L4Proto)
|
||||
void $ liftIO $ async $ runPingPong self bus (runResponseM who m)
|
||||
|
||||
main :: IO ()
|
||||
|
|
|
@ -49,7 +49,7 @@ pingPongHandlerS :: forall e m . ( MonadIO m
|
|||
|
||||
pingPongHandlerS tv n msg = do
|
||||
|
||||
that <- thatPeer (Proxy @(PingPong e))
|
||||
that <- thatPeer @(PingPong e)
|
||||
|
||||
UIO.atomically $ UIO.modifyTVar tv ((that,msg):)
|
||||
|
||||
|
|
Loading…
Reference in New Issue