From 835a0322e0f70d5075894672c2ed0950c29b632d Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 28 Dec 2023 06:21:57 +0300 Subject: [PATCH] refchan refactoring --- CHANGELOG.md | 1 + hbs2-core/hbs2-core.cabal | 4 + hbs2-core/lib/HBS2/Actors/Peer.hs | 7 +- hbs2-core/lib/HBS2/Data/KeyRing.hs | 23 +- hbs2-core/lib/HBS2/Net/Auth/Credentials.hs | 5 +- hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs | 2 +- hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs | 5 +- hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs | 6 +- hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 1 + hbs2-core/lib/HBS2/Net/Proto/Dialog.hs | 4 +- .../lib/HBS2/Net/Proto/EncryptionHandshake.hs | 9 +- hbs2-core/lib/HBS2/Net/Proto/Notify.hs | 2 +- hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 44 +- hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs | 2 +- hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs | 7 +- hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs | 4 +- hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 1062 +---------------- .../lib/HBS2/Net/Proto/RefChan/RefChanHead.hs | 77 ++ .../HBS2/Net/Proto/RefChan/RefChanNotify.hs | 102 ++ .../HBS2/Net/Proto/RefChan/RefChanUpdate.hs | 591 +++++++++ hbs2-core/lib/HBS2/Net/Proto/RefChan/Types.hs | 349 ++++++ hbs2-core/lib/HBS2/Net/Proto/RefLog.hs | 36 +- hbs2-core/lib/HBS2/Net/Proto/Types.hs | 6 +- hbs2-peer/app/CLI/Common.hs | 12 + hbs2-peer/app/CLI/RefChan.hs | 11 - hbs2-peer/app/PeerMain.hs | 51 +- hbs2-peer/app/PeerTypes.hs | 2 +- hbs2-tests/test/TestRawTx.hs | 4 +- hbs2-tests/test/TestTCPNet.hs | 4 +- hbs2-tests/test/TestUNIX.hs | 2 +- 30 files changed, 1253 insertions(+), 1182 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanHead.hs create mode 100644 hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanNotify.hs create mode 100644 hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanUpdate.hs create mode 100644 hbs2-core/lib/HBS2/Net/Proto/RefChan/Types.hs diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..792d6005 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1 @@ +# diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 1ee888b7..e791ba97 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -127,6 +127,10 @@ library , HBS2.Net.Proto.Sessions , HBS2.Net.Proto.RefLog , HBS2.Net.Proto.RefChan + , HBS2.Net.Proto.RefChan.Types + , HBS2.Net.Proto.RefChan.RefChanHead + , HBS2.Net.Proto.RefChan.RefChanNotify + , HBS2.Net.Proto.RefChan.RefChanUpdate , HBS2.Net.Proto.AnyRef , HBS2.Net.Proto.Types , HBS2.OrDie diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 2ff3c8a4..fff00d4b 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -452,8 +452,8 @@ runProto hh = do }) -> maybe (pure ()) (runResponseM pip . h) (decoder msg) -instance (Monad m, HasProtocol e p) => HasThatPeer e p (ResponseM e m) where - thatPeer _ = asks (view answTo) +instance (Monad m, HasProtocol e p) => HasThatPeer p e (ResponseM e m) where + thatPeer = asks (view answTo) instance HasProtocol e p => HasDeferred p e (ResponseM e (PeerM e IO)) where deferred action = do @@ -465,6 +465,7 @@ instance HasProtocol e p => HasDeferred p e (ResponseM e (PeerM e IO)) where instance ( HasProtocol e p , MonadTrans (ResponseM e) + , HasThatPeer p e (ResponseM e m) , HasStorage (PeerM e IO) , Pretty (Peer e) , PeerMessaging e @@ -475,7 +476,7 @@ instance ( HasProtocol e p response msg = do let proto = protoId @e @p (Proxy @p) - who <- thatPeer (Proxy @p) + who <- thatPeer @p self <- lift $ ownPeer @e fab <- lift $ getFabriq @e sendTo fab (To who) (From self) (AnyMessage @(Encoded e) @e proto (encode msg)) diff --git a/hbs2-core/lib/HBS2/Data/KeyRing.hs b/hbs2-core/lib/HBS2/Data/KeyRing.hs index ec9db965..5a797bef 100644 --- a/hbs2-core/lib/HBS2/Data/KeyRing.hs +++ b/hbs2-core/lib/HBS2/Data/KeyRing.hs @@ -14,6 +14,7 @@ import Data.ByteString qualified as BS import Lens.Micro.Platform import UnliftIO import Control.Monad.Trans.Maybe +import Data.HashSet qualified as HashSet splitPattern :: FilePath -> (FilePath, FilePath) @@ -64,22 +65,26 @@ findKeyRing fp kr = do pure (catMaybes kf) -findKeyRingEntry :: forall s m . ( MonadUnliftIO m - , SerialisedCredentials s - , ForHBS2Basic s - ) +findKeyRingEntries :: forall s m . ( MonadUnliftIO m + , SerialisedCredentials s + , Hashable (PubKey 'Encrypt s) + -- , ForHBS2Basic s + ) => [FilePattern] - -> PubKey 'Encrypt s - -> m (Maybe (KeyringEntry s)) + -> [PubKey 'Encrypt s] + -> m [KeyringEntry s] + +findKeyRingEntries fp pkl = do + + let pks = HashSet.fromList pkl -findKeyRingEntry fp pk0 = do fs <- findFilesBy fp w <- for fs $ \f -> runMaybeT do bs <- liftIO (try @_ @IOException (BS.readFile f)) >>= toMPlus krf <- parseCredentials (AsCredFile bs) & toMPlus - MaybeT $ pure $ headMay [ e | e@(KeyringEntry pk _ _) <- _peerKeyring krf, pk == pk0 ] + MaybeT $ pure $ headMay [ e | e@(KeyringEntry pk _ _) <- _peerKeyring krf, pk `HashSet.member` pks ] - pure $ headMay (catMaybes w) + pure $ catMaybes w diff --git a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs index 06e43e2b..f45f6653 100644 --- a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs +++ b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs @@ -58,6 +58,7 @@ data KeyringEntry e = pattern KeyringKeys :: forall {s} . PubKey 'Encrypt s -> PrivKey 'Encrypt s -> KeyringEntry s pattern KeyringKeys a b <- KeyringEntry a b _ +{-# COMPLETE KeyringKeys #-} deriving stock instance (Eq (PubKey 'Encrypt e), Eq (PrivKey 'Encrypt e)) => Eq (KeyringEntry e) @@ -139,8 +140,8 @@ delKeyPair (AsBase58 pks) cred = do pure $ cred & set peerKeyring rest -parseCredentials :: forall s . ( ForHBS2Basic s - , SerialisedCredentials s +parseCredentials :: forall s . ( -- ForHBS2Basic s + SerialisedCredentials s ) => AsCredFile ByteString -> Maybe (PeerCredentials s) parseCredentials (AsCredFile bs) = parseSerialisableFromBase58 bs diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs index 0f5f7e4c..252a066d 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs @@ -57,7 +57,7 @@ blockAnnounceProto :: forall e m . ( MonadIO m blockAnnounceProto = \case BlockAnnounce n info -> do - that <- thatPeer (Proxy @(BlockAnnounce e)) + that <- thatPeer @(BlockAnnounce e) emit @e BlockAnnounceInfoKey (BlockAnnounceEvent that info n) data instance EventKey e (BlockAnnounce e) = diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index 0b42b206..b36a292c 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -90,7 +90,7 @@ blockChunksProto :: forall e m proto . ( MonadIO m blockChunksProto adapter (BlockChunks c p) = do - peer <- thatPeer (Proxy @(BlockChunks e)) + peer <- thatPeer @proto auth <- find (KnownPeerKey peer) id <&> isJust case p of @@ -122,7 +122,7 @@ blockChunksProto adapter (BlockChunks c p) = do maybe (pure ()) (response_ . BlockChunk @e i) chunk BlockChunk n bs | auth -> deferred @(BlockChunks e) do - who <- thatPeer proto + who <- thatPeer @proto h <- blkGetHash adapter (who, c) maybe1 h (response_ (BlockLost @e)) $ \hh -> do @@ -140,7 +140,6 @@ blockChunksProto adapter (BlockChunks c p) = do pure () where - proto = Proxy @(BlockChunks e) response_ pt = response (BlockChunks c pt) diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index accd0bad..44149446 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -37,7 +37,7 @@ blockSizeProto getBlockSize evHasBlock onNoBlock = \case GetBlockSize h -> do -- liftIO $ print "GetBlockSize" - p <- thatPeer (Proxy @(BlockInfo e)) + p <- thatPeer @proto auth <- find (KnownPeerKey p) id <&> isJust when auth do deferred @proto $ do @@ -48,12 +48,12 @@ blockSizeProto getBlockSize evHasBlock onNoBlock = response (NoBlock @e h) NoBlock h -> do - that <- thatPeer (Proxy @(BlockInfo e)) + that <- thatPeer @proto emit @e (BlockSizeEventKey h) (NoBlockEvent that) evHasBlock ( that, h, Nothing ) BlockSize h sz -> do - that <- thatPeer (Proxy @(BlockInfo e)) + that <- thatPeer @proto emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz)) evHasBlock ( that, h, Just sz ) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 9502bd4b..edd8881f 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -191,6 +191,7 @@ instance HasProtocol L4Proto (RefChanNotify L4Proto) where -- возьмем пока 10 секунд requestPeriodLim = NoLimit + instance HasProtocol L4Proto (DialReq L4Proto) where type instance ProtocolId (DialReq L4Proto) = 96000 type instance Encoded L4Proto = ByteString diff --git a/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs index 925980bb..7be6918b 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs @@ -84,7 +84,7 @@ dialReqProto :: forall e s m . -> DialReq e -> m () dialReqProto adapter = unDialReq >>> \frames -> do - peer <- thatPeer dialReqProtoProxy + peer <- thatPeer @(DialReq e) -- let dialReqEnv :: DialogRequestEnv m (Peer e) (Maybe (PeerData e)) -- dialReqEnv = DialogRequestEnv @@ -100,8 +100,6 @@ dialReqProto adapter = unDialReq >>> \frames -> do liftIO $ (dialReqProtoAdapterDApp adapter) frames replyToPeerIO - where - dialReqProtoProxy = Proxy @(DialReq e) --- diff --git a/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs b/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs index 2532e2e2..78155603 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/EncryptionHandshake.hs @@ -76,7 +76,7 @@ data EncryptionHandshakeAdapter e m s = EncryptionHandshakeAdapter } -encryptionHandshakeProto :: forall e s m . +encryptionHandshakeProto :: forall e s m proto . ( MonadIO m , Response e (EncryptionHandshake e) m , Request e (EncryptionHandshake e) m @@ -91,6 +91,7 @@ encryptionHandshakeProto :: forall e s m . , PubKey 'Encrypt s ~ Encrypt.PublicKey , Show (PubKey 'Sign s) , Show (Nonce ()) + , proto ~ EncryptionHandshake e ) => EncryptionHandshakeAdapter e m s -> EncryptionHandshake e @@ -99,7 +100,7 @@ encryptionHandshakeProto :: forall e s m . encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case ResetEncryptionKeys -> do - peer <- thatPeer proto + peer <- thatPeer @proto mpeerData <- find (KnownPeerKey peer) id -- TODO: check theirsign trace $ "ENCRYPTION EHSP ResetEncryptionKeys from" <+> viaShow (peer, mpeerData) @@ -112,7 +113,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case sendBeginEncryptionExchange @e creds ourpubkey peer BeginEncryptionExchange theirsign theirpubkey -> do - peer <- thatPeer proto + peer <- thatPeer @proto mpeerData <- find (KnownPeerKey peer) id -- TODO: check theirsign @@ -137,7 +138,7 @@ encryptionHandshakeProto EncryptionHandshakeAdapter{..} = \case encHandshake_considerPeerAsymmKey peer (Just theirpubkey) AckEncryptionExchange theirsign theirpubkey -> do - peer <- thatPeer proto + peer <- thatPeer @proto mpeerData <- find (KnownPeerKey peer) id -- TODO: check theirsign diff --git a/hbs2-core/lib/HBS2/Net/Proto/Notify.hs b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs index 948d6e65..a4a90004 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Notify.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs @@ -122,7 +122,7 @@ makeNotifyServer (NotifyEnv{..}) what = do debug "SERVER: NotifyWant" - who <- thatPeer (Proxy @(NotifyProto ev e)) + who <- thatPeer @(NotifyProto ev e) hndl <- startNotify @ev @src @m notifySource key $ \ha d -> do atomically $ writeTQueue notifyQ (ha, who, NotifyEvent key d) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index fd99f402..8e7c1881 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -91,24 +91,25 @@ newtype PeerHandshakeAdapter e m = } -peerHandShakeProto :: forall e s m . ( MonadIO m - , Response e (PeerHandshake e) m - , Request e (PeerHandshake e) m - , Sessions e (PeerHandshake e) m - , Sessions e (KnownPeer e) m - , HasNonces (PeerHandshake e) m - , HasPeerNonce e m - , Nonce (PeerHandshake e) ~ PingNonce - , Pretty (Peer e) - , EventEmitter e (PeerHandshake e) m - , EventEmitter e (ConcretePeer e) m - , HasCredentials s m - , Asymm s - , Signatures s - , Serialise (PubKey 'Encrypt (Encryption e)) - , s ~ Encryption e - , e ~ L4Proto - ) +peerHandShakeProto :: forall e s m proto . ( MonadIO m + , Response e proto m + , Request e proto m + , Sessions e proto m + , Sessions e (KnownPeer e) m + , HasNonces proto m + , HasPeerNonce e m + , Nonce proto ~ PingNonce + , Pretty (Peer e) + , EventEmitter e proto m + , EventEmitter e (ConcretePeer e) m + , HasCredentials s m + , Asymm s + , Signatures s + , Serialise (PubKey 'Encrypt (Encryption e)) + , s ~ Encryption e + , e ~ L4Proto + , proto ~ PeerHandshake e + ) => PeerHandshakeAdapter e m -> PeerEnv e -> PeerHandshake e @@ -117,7 +118,7 @@ peerHandShakeProto :: forall e s m . ( MonadIO m peerHandShakeProto adapter penv = \case PeerPing nonce -> do - pip <- thatPeer proto + pip <- thatPeer @proto -- взять свои ключи creds <- getCredentials @s @@ -137,7 +138,7 @@ peerHandShakeProto adapter penv = sendPing pip PeerPong nonce0 sign d -> do - pip <- thatPeer proto + pip <- thatPeer @proto se' <- find @e (PeerHandshakeKey (nonce0,pip)) id @@ -163,9 +164,6 @@ peerHandShakeProto adapter penv = emit AnyKnownPeerEventKey (KnownPeerEvent pip d) emit (ConcretePeerKey pip) (ConcretePeerData pip d) - where - proto = Proxy @(PeerHandshake e) - data ConcretePeer e = ConcretePeer newtype instance EventKey e (ConcretePeer e) = diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs index a2d2ad55..049cf947 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs @@ -40,7 +40,7 @@ peerAnnounceProto :: forall e m . ( MonadIO m peerAnnounceProto = \case PeerAnnounce nonce -> do - who <- thatPeer (Proxy @(PeerAnnounce e)) + who <- thatPeer @(PeerAnnounce e) emit @e PeerAnnounceEventKey (PeerAnnounceEvent who nonce) diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs index 24b8f503..f0f3c3de 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs @@ -79,12 +79,11 @@ peerExchangeProto msg = do PeerExchangePeers2 nonce pips -> peerExchangePeers2 nonce pips where - proto = Proxy @(PeerExchange e) fromPEXAddr1 = fromPeerAddr . L4Address UDP peerExchangePeers1 nonce pips = do - pip <- thatPeer proto + pip <- thatPeer @proto ok <- find (PeerExchangeKey @e nonce) id <&> isJust @@ -95,7 +94,7 @@ peerExchangeProto msg = do emit @e PeerExchangePeersKey (PeerExchangePeersData sa) peerExchangePeers2 nonce pips = do - pip <- thatPeer proto + pip <- thatPeer @proto ok <- find (PeerExchangeKey @e nonce) id <&> isJust @@ -106,7 +105,7 @@ peerExchangeProto msg = do emit @e PeerExchangePeersKey (PeerExchangePeersData sa) peerExchangeGet pex n = deferred @proto do - that <- thatPeer proto + that <- thatPeer @proto debug $ "PeerExchangeGet" <+> "from" <+> pretty that diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs index 07ff625b..a2b0a9cc 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs @@ -42,7 +42,7 @@ peerMetaProto :: forall e m proto . ( MonadIO m peerMetaProto peerMeta = \case GetPeerMeta -> do - p <- thatPeer (Proxy @(PeerMetaProto e)) + p <- thatPeer @proto auth <- find (KnownPeerKey p) id <&> isJust when auth do debug $ "PEER META: ANSWERING" <+> pretty p <+> viaShow peerMeta @@ -50,7 +50,7 @@ peerMetaProto peerMeta = response (ThePeerMeta @e peerMeta) ThePeerMeta meta -> do - that <- thatPeer (Proxy @(PeerMetaProto e)) + that <- thatPeer @proto debug $ "GOT PEER META FROM" <+> pretty that <+> viaShow meta emit @e (PeerMetaEventKey that) (PeerMetaEvent meta) diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 83f78e3a..92938a94 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -1,1054 +1,14 @@ {-# Language UndecidableInstances #-} {-# Language AllowAmbiguousTypes #-} -{-# Language TemplateHaskell #-} -{-# Language FunctionalDependencies #-} -{-# LANGUAGE ImplicitParams #-} -{-# LANGUAGE PatternSynonyms, ViewPatterns #-} -module HBS2.Net.Proto.RefChan where - -import HBS2.Prelude.Plated -import HBS2.Hash -import HBS2.Data.Detect -import HBS2.Clock -import HBS2.Net.Proto -import HBS2.Net.Auth.Credentials -import HBS2.Base58 -import HBS2.Defaults -import HBS2.Events -import HBS2.Net.Proto.Peer -import HBS2.Net.Proto.BlockAnnounce -import HBS2.Net.Proto.Sessions -import HBS2.Data.Types.Refs -import HBS2.Data.Types.SignedBox -import HBS2.Actors.Peer.Types -import HBS2.Data.Types.Peer -import HBS2.Storage - -import Data.Config.Suckless - -import HBS2.System.Logger.Simple - -import Codec.Serialise -import Control.Monad.Identity -import Control.Monad.Trans.Maybe -import Data.ByteString (ByteString) -import Data.Either -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HashMap -import Data.HashSet (HashSet) -import Data.HashSet qualified as HashSet -import Data.Maybe -import Data.Word -import Data.Text qualified as Text -import Lens.Micro.Platform -import Data.Hashable hiding (Hashed) -import Type.Reflection (someTypeRep) -import Data.Time.Clock.POSIX (getPOSIXTime) - -import UnliftIO - - -{- HLINT ignore "Use newtype instead of data" -} - -type RefChanId e = PubKey 'Sign (Encryption e) -type RefChanOwner e = PubKey 'Sign (Encryption e) -type RefChanAuthor e = PubKey 'Sign (Encryption e) - -type Weight = Integer - - -data RefChanHeadBlock e = - RefChanHeadBlockSmall - { _refChanHeadVersion :: Integer - , _refChanHeadQuorum :: Integer - , _refChanHeadWaitAccept :: Integer - , _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight - , _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e)) - } - | RefChanHeadBlock1 - { _refChanHeadVersion :: Integer - , _refChanHeadQuorum :: Integer - , _refChanHeadWaitAccept :: Integer - , _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight - , _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e)) - , _refChanHeadReaders' :: HashSet (PubKey 'Encrypt (Encryption e)) - , _refChanHeadExt :: ByteString - } - deriving stock (Generic) - -makeLenses ''RefChanHeadBlock - - - -type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e)) - , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) - , FromStringMaybe (PubKey 'Sign (Encryption e)) - , FromStringMaybe (PubKey 'Encrypt (Encryption e)) - , Signatures (Encryption e) - , Serialise (Signature (Encryption e)) - , Serialise (PubKey 'Encrypt (Encryption e)) - , Hashable (PubKey 'Encrypt (Encryption e)) - , Hashable (PubKey 'Sign (Encryption e)) - ) - - - -refChanHeadReaders :: ForRefChans e => Lens (RefChanHeadBlock e) - (RefChanHeadBlock e) - (HashSet (PubKey 'Encrypt (Encryption e))) - (HashSet (PubKey 'Encrypt (Encryption e))) - -refChanHeadReaders = lens g s - where - g (RefChanHeadBlockSmall{}) = mempty - g (RefChanHeadBlock1{..}) = _refChanHeadReaders' - s v@(RefChanHeadBlock1{}) x = v { _refChanHeadReaders' = x } - s x _ = x - -instance ForRefChans e => Serialise (RefChanHeadBlock e) - -type instance SessionData e (RefChanHeadBlock e) = RefChanHeadBlock e - -newtype instance SessionKey e (RefChanHeadBlock e) = - RefChanHeadBlockKey (RefChanHeadKey (Encryption e)) - -deriving newtype instance ForRefChans L4Proto - => Hashable (SessionKey L4Proto (RefChanHeadBlock L4Proto)) - -deriving stock instance ForRefChans L4Proto - => Eq (SessionKey L4Proto (RefChanHeadBlock L4Proto)) - --- TODO: define-expiration-time -instance Expires (SessionKey L4Proto (RefChanHeadBlock L4Proto)) where - expiresIn = const (Just defCookieTimeoutSec) - -newtype RefChanHeadKey s = RefChanHeadKey (PubKey 'Sign s) - -instance RefMetaData (RefChanHeadKey s) - -deriving stock instance IsRefPubKey s => Eq (RefChanHeadKey s) - -instance IsRefPubKey s => Hashable (RefChanHeadKey s) where - hashWithSalt s k = hashWithSalt s (hashObject @HbSync k) - -instance IsRefPubKey s => Hashed HbSync (RefChanHeadKey s) where - hashObject (RefChanHeadKey pk) = hashObject ("refchanhead|" <> serialise pk) - -instance IsRefPubKey s => FromStringMaybe (RefChanHeadKey s) where - fromStringMay s = RefChanHeadKey <$> fromStringMay s - -instance IsRefPubKey s => IsString (RefChanHeadKey s) where - fromString s = fromMaybe (error "bad public key base58") (fromStringMay s) - -instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanHeadKey s)) where - pretty (AsBase58 (RefChanHeadKey k)) = pretty (AsBase58 k) - -instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanHeadKey s) where - pretty (RefChanHeadKey k) = pretty (AsBase58 k) - - -newtype RefChanLogKey s = RefChanLogKey { fromRefChanLogKey :: PubKey 'Sign s } - -instance RefMetaData (RefChanLogKey s) - -deriving stock instance IsRefPubKey s => Eq (RefChanLogKey s) - -instance IsRefPubKey s => Hashable (RefChanLogKey s) where - hashWithSalt s k = hashWithSalt s (hashObject @HbSync k) - -instance IsRefPubKey s => Hashed HbSync (RefChanLogKey s) where - hashObject (RefChanLogKey pk) = hashObject ("refchanlog|" <> serialise pk) - -instance IsRefPubKey s => FromStringMaybe (RefChanLogKey s) where - fromStringMay s = RefChanLogKey <$> fromStringMay s - -instance IsRefPubKey s => IsString (RefChanLogKey s) where - fromString s = fromMaybe (error "bad public key base58") (fromStringMay s) - -instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanLogKey s)) where - pretty (AsBase58 (RefChanLogKey k)) = pretty (AsBase58 k) - -instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanLogKey s) where - pretty (RefChanLogKey k) = pretty (AsBase58 k) - - --- блок головы может быть довольно большой. --- поэтому посылаем его, как merkle tree -newtype RefChanHeadBlockTran e = - RefChanHeadBlockTran { fromRefChanHeadBlockTran :: HashRef } - deriving stock (Generic) - -instance Serialise (RefChanHeadBlockTran e) - -data RefChanHead e = - RefChanHead (RefChanId e) (RefChanHeadBlockTran e) - | RefChanGetHead (RefChanId e) - deriving stock (Generic) - -instance ForRefChans e => Serialise (RefChanHead e) - - -data ProposeTran e = ProposeTran HashRef (SignedBox ByteString e) -- произвольная бинарная транзакция, - deriving stock (Generic) -- подписанная ключом **АВТОРА**, который её рассылает - -newtype AcceptTime = AcceptTime Word64 - deriving stock (Eq,Ord,Data,Generic) - deriving newtype (Enum,Num,Real,Integral) - -instance Serialise AcceptTime - -data AcceptTran e = AcceptTran1 HashRef HashRef -- ссылка на (ProposTran e) - | AcceptTran2 (Maybe AcceptTime) HashRef HashRef - deriving stock (Generic) - -acceptTime :: SimpleGetter (AcceptTran e) (Maybe AcceptTime) -acceptTime = to getter - where - getter (AcceptTran1 _ _) = Nothing - getter (AcceptTran2 a _ _) = a - -unpackAcceptTran :: AcceptTran e -> (Maybe AcceptTime, HashRef, HashRef) -unpackAcceptTran (AcceptTran1 a b) = (Nothing, a, b) -unpackAcceptTran (AcceptTran2 t a b) = (t, a, b) - -pattern AcceptTran :: Maybe AcceptTime -> HashRef -> HashRef -> AcceptTran e -pattern AcceptTran t a b <- (unpackAcceptTran -> (t, a, b)) - where - AcceptTran Nothing a b = AcceptTran1 a b - AcceptTran (Just t) a b = AcceptTran2 (Just t) a b - -instance ForRefChans e => Serialise (ProposeTran e) -instance ForRefChans e => Serialise (AcceptTran e) - -data RefChanRound e = - RefChanRound - { _refChanRoundKey :: HashRef -- ^ hash of the Propose transaction - , _refChanHeadKey :: RefChanHeadKey (Encryption e) - , _refChanRoundTTL :: TimeSpec - , _refChanRoundClosed :: TVar Bool - , _refChanRoundPropose :: TVar (Maybe (ProposeTran e)) -- ^ propose transaction itself - , _refChanRoundTrans :: TVar (HashSet HashRef) - , _refChanRoundAccepts :: TVar (HashMap (PubKey 'Sign (Encryption e)) ()) - } - deriving stock (Typeable, Generic) - -makeLenses 'RefChanRound - -newtype instance SessionKey e (RefChanRound e) = - RefChanRoundKey HashRef - deriving stock (Generic, Eq, Typeable) - deriving newtype (Pretty) - -deriving newtype instance Hashable (SessionKey e (RefChanRound e)) - -type instance SessionData e (RefChanRound e) = RefChanRound e - -instance Expires (SessionKey e (RefChanRound e)) where - expiresIn _ = Just 300 - -data instance EventKey e (RefChanRound e) = - RefChanRoundEventKey - deriving (Generic,Typeable,Eq) - -newtype instance Event e (RefChanRound e) = - RefChanRoundEvent (SessionKey e (RefChanRound e)) - deriving (Typeable,Generic) - deriving newtype (Pretty) - -instance Typeable (RefChanRound e) => Hashable (EventKey e (RefChanRound e)) where - hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) - where - p = Proxy @(RefChanRound e) - -instance EventType ( Event e (RefChanRound e) ) where - isPersistent = True - -instance Expires (EventKey e (RefChanRound e)) where - expiresIn = const Nothing - --- TODO: find-out-sure-transaction-size --- транзакция должна быть маленькая! --- хочешь что-то большое просунуть -- шли хэши. --- черт его знает, какой там останется пайлоад. --- надо посмотреть. байт, небось, 400 -data RefChanUpdate e = - Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира - | Accept (RefChanId e) (SignedBox (AcceptTran e) e) -- подписано ключом пира - deriving stock (Generic) - -instance ForRefChans e => Serialise (RefChanUpdate e) - -data RefChanRequest e = - RefChanRequest (RefChanId e) - | RefChanResponse (RefChanId e) HashRef - deriving stock (Generic,Typeable) - -instance ForRefChans e => Serialise (RefChanRequest e) - -data instance EventKey e (RefChanRequest e) = - RefChanRequestEventKey - deriving (Generic,Typeable,Eq) - -data instance Event e (RefChanRequest e) = - RefChanRequestEvent (RefChanId e) HashRef - deriving (Typeable,Generic) - -instance EventType ( Event e (RefChanRequest e) ) where - isPersistent = True - -instance Expires (EventKey e (RefChanRequest e)) where - expiresIn = const Nothing - -instance Typeable (RefChanRequest e) => Hashable (EventKey e (RefChanRequest e)) where - hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) - where - p = Proxy @(RefChanRequest e) - - -data RefChanActionRequest = - RefChanAnnounceBlock HashRef - | RefChanFetch HashRef - deriving stock (Generic) - -instance Serialise RefChanActionRequest - --- принимается, только если соответствует текущему HEAD --- не пишется в журнал - -data RefChanNotify e = - Notify (RefChanId e) (SignedBox ByteString e) -- подписано ключом автора - -- довольно уместно будет добавить эти команды сюда - - -- они постоянно нужны, и это сильно упростит коммуникации - | ActionRequest (RefChanId e) RefChanActionRequest - - deriving stock (Generic) - -instance ForRefChans e => Serialise (RefChanNotify e) - -type RefChanValidateNonce e = Nonce (RefChanValidate e) - -data RefChanValidate e = - RefChanValidate - { rcvNonce :: Nonce (RefChanValidate e) - , rcvChan :: RefChanId e - , rcvData :: RefChanValidateData e - } - deriving stock (Generic) - -data RefChanValidateData e = - Validate HashRef - | Accepted HashRef - | Rejected HashRef - | Poke - deriving stock (Generic) - -instance Serialise (RefChanValidateData e) - -instance ( Serialise (PubKey 'Sign (Encryption e)) - , Serialise (Nonce (RefChanValidate e)) ) - => Serialise (RefChanValidate e) - -instance ( ForRefChans e - , Pretty (AsBase58 (Nonce (RefChanValidate e))) - , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) - ) => Pretty (RefChanValidate e) where - pretty (RefChanValidate n c d) = case d of - Validate r -> pretty "validate" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r - Accepted r -> pretty "accepted" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r - Rejected r -> pretty "rejected" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r - Poke -> pretty "poke" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) - --- FIXME: rename -data RefChanAdapter e m = - RefChanAdapter - { refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () - , refChanSubscribed :: RefChanId e -> m Bool - , refChanWriteTran :: HashRef -> m () - , refChanValidatePropose :: RefChanId e -> HashRef -> m Bool - , refChanNotifyRely :: RefChanId e -> RefChanNotify e -> m () - } - -class HasRefChanId e p | p -> e where - getRefChanId :: p -> RefChanId e - -instance HasRefChanId e (RefChanUpdate e) where - getRefChanId = \case - Propose c _ -> c - Accept c _ -> c - -instance HasRefChanId e (RefChanRequest e) where - getRefChanId = \case - RefChanRequest c -> c - RefChanResponse c _ -> c - -instance HasRefChanId e (RefChanNotify e) where - getRefChanId = \case - Notify c _ -> c - ActionRequest c _ -> c - -instance HasRefChanId e (RefChanValidate e) where - getRefChanId = rcvChan - -refChanHeadProto :: forall e s m proto . ( MonadIO m - , Request e proto m - , Response e proto m - , Request e (BlockAnnounce e) m - , HasPeerNonce e m - , HasDeferred proto e m - , IsPeerAddr e m - , Pretty (Peer e) - , Sessions e (KnownPeer e) m - , HasStorage m - , Signatures s - , IsRefPubKey s - , Pretty (AsBase58 (PubKey 'Sign s)) - , s ~ Encryption e - , proto ~ RefChanHead e - ) - => Bool - -> RefChanAdapter e m - -> RefChanHead e - -> m () - -refChanHeadProto self adapter msg = do - -- авторизовать пира - peer <- thatPeer proto - - auth <- find (KnownPeerKey peer) id <&> isJust - - no <- peerNonce @e - - void $ runMaybeT do - - guard (auth || self) - - case msg of - RefChanHead chan pkt -> do - guard =<< lift (refChanSubscribed adapter chan) - trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan) - -- TODO: notify-others-for-new-head - -- нужно ли уведомить остальных, что голова поменялась? - -- всех, от кого мы еще не получали данное сообщение - -- откуда мы знаем, от кого мы получали данное сообщение? - lift $ refChanOnHead adapter chan pkt - - RefChanGetHead chan -> deferred @proto do - trace $ "RefChanGetHead" <+> pretty self <+> pretty (AsBase58 chan) - - sto <- getStorage - ref <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan) - sz <- MaybeT $ liftIO $ hasBlock sto ref - - let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta sz ref - let announce = BlockAnnounce @e no annInfo - lift $ request peer announce - lift $ request peer (RefChanHead @e chan (RefChanHeadBlockTran (HashRef ref))) - - where - proto = Proxy @(RefChanHead e) - - -refChanUpdateProto :: forall e s m proto . ( MonadIO m - , MonadUnliftIO m - , Request e proto m - , Response e proto m - , HasDeferred proto e m - , IsPeerAddr e m - , Pretty (Peer e) - , Sessions e (KnownPeer e) m - , Sessions e (RefChanHeadBlock e) m - , Sessions e (RefChanRound e) m - , EventEmitter e (RefChanRound e) m - , HasStorage m - , HasGossip e proto m - , Signatures s - , IsRefPubKey s - , ForRefChans e - , s ~ Encryption e - , proto ~ RefChanUpdate e - ) - => Bool - -> PeerCredentials s - -> RefChanAdapter e m - -> RefChanUpdate e - -> m () - -refChanUpdateProto self pc adapter msg = do - -- авторизовать пира - peer <- thatPeer proto - - auth <- find (KnownPeerKey peer) id <&> isJust - - sto <- getStorage - - let pk = view peerSignPk pc - let sk = view peerSignSk pc - - void $ runMaybeT do - - guard (auth || self) - - -- TODO: process-each-message-only-once - -- где-то тут мы разбираемся, что такое сообщеине - -- уже отправляли и больше одного раза не реагируем - - -- У нас тут получается раунд на каждый Propose - -- Это может быть и хорошо и похо. Если очень - -- много транзакций, это плохо. Если не очень - -- то это нормально и можно обойтись без понятия - -- "блок". - -- так-то и количество proposers можно ограничить - - guard =<< lift (refChanSubscribed adapter (getRefChanId msg)) - - let h0 = hashObject @HbSync (serialise msg) - guard =<< liftIO (hasBlock sto h0 <&> isNothing) - - case msg of - Propose chan box -> do - - debug $ "RefChanUpdate/Propose" <+> pretty h0 - - deferred @proto do - - -- проверили подпись пира - (peerKey, ProposeTran headRef abox) <- MaybeT $ pure $ unboxSignedBox0 box - - -- проверили подпись автора - (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 abox - - -- итак, сначала достаём голову. как мы достаём голову? - - let refchanKey = RefChanHeadKey @s chan - h <- MaybeT $ liftIO $ getRef sto refchanKey - -- смотрим, что у нас такая же голова. - -- если нет -- значит, кто-то рассинхронизировался. - -- может быть, потом попробуем головы запросить - guard (HashRef h == headRef) - - debug $ "OMG! Got trans" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) - - -- теперь достаём голову - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey - - let pips = view refChanHeadPeers headBlock - - guard $ checkACL headBlock (Just peerKey) authorKey - - debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) - - -- TODO: validate-transaction - -- итак, как нам валидировать транзакцию? - -- HTTP ? - -- TCP ? - -- UDP ? (кстати, годно и быстро) - -- CLI ? - -- получается, риалтайм: ждём не более X секунд валидации, - -- иначе не валидируем. - -- по хорошему, не блокироваться бы в запросе. - -- тут мы зависим от состояния конвейра, нас можно DDoS-ить - -- большим количеством запросов и транзакции будут отклоняться - -- при большом потоке. - -- но решается это.. тадам! PoW! подбором красивых хэшей - -- при увеличении нагрузки. - -- тогда, правда, в открытой системе работает паттерн -- DDoS - -- всех, кроме своих узлов, а свои узлы всё принимают. - - -- для начала: сделаем хук для валидации, которыйне будет делать ничего - - -- если не смогли сохранить транзу, то и Accept разослать - -- не сможем - -- это правильно, так как транза содержит ссылку на RefChanId - -- следовательно, для другого рефчана будет другая транза - - hash <- MaybeT $ liftIO $ putBlock sto (serialise msg) - - ts <- liftIO getTimeCoarse - - let toWait = TimeoutSec (fromIntegral $ 2 * view refChanHeadWaitAccept headBlock) - let ttl = ts + fromNanoSecs (fromIntegral $ toNanoSeconds toWait) - - let rcrk = RefChanRoundKey (HashRef hash) - - rndHere <- lift $ find rcrk id - - defRound <- RefChanRound @e (HashRef hash) refchanKey ttl - <$> newTVarIO False - <*> newTVarIO Nothing - <*> newTVarIO (HashSet.singleton (HashRef hash)) -- save propose - <*> newTVarIO (HashMap.singleton peerKey ()) - - unless (isJust rndHere) do - lift $ update defRound rcrk id - lift $ emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk) - - -- не обрабатывать propose, если он уже в процессе - guard (isNothing rndHere) - - -- FIXME: fixed-timeout-is-no-good - validated <- either id id <$> lift ( race (pause @'Seconds 5 >> pure False) - $ refChanValidatePropose adapter chan (HashRef hash) - ) - -- почему так: - -- мы можем тормозить в проверке транзакции, - -- другие пиры могут работать быстрее и от них - -- может прийти accept. - -- так что раунд всё равно нужно завести, - -- даже если транза не очень. - - unless validated do - maybe1 rndHere none $ \rnd -> do - atomically $ writeTVar (view refChanRoundClosed rnd) True - liftIO $ delBlock sto hash - - guard validated - - debug $ "TRANS VALIDATED" <+> pretty (AsBase58 chan) <+> pretty hash - - lift $ gossip msg - - -- проверить, что мы вообще авторизованы - -- рассылать ACCEPT - - guard ( pk `HashMap.member` pips ) - - -- если нет - то и всё, просто перешлём - -- по госсипу исходную транзу - - ts <- liftIO getPOSIXTime <&> round <&> Just - let tran = AcceptTran ts headRef (HashRef hash) - - -- -- генерируем Accept - let accept = Accept chan (makeSignedBox @e pk sk tran) - - -- -- и рассылаем всем - debug "GOSSIP ACCEPT TRANSACTION" - lift $ gossip accept - - -- -- рассылаем ли себе? что бы был хоть один accept - lift $ refChanUpdateProto True pc adapter accept - - Accept chan box -> deferred @proto do - - -- что если получили ACCEPT раньше PROPOSE ? - -- что если PROPOSE еще обрабатывается? - -- надо, короче, блокироваться и ждать тут Propose - -- но если блокироваться --- то конвейр вообще - -- может встать. что делать? - -- - - debug $ "RefChanUpdate/ACCEPT" <+> pretty h0 - - (peerKey, AcceptTran _ headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box - - let refchanKey = RefChanHeadKey @s chan - - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey - - h <- MaybeT $ liftIO $ getRef sto refchanKey - - guard (HashRef h == headRef) - - lift $ gossip msg - - -- тут может так случиться, что propose еще нет - -- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы - -- почти одновременно. ну или не успело записаться. и что делать? - - -- вот прямо тут надо ждать, пока придёт / завершится Propose - -- -- или до таймаута - - let afterPropose = runMaybeT do - - here <- fix \next -> do - blk <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust - if blk then - pure blk - else do - pause @'Seconds 0.25 - next - - unless here do - warn $ "No propose transaction saved yet!" <+> pretty hashRef - - tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) - - tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just - - - proposed <- MaybeT $ pure $ case tran of - Propose _ pbox -> Just pbox - _ -> Nothing - - - (_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @e proposed - - debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0 - - -- compiler bug? - let (ProposeTran _ pbox) = ptran - - (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox - - -- может, и не надо второй раз проверять - guard $ checkACL headBlock (Just peerKey) authorKey - - debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef - - rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id - - atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) - - -- TODO: garbage-collection-strongly-required - ha <- MaybeT $ liftIO $ putBlock sto (serialise msg) - - atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha)) - -- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it? - - accepts <- atomically $ readTVar (view refChanRoundAccepts rcRound) <&> HashMap.size - - -- FIXME: why-accepts-quorum-on-failed-proposal? - - debug $ "ACCEPTS:" <+> pretty accepts - - closed <- readTVarIO (view refChanRoundClosed rcRound) - - -- FIXME: round! - when (fromIntegral accepts >= view refChanHeadQuorum headBlock && not closed) do - debug $ "ROUND!" <+> pretty accepts <+> pretty hashRef - - trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList - - forM_ trans $ \t -> do - lift $ refChanWriteTran adapter t - debug $ "WRITING TRANS" <+> pretty t - - let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList - votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys - - debug $ "PIPS" <+> pretty (HashSet.toList pips & fmap AsBase58) - debug $ "VOTES" <+> pretty (HashSet.toList votes & fmap AsBase58) - - when (pips `HashSet.isSubsetOf` votes) do - debug $ "CLOSING ROUND" <+> pretty hashRef <+> pretty (length trans) - atomically $ writeTVar (view refChanRoundClosed rcRound) True - - -- мы не можем ждать / поллить в deferred потому, - -- что мы так забьем конвейр - там сейчас всего 8 - -- воркеров, и 8 параллельных ждущих запросов - -- все остановят. - - let w = TimeoutSec (realToFrac $ view refChanHeadWaitAccept headBlock) - void $ lift $ race ( pause (2 * w) ) afterPropose - - where - proto = Proxy @(RefChanUpdate e) - - -checkACL :: forall e s . (Encryption e ~ s, ForRefChans e) - => RefChanHeadBlock e - -> Maybe (PubKey 'Sign s) - -> PubKey 'Sign s - -> Bool - -checkACL theHead mbPeerKey authorKey = match - where - pips = view refChanHeadPeers theHead - aus = view refChanHeadAuthors theHead - match = maybe True (`HashMap.member` pips) mbPeerKey - && authorKey `HashSet.member` aus - --- TODO: refchan-poll-proto --- Запрашиваем refchan у всех. --- Пишем в итоговый лог только такие --- propose + accept у которых больше quorum accept --- каждую транзу обрабатываем только один раз --- - -refChanRequestProto :: forall e s m . ( MonadIO m - , Request e (RefChanRequest e) m - , Response e (RefChanRequest e) m - , HasDeferred (RefChanRequest e) e m - , IsPeerAddr e m - , Pretty (Peer e) - , Sessions e (KnownPeer e) m - , Sessions e (RefChanHeadBlock e) m - , EventEmitter e (RefChanRequest e) m - , HasStorage m - , Signatures s - , IsRefPubKey s - , Pretty (AsBase58 (PubKey 'Sign s)) - , ForRefChans e - , s ~ Encryption e - ) - => Bool - -> RefChanAdapter e m - -> RefChanRequest e - -> m () - -refChanRequestProto self adapter msg = do - - peer <- thatPeer proto - - auth' <- find (KnownPeerKey peer) id - - sto <- getStorage - - void $ runMaybeT do - - guard (self || isJust auth') - - auth <- MaybeT $ pure auth' - - guard =<< lift (refChanSubscribed adapter (getRefChanId @e msg)) - - case msg of - - RefChanRequest chan -> do - rv <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan) - lift $ response @e (RefChanResponse @e chan (HashRef rv)) - - RefChanResponse chan val -> do - hd <- MaybeT $ getActualRefChanHead @e (RefChanHeadKey @s chan) - let ppk = view peerSignKey auth - - guard $ ppk `HashMap.member` view refChanHeadPeers hd - - lift $ emit RefChanRequestEventKey (RefChanRequestEvent @e chan val) - debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val - - where - proto = Proxy @(RefChanRequest e) - - -newtype instance EventKey e (RefChanNotify e) = - RefChanNotifyEventKey (RefChanId e) - -deriving stock instance ForRefChans e => Typeable (EventKey e (RefChanNotify e)) -deriving stock instance ForRefChans e => Eq (EventKey e (RefChanNotify e)) -deriving newtype instance ForRefChans e => Hashable (EventKey e (RefChanNotify e)) - -data instance Event e (RefChanNotify e) = - RefChanNotifyEvent HashRef (RefChanNotify e) - --- FIXME: remove-default-instance? -instance EventType (Event e (RefChanNotify e)) where - isPersistent = True - -instance Expires (EventKey e (RefChanNotify e)) where - expiresIn = const Nothing -- (Just defCookieTimeoutSec) - - - -refChanNotifyProto :: forall e s m proto . ( MonadIO m - , Request e proto m - , Response e proto m - , HasRefChanId e proto - , HasDeferred proto e m - , HasGossip e proto m - , IsPeerAddr e m - , Pretty (Peer e) - , Sessions e (RefChanHeadBlock e) m - , Sessions e (KnownPeer e) m - , EventEmitter e proto m - , HasStorage m - , Signatures s - , IsRefPubKey s - , ForRefChans e - , s ~ Encryption e - , proto ~ RefChanNotify e - ) - => Bool - -> RefChanAdapter e m - -> RefChanNotify e - -> m () - -refChanNotifyProto self adapter msg@(ActionRequest rchan a) = do - debug $ "RefChanNotify ACTION REQUEST" - pure () - -refChanNotifyProto self adapter msg@(Notify rchan box) = do - -- аутентифицируем - -- проверяем ACL - -- пересылаем всем - - sto <- getStorage - - peer <- thatPeer proto - - let h0 = hashObject @HbSync (serialise msg) - - auth <- find (KnownPeerKey peer) id <&> isJust - - void $ runMaybeT do - - guard =<< lift (refChanSubscribed adapter rchan) - - guard (self || auth) - - debug $ "&&& refChanNotifyProto" <+> pretty self - - deferred @proto do - - guard =<< liftIO (hasBlock sto h0 <&> isNothing) - - (authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box - - let refchanKey = RefChanHeadKey @s rchan - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey - - guard $ checkACL headBlock Nothing authorKey - - -- FIXME: garbage-collection-required - liftIO $ putBlock sto (serialise msg) - - -- теперь пересылаем по госсипу - lift $ gossip msg - - -- FIXME: remove-debug - let h1 = hashObject @HbSync (serialise box) - debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0 <+> pretty h1 - - -- тут надо заслать во внешнее приложение, - -- равно как и в остальных refchan-протоколах - - unless self do - debug $ "^^^ CALL refChanNotifyRely" <+> pretty h0 - lift $ refChanNotifyRely adapter rchan msg - - lift $ emit @e (RefChanNotifyEventKey rchan) (RefChanNotifyEvent (HashRef h0) msg) - - where - proto = Proxy @(RefChanNotify e) - - -getActualRefChanHead :: forall e s m . ( MonadIO m - , Sessions e (RefChanHeadBlock e) m - , HasStorage m - , Signatures s - , IsRefPubKey s - -- , Pretty (AsBase58 (PubKey 'Sign s)) - -- , Serialise (Signature s) - , ForRefChans e - , HasStorage m - , s ~ Encryption e - ) - => RefChanHeadKey s - -> m (Maybe (RefChanHeadBlock e)) - -getActualRefChanHead key = do - sto <- getStorage - - runMaybeT do - mbHead <- do - lift $ find @e (RefChanHeadBlockKey key) id - - case mbHead of - Just hd -> do - debug "HEAD DISCOVERED" - pure hd - - Nothing -> do - headblk <- MaybeT $ getRefChanHead sto key - debug "HEAD FOUND" - pure headblk - -getRefChanHead :: forall e s m . ( MonadIO m - , s ~ Encryption e - , ForRefChans e - , Signatures s - ) - => AnyStorage - -> RefChanHeadKey s - -> m (Maybe (RefChanHeadBlock e)) - -getRefChanHead sto k = runMaybeT do - h <- MaybeT $ liftIO $ getRef sto k - hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h) - (_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob - pure headblk - -makeProposeTran :: forall e s m . ( MonadIO m - , ForRefChans e - , Signatures (Encryption e) - , HasStorage m - , s ~ Encryption e - ) - => PeerCredentials s - -> RefChanId e - -> SignedBox ByteString e - -> m (Maybe (SignedBox (ProposeTran e) e)) - -makeProposeTran creds chan box1 = do - sto <- getStorage - runMaybeT do - h <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan) - let tran = ProposeTran @e (HashRef h) box1 - let pk = view peerSignPk creds - let sk = view peerSignSk creds - pure $ makeSignedBox @e pk sk tran - - -instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where - - fromStringMay str = - case readers of - [] -> RefChanHeadBlockSmall <$> version - <*> quorum - <*> wait - <*> pure (HashMap.fromList peers) - <*> pure (HashSet.fromList authors) - - rs -> RefChanHeadBlock1 <$> version - <*> quorum - <*> wait - <*> pure (HashMap.fromList peers) - <*> pure (HashSet.fromList authors) - <*> pure (HashSet.fromList rs) - <*> pure mempty - - where - parsed = parseTop str & fromRight mempty - version = lastMay [ n | (ListVal [SymbolVal "version", LitIntVal n] ) <- parsed ] - quorum = lastMay [ n | (ListVal [SymbolVal "quorum", LitIntVal n] ) <- parsed ] - wait = lastMay [ n | (ListVal [SymbolVal "wait", LitIntVal n] ) <- parsed ] - - peers = catMaybes [ (,) <$> fromStringMay (Text.unpack s) <*> pure w - | (ListVal [SymbolVal "peer", LitStrVal s, LitIntVal w] ) <- parsed - ] - - authors = catMaybes [ fromStringMay (Text.unpack s) - | (ListVal [SymbolVal "author", LitStrVal s] ) <- parsed - ] - - readers = catMaybes [ fromStringMay @(PubKey 'Encrypt (Encryption e)) (Text.unpack s) - | (ListVal [SymbolVal "reader", LitStrVal s] ) <- parsed - ] - -instance (ForRefChans e - , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) - , Pretty (AsBase58 (PubKey 'Encrypt (Encryption e))) - ) => Pretty (RefChanHeadBlock e) where - pretty blk = parens ("version" <+> pretty (view refChanHeadVersion blk)) <> line - <> - parens ("quorum" <+> pretty (view refChanHeadQuorum blk)) <> line - <> - parens ("wait" <+> pretty (view refChanHeadWaitAccept blk)) <> line - <> - vcat (fmap peer (HashMap.toList $ view refChanHeadPeers blk)) <> line - <> - vcat (fmap author (HashSet.toList $ view refChanHeadAuthors blk)) <> line - <> - vcat (fmap reader (HashSet.toList $ view refChanHeadReaders blk)) <> line - - where - peer (p,w) = parens ("peer" <+> dquotes (pretty (AsBase58 p)) <+> pretty w) - author p = parens ("author" <+> dquotes (pretty (AsBase58 p))) - reader p = parens ("reader" <+> dquotes (pretty (AsBase58 p))) - - --- FIXME: reconnect-validator-client-after-restart --- почему-то сейчас если рестартовать пира, --- но не растартовать валидатор --- то не получится --- повторно соединиться с валидатором. +module HBS2.Net.Proto.RefChan + ( module HBS2.Net.Proto.RefChan.Types + , module HBS2.Net.Proto.RefChan.RefChanHead + , module HBS2.Net.Proto.RefChan.RefChanUpdate + , module HBS2.Net.Proto.RefChan.RefChanNotify + ) where + +import HBS2.Net.Proto.RefChan.Types +import HBS2.Net.Proto.RefChan.RefChanHead +import HBS2.Net.Proto.RefChan.RefChanUpdate +import HBS2.Net.Proto.RefChan.RefChanNotify diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanHead.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanHead.hs new file mode 100644 index 00000000..0c636cc7 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanHead.hs @@ -0,0 +1,77 @@ +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +module HBS2.Net.Proto.RefChan.RefChanHead where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Net.Auth.Credentials +import HBS2.Base58 +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.BlockAnnounce +import HBS2.Net.Proto.Sessions +import HBS2.Data.Types.Refs +import HBS2.Storage + +import HBS2.Net.Proto.RefChan.Types + +import HBS2.System.Logger.Simple + +import Control.Monad.Trans.Maybe +import Data.Maybe + + +refChanHeadProto :: forall e s m proto . ( MonadIO m + , Request e proto m + , Response e proto m + , Request e (BlockAnnounce e) m + , HasPeerNonce e m + , HasDeferred proto e m + , IsPeerAddr e m + , Pretty (Peer e) + , Sessions e (KnownPeer e) m + , HasStorage m + , Signatures s + , IsRefPubKey s + , s ~ Encryption e + , proto ~ RefChanHead e + ) + => Bool + -> RefChanAdapter e m + -> RefChanHead e + -> m () + +refChanHeadProto self adapter msg = do + -- авторизовать пира + peer <- thatPeer @proto + + auth <- find (KnownPeerKey peer) id <&> isJust + + no <- peerNonce @e + + void $ runMaybeT do + + guard (auth || self) + + case msg of + RefChanHead chan pkt -> do + guard =<< lift (refChanSubscribed adapter chan) + trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan) + -- TODO: notify-others-for-new-head + -- нужно ли уведомить остальных, что голова поменялась? + -- всех, от кого мы еще не получали данное сообщение + -- откуда мы знаем, от кого мы получали данное сообщение? + lift $ refChanOnHead adapter chan pkt + + RefChanGetHead chan -> deferred @proto do + trace $ "RefChanGetHead" <+> pretty self <+> pretty (AsBase58 chan) + + sto <- getStorage + ref <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan) + sz <- MaybeT $ liftIO $ hasBlock sto ref + + let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta sz ref + let announce = BlockAnnounce @e no annInfo + lift $ request peer announce + lift $ request peer (RefChanHead @e chan (RefChanHeadBlockTran (HashRef ref))) + + diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanNotify.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanNotify.hs new file mode 100644 index 00000000..18bc7c6d --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanNotify.hs @@ -0,0 +1,102 @@ +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +module HBS2.Net.Proto.RefChan.RefChanNotify where + + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Net.Proto +import HBS2.Net.Auth.Credentials +import HBS2.Events +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Data.Types.Refs +import HBS2.Data.Types.SignedBox +import HBS2.Actors.Peer.Types +import HBS2.Storage + +import HBS2.Net.Proto.RefChan.Types + +import HBS2.System.Logger.Simple + +import Control.Monad.Trans.Maybe +import Data.Maybe + +refChanNotifyProto :: forall e s m proto . ( MonadIO m + , Request e proto m + , Response e proto m + , HasRefChanId e proto + , HasDeferred proto e m + , HasGossip e proto m + , IsPeerAddr e m + , Pretty (Peer e) + , Sessions e (RefChanHeadBlock e) m + , Sessions e (KnownPeer e) m + , EventEmitter e proto m + , HasStorage m + , Signatures s + , IsRefPubKey s + , ForRefChans e + , proto ~ RefChanNotify e + , s ~ Encryption e + ) + => Bool + -> RefChanAdapter e m + -> RefChanNotify e + -> m () + +refChanNotifyProto self adapter msg@(ActionRequest rchan a) = do + debug $ "RefChanNotify ACTION REQUEST" + pure () + +refChanNotifyProto self adapter msg@(Notify rchan box) = do + -- аутентифицируем + -- проверяем ACL + -- пересылаем всем + + sto <- getStorage + + peer <- thatPeer @proto + + let h0 = hashObject @HbSync (serialise msg) + + auth <- find (KnownPeerKey peer) id <&> isJust + + void $ runMaybeT do + + guard =<< lift (refChanSubscribed adapter rchan) + + guard (self || auth) + + debug $ "&&& refChanNotifyProto" <+> pretty self + + deferred @proto do + + guard =<< liftIO (hasBlock sto h0 <&> isNothing) + + (authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box + + let refchanKey = RefChanHeadKey @s rchan + headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + + guard $ checkACL headBlock Nothing authorKey + + -- FIXME: garbage-collection-required + liftIO $ putBlock sto (serialise msg) + + -- теперь пересылаем по госсипу + lift $ gossip msg + + -- FIXME: remove-debug + let h1 = hashObject @HbSync (serialise box) + debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0 <+> pretty h1 + + -- тут надо заслать во внешнее приложение, + -- равно как и в остальных refchan-протоколах + + unless self do + debug $ "^^^ CALL refChanNotifyRely" <+> pretty h0 + lift $ refChanNotifyRely adapter rchan msg + + lift $ emit @e (RefChanNotifyEventKey rchan) (RefChanNotifyEvent (HashRef h0) msg) + diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanUpdate.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanUpdate.hs new file mode 100644 index 00000000..d0d8a8d0 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan/RefChanUpdate.hs @@ -0,0 +1,591 @@ +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +{-# Language TemplateHaskell #-} +{-# Language FunctionalDependencies #-} +{-# LANGUAGE ImplicitParams #-} +{-# LANGUAGE PatternSynonyms, ViewPatterns #-} +module HBS2.Net.Proto.RefChan.RefChanUpdate where + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Clock +import HBS2.Net.Proto +import HBS2.Net.Auth.Credentials +import HBS2.Base58 +import HBS2.Events +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Data.Types.Refs +import HBS2.Data.Types.SignedBox +import HBS2.Actors.Peer.Types +import HBS2.Data.Types.Peer +import HBS2.Storage + +import HBS2.Net.Proto.RefChan.Types + +import HBS2.System.Logger.Simple + +import Codec.Serialise +import Control.Monad.Identity +import Control.Monad.Trans.Maybe +import Data.ByteString (ByteString) +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet +import Data.Maybe +import Data.Word +import Lens.Micro.Platform +import Data.Hashable hiding (Hashed) +import Type.Reflection (someTypeRep) +import Data.Time.Clock.POSIX (getPOSIXTime) + +import UnliftIO + +data ProposeTran e = ProposeTran HashRef (SignedBox ByteString e) -- произвольная бинарная транзакция, + deriving stock (Generic) -- подписанная ключом **АВТОРА**, который её рассылает + +newtype AcceptTime = AcceptTime Word64 + deriving stock (Eq,Ord,Data,Generic) + deriving newtype (Enum,Num,Real,Integral) + +instance Serialise AcceptTime + +data AcceptTran e = AcceptTran1 HashRef HashRef -- ссылка на (ProposTran e) + | AcceptTran2 (Maybe AcceptTime) HashRef HashRef + deriving stock (Generic) + +acceptTime :: SimpleGetter (AcceptTran e) (Maybe AcceptTime) +acceptTime = to getter + where + getter (AcceptTran1 _ _) = Nothing + getter (AcceptTran2 a _ _) = a + +unpackAcceptTran :: AcceptTran e -> (Maybe AcceptTime, HashRef, HashRef) +unpackAcceptTran (AcceptTran1 a b) = (Nothing, a, b) +unpackAcceptTran (AcceptTran2 t a b) = (t, a, b) + +pattern AcceptTran :: Maybe AcceptTime -> HashRef -> HashRef -> AcceptTran e +pattern AcceptTran t a b <- (unpackAcceptTran -> (t, a, b)) + where + AcceptTran Nothing a b = AcceptTran1 a b + AcceptTran (Just t) a b = AcceptTran2 (Just t) a b + +instance ForRefChans e => Serialise (ProposeTran e) +instance ForRefChans e => Serialise (AcceptTran e) + +data RefChanRound e = + RefChanRound + { _refChanRoundKey :: HashRef -- ^ hash of the Propose transaction + , _refChanHeadKey :: RefChanHeadKey (Encryption e) + , _refChanRoundTTL :: TimeSpec + , _refChanRoundClosed :: TVar Bool + , _refChanRoundPropose :: TVar (Maybe (ProposeTran e)) -- ^ propose transaction itself + , _refChanRoundTrans :: TVar (HashSet HashRef) + , _refChanRoundAccepts :: TVar (HashMap (PubKey 'Sign (Encryption e)) ()) + } + deriving stock (Typeable, Generic) + +makeLenses 'RefChanRound + +newtype instance SessionKey e (RefChanRound e) = + RefChanRoundKey HashRef + deriving stock (Generic, Eq, Typeable) + deriving newtype (Pretty) + +deriving newtype instance Hashable (SessionKey e (RefChanRound e)) + +type instance SessionData e (RefChanRound e) = RefChanRound e + +instance Expires (SessionKey e (RefChanRound e)) where + expiresIn _ = Just 300 + +data instance EventKey e (RefChanRound e) = + RefChanRoundEventKey + deriving (Generic,Typeable,Eq) + +newtype instance Event e (RefChanRound e) = + RefChanRoundEvent (SessionKey e (RefChanRound e)) + deriving (Typeable,Generic) + deriving newtype (Pretty) + +instance Typeable (RefChanRound e) => Hashable (EventKey e (RefChanRound e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @(RefChanRound e) + +instance EventType ( Event e (RefChanRound e) ) where + isPersistent = True + +instance Expires (EventKey e (RefChanRound e)) where + expiresIn = const Nothing + +-- TODO: find-out-sure-transaction-size +-- транзакция должна быть маленькая! +-- хочешь что-то большое просунуть -- шли хэши. +-- черт его знает, какой там останется пайлоад. +-- надо посмотреть. байт, небось, 400 +data RefChanUpdate e = + Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира + | Accept (RefChanId e) (SignedBox (AcceptTran e) e) -- подписано ключом пира + deriving stock (Generic) + +instance ForRefChans e => Serialise (RefChanUpdate e) + +data RefChanRequest e = + RefChanRequest (RefChanId e) + | RefChanResponse (RefChanId e) HashRef + deriving stock (Generic,Typeable) + +instance ForRefChans e => Serialise (RefChanRequest e) + +data instance EventKey e (RefChanRequest e) = + RefChanRequestEventKey + deriving (Generic,Typeable,Eq) + +data instance Event e (RefChanRequest e) = + RefChanRequestEvent (RefChanId e) HashRef + deriving (Typeable,Generic) + +instance EventType ( Event e (RefChanRequest e) ) where + isPersistent = True + +instance Expires (EventKey e (RefChanRequest e)) where + expiresIn = const Nothing + +instance Typeable (RefChanRequest e) => Hashable (EventKey e (RefChanRequest e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @(RefChanRequest e) + + + +type RefChanValidateNonce e = Nonce (RefChanValidate e) + +data RefChanValidate e = + RefChanValidate + { rcvNonce :: Nonce (RefChanValidate e) + , rcvChan :: RefChanId e + , rcvData :: RefChanValidateData e + } + deriving stock (Generic) + +data RefChanValidateData e = + Validate HashRef + | Accepted HashRef + | Rejected HashRef + | Poke + deriving stock (Generic) + +instance Serialise (RefChanValidateData e) + +instance ( Serialise (PubKey 'Sign (Encryption e)) + , Serialise (Nonce (RefChanValidate e)) ) + => Serialise (RefChanValidate e) + +instance ( ForRefChans e + , Pretty (AsBase58 (Nonce (RefChanValidate e))) + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + ) => Pretty (RefChanValidate e) where + pretty (RefChanValidate n c d) = case d of + Validate r -> pretty "validate" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r + Accepted r -> pretty "accepted" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r + Rejected r -> pretty "rejected" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r + Poke -> pretty "poke" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) + + + +instance HasRefChanId e (RefChanUpdate e) where + getRefChanId = \case + Propose c _ -> c + Accept c _ -> c + +instance HasRefChanId e (RefChanRequest e) where + getRefChanId = \case + RefChanRequest c -> c + RefChanResponse c _ -> c + + +instance HasRefChanId e (RefChanValidate e) where + getRefChanId = rcvChan + + +refChanUpdateProto :: forall e s m proto . ( MonadUnliftIO m + , Request e proto m + , Response e proto m + , HasDeferred proto e m + , IsPeerAddr e m + , Pretty (Peer e) + , Sessions e (KnownPeer e) m + , Sessions e (RefChanHeadBlock e) m + , Sessions e (RefChanRound e) m + , EventEmitter e (RefChanRound e) m + , HasStorage m + , HasGossip e proto m + , Signatures s + , IsRefPubKey s + , Pretty (AsBase58 (PubKey 'Sign s)) + , ForRefChans e + , s ~ Encryption e + , proto ~ RefChanUpdate e + ) + => Bool + -> PeerCredentials s + -> RefChanAdapter e m + -> RefChanUpdate e + -> m () + +refChanUpdateProto self pc adapter msg = do + -- авторизовать пира + peer <- thatPeer @proto + + auth <- find (KnownPeerKey peer) id <&> isJust + + sto <- getStorage + + let pk = view peerSignPk pc + let sk = view peerSignSk pc + + void $ runMaybeT do + + guard (auth || self) + + -- TODO: process-each-message-only-once + -- где-то тут мы разбираемся, что такое сообщеине + -- уже отправляли и больше одного раза не реагируем + + -- У нас тут получается раунд на каждый Propose + -- Это может быть и хорошо и похо. Если очень + -- много транзакций, это плохо. Если не очень + -- то это нормально и можно обойтись без понятия + -- "блок". + -- так-то и количество proposers можно ограничить + + guard =<< lift (refChanSubscribed adapter (getRefChanId msg)) + + let h0 = hashObject @HbSync (serialise msg) + guard =<< liftIO (hasBlock sto h0 <&> isNothing) + + case msg of + Propose chan box -> do + + debug $ "RefChanUpdate/Propose" <+> pretty h0 + + deferred @proto do + + -- проверили подпись пира + (peerKey, ProposeTran headRef abox) <- MaybeT $ pure $ unboxSignedBox0 box + + -- проверили подпись автора + (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 abox + + -- итак, сначала достаём голову. как мы достаём голову? + + let refchanKey = RefChanHeadKey @s chan + h <- MaybeT $ liftIO $ getRef sto refchanKey + -- смотрим, что у нас такая же голова. + -- если нет -- значит, кто-то рассинхронизировался. + -- может быть, потом попробуем головы запросить + guard (HashRef h == headRef) + + debug $ "OMG! Got trans" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) + + -- теперь достаём голову + headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + + let pips = view refChanHeadPeers headBlock + + guard $ checkACL headBlock (Just peerKey) authorKey + + debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) + + -- TODO: validate-transaction + -- итак, как нам валидировать транзакцию? + -- HTTP ? + -- TCP ? + -- UDP ? (кстати, годно и быстро) + -- CLI ? + -- получается, риалтайм: ждём не более X секунд валидации, + -- иначе не валидируем. + -- по хорошему, не блокироваться бы в запросе. + -- тут мы зависим от состояния конвейра, нас можно DDoS-ить + -- большим количеством запросов и транзакции будут отклоняться + -- при большом потоке. + -- но решается это.. тадам! PoW! подбором красивых хэшей + -- при увеличении нагрузки. + -- тогда, правда, в открытой системе работает паттерн -- DDoS + -- всех, кроме своих узлов, а свои узлы всё принимают. + + -- для начала: сделаем хук для валидации, которыйне будет делать ничего + + -- если не смогли сохранить транзу, то и Accept разослать + -- не сможем + -- это правильно, так как транза содержит ссылку на RefChanId + -- следовательно, для другого рефчана будет другая транза + + hash <- MaybeT $ liftIO $ putBlock sto (serialise msg) + + ts <- liftIO getTimeCoarse + + let toWait = TimeoutSec (fromIntegral $ 2 * view refChanHeadWaitAccept headBlock) + let ttl = ts + fromNanoSecs (fromIntegral $ toNanoSeconds toWait) + + let rcrk = RefChanRoundKey (HashRef hash) + + rndHere <- lift $ find rcrk id + + defRound <- RefChanRound @e (HashRef hash) refchanKey ttl + <$> newTVarIO False + <*> newTVarIO Nothing + <*> newTVarIO (HashSet.singleton (HashRef hash)) -- save propose + <*> newTVarIO (HashMap.singleton peerKey ()) + + unless (isJust rndHere) do + lift $ update defRound rcrk id + lift $ emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk) + + -- не обрабатывать propose, если он уже в процессе + guard (isNothing rndHere) + + -- FIXME: fixed-timeout-is-no-good + validated <- either id id <$> lift ( race (pause @'Seconds 5 >> pure False) + $ refChanValidatePropose adapter chan (HashRef hash) + ) + -- почему так: + -- мы можем тормозить в проверке транзакции, + -- другие пиры могут работать быстрее и от них + -- может прийти accept. + -- так что раунд всё равно нужно завести, + -- даже если транза не очень. + + unless validated do + maybe1 rndHere none $ \rnd -> do + atomically $ writeTVar (view refChanRoundClosed rnd) True + liftIO $ delBlock sto hash + + guard validated + + debug $ "TRANS VALIDATED" <+> pretty (AsBase58 chan) <+> pretty hash + + lift $ gossip msg + + -- проверить, что мы вообще авторизованы + -- рассылать ACCEPT + + guard ( pk `HashMap.member` pips ) + + -- если нет - то и всё, просто перешлём + -- по госсипу исходную транзу + + ts <- liftIO getPOSIXTime <&> round <&> Just + let tran = AcceptTran ts headRef (HashRef hash) + + -- -- генерируем Accept + let accept = Accept chan (makeSignedBox @e pk sk tran) + + -- -- и рассылаем всем + debug "GOSSIP ACCEPT TRANSACTION" + lift $ gossip accept + + -- -- рассылаем ли себе? что бы был хоть один accept + lift $ refChanUpdateProto True pc adapter accept + + Accept chan box -> deferred @proto do + + -- что если получили ACCEPT раньше PROPOSE ? + -- что если PROPOSE еще обрабатывается? + -- надо, короче, блокироваться и ждать тут Propose + -- но если блокироваться --- то конвейр вообще + -- может встать. что делать? + -- + + debug $ "RefChanUpdate/ACCEPT" <+> pretty h0 + + (peerKey, AcceptTran _ headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box + + let refchanKey = RefChanHeadKey @s chan + + headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + + h <- MaybeT $ liftIO $ getRef sto refchanKey + + guard (HashRef h == headRef) + + lift $ gossip msg + + -- тут может так случиться, что propose еще нет + -- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы + -- почти одновременно. ну или не успело записаться. и что делать? + + -- вот прямо тут надо ждать, пока придёт / завершится Propose + -- -- или до таймаута + + let afterPropose = runMaybeT do + + here <- fix \next -> do + blk <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust + if blk then + pure blk + else do + pause @'Seconds 0.25 + next + + unless here do + warn $ "No propose transaction saved yet!" <+> pretty hashRef + + tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) + + tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just + + + proposed <- MaybeT $ pure $ case tran of + Propose _ pbox -> Just pbox + _ -> Nothing + + + (_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @e proposed + + debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0 + + -- compiler bug? + let (ProposeTran _ pbox) = ptran + + (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox + + -- может, и не надо второй раз проверять + guard $ checkACL headBlock (Just peerKey) authorKey + + debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef + + rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id + + atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) + + -- TODO: garbage-collection-strongly-required + ha <- MaybeT $ liftIO $ putBlock sto (serialise msg) + + atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha)) + -- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it? + + accepts <- atomically $ readTVar (view refChanRoundAccepts rcRound) <&> HashMap.size + + -- FIXME: why-accepts-quorum-on-failed-proposal? + + debug $ "ACCEPTS:" <+> pretty accepts + + closed <- readTVarIO (view refChanRoundClosed rcRound) + + -- FIXME: round! + when (fromIntegral accepts >= view refChanHeadQuorum headBlock && not closed) do + debug $ "ROUND!" <+> pretty accepts <+> pretty hashRef + + trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList + + forM_ trans $ \t -> do + lift $ refChanWriteTran adapter t + debug $ "WRITING TRANS" <+> pretty t + + let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList + votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys + + debug $ "PIPS" <+> pretty (HashSet.toList pips & fmap AsBase58) + debug $ "VOTES" <+> pretty (HashSet.toList votes & fmap AsBase58) + + when (pips `HashSet.isSubsetOf` votes) do + debug $ "CLOSING ROUND" <+> pretty hashRef <+> pretty (length trans) + atomically $ writeTVar (view refChanRoundClosed rcRound) True + + -- мы не можем ждать / поллить в deferred потому, + -- что мы так забьем конвейр - там сейчас всего 8 + -- воркеров, и 8 параллельных ждущих запросов + -- все остановят. + + let w = TimeoutSec (realToFrac $ view refChanHeadWaitAccept headBlock) + void $ lift $ race ( pause (2 * w) ) afterPropose + + +-- TODO: refchan-poll-proto +-- Запрашиваем refchan у всех. +-- Пишем в итоговый лог только такие +-- propose + accept у которых больше quorum accept +-- каждую транзу обрабатываем только один раз +-- + +refChanRequestProto :: forall e s m proto . ( MonadIO m + , Request e proto m + , Response e proto m + , HasDeferred proto e m + , IsPeerAddr e m + , Pretty (Peer e) + , Sessions e (KnownPeer e) m + , Sessions e (RefChanHeadBlock e) m + , EventEmitter e proto m + , HasStorage m + , Signatures s + , IsRefPubKey s + , ForRefChans e + , s ~ Encryption e + , proto ~ RefChanRequest e + ) + => Bool + -> RefChanAdapter e m + -> RefChanRequest e + -> m () + +refChanRequestProto self adapter msg = do + + peer <- thatPeer @proto + + auth' <- find (KnownPeerKey peer) id + + sto <- getStorage + + void $ runMaybeT do + + guard (self || isJust auth') + + auth <- MaybeT $ pure auth' + + guard =<< lift (refChanSubscribed adapter (getRefChanId @e msg)) + + case msg of + + RefChanRequest chan -> do + rv <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan) + lift $ response @e (RefChanResponse @e chan (HashRef rv)) + + RefChanResponse chan val -> do + hd <- MaybeT $ getActualRefChanHead @e (RefChanHeadKey @s chan) + let ppk = view peerSignKey auth + + guard $ ppk `HashMap.member` view refChanHeadPeers hd + + lift $ emit RefChanRequestEventKey (RefChanRequestEvent @e chan val) + debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val + + +makeProposeTran :: forall e s m . ( MonadIO m + , ForRefChans e + , Signatures (Encryption e) + , HasStorage m + , s ~ Encryption e + ) + => PeerCredentials s + -> RefChanId e + -> SignedBox ByteString e + -> m (Maybe (SignedBox (ProposeTran e) e)) + +makeProposeTran creds chan box1 = do + sto <- getStorage + runMaybeT do + h <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan) + let tran = ProposeTran @e (HashRef h) box1 + let pk = view peerSignPk creds + let sk = view peerSignSk creds + pure $ makeSignedBox @e pk sk tran + +-- FIXME: reconnect-validator-client-after-restart +-- почему-то сейчас если рестартовать пира, +-- но не растартовать валидатор --- то не получится +-- повторно соединиться с валидатором. + diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan/Types.hs new file mode 100644 index 00000000..1e39268c --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan/Types.hs @@ -0,0 +1,349 @@ +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +{-# Language TemplateHaskell #-} +{-# Language FunctionalDependencies #-} +{-# LANGUAGE ImplicitParams #-} +{-# LANGUAGE PatternSynonyms, ViewPatterns #-} +module HBS2.Net.Proto.RefChan.Types where + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Data.Detect +import HBS2.Clock +import HBS2.Net.Proto +import HBS2.Net.Auth.Credentials +import HBS2.Base58 +import HBS2.Defaults +import HBS2.Events +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Data.Types.Refs +import HBS2.Data.Types.SignedBox +import HBS2.Storage + +import Data.Config.Suckless + +import HBS2.System.Logger.Simple + +import Control.Monad.Trans.Maybe +import Data.ByteString (ByteString) +import Data.Either +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet +import Data.Maybe +import Data.Text qualified as Text +import Lens.Micro.Platform +import Data.Hashable hiding (Hashed) + + +{- HLINT ignore "Use newtype instead of data" -} + +type RefChanId e = PubKey 'Sign (Encryption e) +type RefChanOwner e = PubKey 'Sign (Encryption e) +type RefChanAuthor e = PubKey 'Sign (Encryption e) + +type Weight = Integer + +data RefChanHeadBlock e = + RefChanHeadBlockSmall + { _refChanHeadVersion :: Integer + , _refChanHeadQuorum :: Integer + , _refChanHeadWaitAccept :: Integer + , _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight + , _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e)) + } + | RefChanHeadBlock1 + { _refChanHeadVersion :: Integer + , _refChanHeadQuorum :: Integer + , _refChanHeadWaitAccept :: Integer + , _refChanHeadPeers :: HashMap (PubKey 'Sign (Encryption e)) Weight + , _refChanHeadAuthors :: HashSet (PubKey 'Sign (Encryption e)) + , _refChanHeadReaders' :: HashSet (PubKey 'Encrypt (Encryption e)) + , _refChanHeadExt :: ByteString + } + deriving stock (Generic) + +makeLenses ''RefChanHeadBlock + +data RefChanActionRequest = + RefChanAnnounceBlock HashRef + | RefChanFetch HashRef + deriving stock (Generic) + +instance Serialise RefChanActionRequest + +data RefChanNotify e = + Notify (RefChanId e) (SignedBox ByteString e) -- подписано ключом автора + -- довольно уместно будет добавить эти команды сюда - + -- они постоянно нужны, и это сильно упростит коммуникации + | ActionRequest (RefChanId e) RefChanActionRequest + + deriving stock (Generic) + +instance ForRefChans e => Serialise (RefChanNotify e) + +newtype instance EventKey e (RefChanNotify e) = + RefChanNotifyEventKey (RefChanId e) + +deriving stock instance ForRefChans e => Typeable (EventKey e (RefChanNotify e)) +deriving stock instance ForRefChans e => Eq (EventKey e (RefChanNotify e)) +deriving newtype instance ForRefChans e => Hashable (EventKey e (RefChanNotify e)) + +data instance Event e (RefChanNotify e) = + RefChanNotifyEvent HashRef (RefChanNotify e) + +-- FIXME: remove-default-instance? +instance EventType (Event e (RefChanNotify e)) where + isPersistent = True + +instance Expires (EventKey e (RefChanNotify e)) where + expiresIn = const Nothing -- (Just defCookieTimeoutSec) + + + +type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e)) + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + , FromStringMaybe (PubKey 'Sign (Encryption e)) + , FromStringMaybe (PubKey 'Encrypt (Encryption e)) + , Signatures (Encryption e) + , Serialise (Signature (Encryption e)) + , Serialise (PubKey 'Encrypt (Encryption e)) + , Hashable (PubKey 'Encrypt (Encryption e)) + , Hashable (PubKey 'Sign (Encryption e)) + ) + + + +refChanHeadReaders :: ForRefChans e + => Lens (RefChanHeadBlock e) + (RefChanHeadBlock e) + (HashSet (PubKey 'Encrypt (Encryption e))) + (HashSet (PubKey 'Encrypt (Encryption e))) + +refChanHeadReaders = lens g s + where + g (RefChanHeadBlockSmall{}) = mempty + g (RefChanHeadBlock1{..}) = _refChanHeadReaders' + s v@(RefChanHeadBlock1{}) x = v { _refChanHeadReaders' = x } + s x _ = x + +instance ForRefChans e => Serialise (RefChanHeadBlock e) + +type instance SessionData e (RefChanHeadBlock e) = RefChanHeadBlock e + +newtype instance SessionKey e (RefChanHeadBlock e) = + RefChanHeadBlockKey (RefChanHeadKey (Encryption e)) + +deriving newtype instance ForRefChans L4Proto + => Hashable (SessionKey L4Proto (RefChanHeadBlock L4Proto)) + +deriving stock instance ForRefChans L4Proto + => Eq (SessionKey L4Proto (RefChanHeadBlock L4Proto)) + +-- TODO: define-expiration-time +instance Expires (SessionKey L4Proto (RefChanHeadBlock L4Proto)) where + expiresIn = const (Just defCookieTimeoutSec) + +newtype RefChanHeadKey s = RefChanHeadKey (PubKey 'Sign s) + +instance RefMetaData (RefChanHeadKey s) + +deriving stock instance IsRefPubKey s => Eq (RefChanHeadKey s) + +instance IsRefPubKey s => Hashable (RefChanHeadKey s) where + hashWithSalt s k = hashWithSalt s (hashObject @HbSync k) + +instance IsRefPubKey s => Hashed HbSync (RefChanHeadKey s) where + hashObject (RefChanHeadKey pk) = hashObject ("refchanhead|" <> serialise pk) + +instance IsRefPubKey s => FromStringMaybe (RefChanHeadKey s) where + fromStringMay s = RefChanHeadKey <$> fromStringMay s + +instance IsRefPubKey s => IsString (RefChanHeadKey s) where + fromString s = fromMaybe (error "bad public key base58") (fromStringMay s) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanHeadKey s)) where + pretty (AsBase58 (RefChanHeadKey k)) = pretty (AsBase58 k) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanHeadKey s) where + pretty (RefChanHeadKey k) = pretty (AsBase58 k) + + +newtype RefChanLogKey s = RefChanLogKey { fromRefChanLogKey :: PubKey 'Sign s } + +instance RefMetaData (RefChanLogKey s) + +deriving stock instance IsRefPubKey s => Eq (RefChanLogKey s) + +instance IsRefPubKey s => Hashable (RefChanLogKey s) where + hashWithSalt s k = hashWithSalt s (hashObject @HbSync k) + +instance IsRefPubKey s => Hashed HbSync (RefChanLogKey s) where + hashObject (RefChanLogKey pk) = hashObject ("refchanlog|" <> serialise pk) + +instance IsRefPubKey s => FromStringMaybe (RefChanLogKey s) where + fromStringMay s = RefChanLogKey <$> fromStringMay s + +instance IsRefPubKey s => IsString (RefChanLogKey s) where + fromString s = fromMaybe (error "bad public key base58") (fromStringMay s) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanLogKey s)) where + pretty (AsBase58 (RefChanLogKey k)) = pretty (AsBase58 k) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanLogKey s) where + pretty (RefChanLogKey k) = pretty (AsBase58 k) + +instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where + + fromStringMay str = + case readers of + [] -> RefChanHeadBlockSmall <$> version + <*> quorum + <*> wait + <*> pure (HashMap.fromList peers) + <*> pure (HashSet.fromList authors) + + rs -> RefChanHeadBlock1 <$> version + <*> quorum + <*> wait + <*> pure (HashMap.fromList peers) + <*> pure (HashSet.fromList authors) + <*> pure (HashSet.fromList rs) + <*> pure mempty + + where + parsed = parseTop str & fromRight mempty + version = lastMay [ n | (ListVal [SymbolVal "version", LitIntVal n] ) <- parsed ] + quorum = lastMay [ n | (ListVal [SymbolVal "quorum", LitIntVal n] ) <- parsed ] + wait = lastMay [ n | (ListVal [SymbolVal "wait", LitIntVal n] ) <- parsed ] + + peers = catMaybes [ (,) <$> fromStringMay (Text.unpack s) <*> pure w + | (ListVal [SymbolVal "peer", LitStrVal s, LitIntVal w] ) <- parsed + ] + + authors = catMaybes [ fromStringMay (Text.unpack s) + | (ListVal [SymbolVal "author", LitStrVal s] ) <- parsed + ] + + readers = catMaybes [ fromStringMay @(PubKey 'Encrypt (Encryption e)) (Text.unpack s) + | (ListVal [SymbolVal "reader", LitStrVal s] ) <- parsed + ] + +instance (ForRefChans e + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + , Pretty (AsBase58 (PubKey 'Encrypt (Encryption e))) + ) => Pretty (RefChanHeadBlock e) where + pretty blk = parens ("version" <+> pretty (view refChanHeadVersion blk)) <> line + <> + parens ("quorum" <+> pretty (view refChanHeadQuorum blk)) <> line + <> + parens ("wait" <+> pretty (view refChanHeadWaitAccept blk)) <> line + <> + vcat (fmap peer (HashMap.toList $ view refChanHeadPeers blk)) <> line + <> + vcat (fmap author (HashSet.toList $ view refChanHeadAuthors blk)) <> line + <> + vcat (fmap reader (HashSet.toList $ view refChanHeadReaders blk)) <> line + + where + peer (p,w) = parens ("peer" <+> dquotes (pretty (AsBase58 p)) <+> pretty w) + author p = parens ("author" <+> dquotes (pretty (AsBase58 p))) + reader p = parens ("reader" <+> dquotes (pretty (AsBase58 p))) + + +-- блок головы может быть довольно большой. +-- поэтому посылаем его, как merkle tree +newtype RefChanHeadBlockTran e = + RefChanHeadBlockTran { fromRefChanHeadBlockTran :: HashRef } + deriving stock (Generic) + +instance Serialise (RefChanHeadBlockTran e) + +data RefChanHead e = + RefChanHead (RefChanId e) (RefChanHeadBlockTran e) + | RefChanGetHead (RefChanId e) + deriving stock (Generic) + +instance ForRefChans e => Serialise (RefChanHead e) + +-- FIXME: rename +data RefChanAdapter e m = + RefChanAdapter + { refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () + , refChanSubscribed :: RefChanId e -> m Bool + , refChanWriteTran :: HashRef -> m () + , refChanValidatePropose :: RefChanId e -> HashRef -> m Bool + , refChanNotifyRely :: RefChanId e -> RefChanNotify e -> m () + } + +class HasRefChanId e p | p -> e where + getRefChanId :: p -> RefChanId e + +instance HasRefChanId e (RefChanNotify e) where + getRefChanId = \case + Notify c _ -> c + ActionRequest c _ -> c + + +getActualRefChanHead :: forall e s m . ( MonadIO m + , Sessions e (RefChanHeadBlock e) m + , HasStorage m + , Signatures s + , IsRefPubKey s + -- , Pretty (AsBase58 (PubKey 'Sign s)) + -- , Serialise (Signature s) + , ForRefChans e + , HasStorage m + , s ~ Encryption e + ) + => RefChanHeadKey s + -> m (Maybe (RefChanHeadBlock e)) + +getActualRefChanHead key = do + sto <- getStorage + + runMaybeT do + mbHead <- do + lift $ find @e (RefChanHeadBlockKey key) id + + case mbHead of + Just hd -> do + debug "HEAD DISCOVERED" + pure hd + + Nothing -> do + headblk <- MaybeT $ getRefChanHead sto key + debug "HEAD FOUND" + pure headblk + +getRefChanHead :: forall e s m . ( MonadIO m + , s ~ Encryption e + , ForRefChans e + , Signatures s + ) + => AnyStorage + -> RefChanHeadKey s + -> m (Maybe (RefChanHeadBlock e)) + +getRefChanHead sto k = runMaybeT do + h <- MaybeT $ liftIO $ getRef sto k + hdblob <- MaybeT $ readBlobFromTree ( getBlock sto ) (HashRef h) + (_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob + pure headblk + + +checkACL :: forall e s . (Encryption e ~ s, ForRefChans e) + => RefChanHeadBlock e + -> Maybe (PubKey 'Sign s) + -> PubKey 'Sign s + -> Bool + +checkACL theHead mbPeerKey authorKey = match + where + pips = view refChanHeadPeers theHead + aus = view refChanHeadAuthors theHead + match = maybe True (`HashMap.member` pips) mbPeerKey + && authorKey `HashSet.member` aus + diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs b/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs index a0e303d5..4fd368d3 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs @@ -162,21 +162,21 @@ data RefLogRequestI e m = , isRefLogSubscribed :: PubKey 'Sign (Encryption e) -> m Bool } -refLogRequestProto :: forall e s m . ( MonadIO m - , Request e (RefLogRequest e) m - , Response e (RefLogRequest e) m - , HasDeferred (RefLogRequest e) e m - , Sessions e (KnownPeer e) m - , IsPeerAddr e m - , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) - , EventEmitter e (RefLogRequestAnswer e) m - , Pretty (Peer e) - , s ~ Encryption e - ) +refLogRequestProto :: forall e s m proto . ( MonadIO m + , Request e proto m + , Response e proto m + , Sessions e (KnownPeer e) m + , IsPeerAddr e m + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + , EventEmitter e (RefLogRequestAnswer e) m + , Pretty (Peer e) + , s ~ Encryption e + , proto ~ RefLogRequest e + ) => RefLogRequestI e m -> RefLogRequest e -> m () refLogRequestProto adapter cmd = do - p <- thatPeer proto + p <- thatPeer @proto void $ runMaybeT do @@ -186,20 +186,17 @@ refLogRequestProto adapter cmd = do case cmd of (RefLogRequest pk) -> lift do trace $ "got RefLogUpdateRequest for" <+> pretty (AsBase58 pk) - pip <- thatPeer proto + pip <- thatPeer @proto answ' <- onRefLogRequest adapter (pip,pk) maybe1 answ' none $ \answ -> do response (RefLogResponse @e pk answ) (RefLogResponse pk h) -> lift do trace $ "got RefLogResponse for" <+> pretty (AsBase58 pk) <+> pretty h - pip <- thatPeer proto + pip <- thatPeer @proto emit RefLogReqAnswerKey (RefLogReqAnswerData @e pk h) onRefLogResponse adapter (pip,pk,h) - where - proto = Proxy @(RefLogRequest e) - refLogUpdateProto :: forall e s m proto . ( MonadIO m , Request e proto m , Response e proto m @@ -220,7 +217,7 @@ refLogUpdateProto :: forall e s m proto . ( MonadIO m refLogUpdateProto = \case e@RefLogUpdate{} -> do - p <- thatPeer proto + p <- thatPeer @proto auth <- find (KnownPeerKey p) id <&> isJust when auth do @@ -237,9 +234,6 @@ refLogUpdateProto = emit @e RefLogUpdateEvKey (RefLogUpdateEvData (pubk, e, Just p)) gossip e - where - proto = Proxy @(RefLogUpdate e) - instance ( Serialise (PubKey 'Sign (Encryption e)) , Serialise (Nonce (RefLogUpdate e)) , Serialise (Signature (Encryption e)) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 23458c71..0a449c7e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -111,8 +111,8 @@ class ( Eq (PeerAddr e) fromPeerAddr :: PeerAddr e -> m (Peer e) -- FIXME: type-application-instead-of-proxy -class (Monad m, HasProtocol e p) => HasThatPeer e p (m :: Type -> Type) where - thatPeer :: Proxy p -> m (Peer e) +class (Monad m, HasProtocol e p) => HasThatPeer p e (m :: Type -> Type) where + thatPeer :: m (Peer e) class (MonadIO m, HasProtocol e p) => HasDeferred p e m | p -> e where deferred :: m () -> m () @@ -123,7 +123,7 @@ instance (HasDeferred p e m, Monad m) => HasDeferred p e (MaybeT m) where class ( MonadIO m , HasProtocol e p - , HasThatPeer e p m + , HasThatPeer p e m ) => Response e p m | p -> e where response :: p -> m () diff --git a/hbs2-peer/app/CLI/Common.hs b/hbs2-peer/app/CLI/Common.hs index 5975eb81..60013607 100644 --- a/hbs2-peer/app/CLI/Common.hs +++ b/hbs2-peer/app/CLI/Common.hs @@ -10,6 +10,7 @@ import PeerConfig import HBS2.Peer.RPC.Client.Unix +import Options.Applicative import Data.Kind import Lens.Micro.Platform import UnliftIO @@ -45,4 +46,15 @@ withRPCMessaging o action = do pause @'Seconds 0.05 cancel m1 +rpcOpt :: Parser String +rpcOpt = strOption ( short 'r' <> long "rpc" + <> help "addr:port" ) +-- FIXME: options-duped-with-peer-main +confOpt :: Parser FilePath +confOpt = strOption ( long "config" <> short 'c' <> help "config" ) + +pRpcCommon :: Parser RPCOpt +pRpcCommon = do + RPCOpt <$> optional confOpt + <*> optional rpcOpt diff --git a/hbs2-peer/app/CLI/RefChan.hs b/hbs2-peer/app/CLI/RefChan.hs index e79c115d..c8b191bb 100644 --- a/hbs2-peer/app/CLI/RefChan.hs +++ b/hbs2-peer/app/CLI/RefChan.hs @@ -94,18 +94,7 @@ pRefChanHeadDump= do print $ pretty hdblk --- FIXME: options-duped-with-peer-main -confOpt :: Parser FilePath -confOpt = strOption ( long "config" <> short 'c' <> help "config" ) -rpcOpt :: Parser String -rpcOpt = strOption ( short 'r' <> long "rpc" - <> help "addr:port" ) - -pRpcCommon :: Parser RPCOpt -pRpcCommon = do - RPCOpt <$> optional confOpt - <*> optional rpcOpt pRefChanHeadPost :: Parser (IO ()) pRefChanHeadPost = do diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 182cf476..b351f995 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -244,12 +244,6 @@ runCLI = do -- <> command "dial" (info pDialog (progDesc "dialog commands")) ) - confOpt = strOption ( long "config" <> short 'c' <> help "config" ) - - rpcOpt = strOption ( short 'r' <> long "rpc" - <> help "addr:port" ) - - common = do pref <- optional $ strOption ( short 'p' <> long "prefix" <> help "storage prefix" ) @@ -283,10 +277,6 @@ runCLI = do pRun = do runPeer <$> common - pRpcCommon = do - RPCOpt <$> optional confOpt - <*> optional rpcOpt - pDie = do rpc <- pRpcCommon pure $ withMyRPC @PeerAPI rpc $ \caller -> do @@ -535,8 +525,8 @@ instance (Monad m, s ~ Encryption e) => HasCredentials s (CredentialsM e s m) wh instance (Monad m, s ~ Encryption e) => HasCredentials s (ResponseM e (CredentialsM e s m)) where getCredentials = lift getCredentials -instance (Monad m, HasThatPeer e p m, s ~ Encryption e) => HasThatPeer e p (CredentialsM e s m) where - thatPeer = lift . thatPeer +instance (Monad m, HasThatPeer p e m, s ~ Encryption e) => HasThatPeer p e (CredentialsM e s m) where + thatPeer = lift (thatPeer @p) instance ( EventEmitter e p m ) => EventEmitter e p (CredentialsM e s m) where @@ -768,7 +758,7 @@ runPeer opts = Exception.handle (\e -> myException e , refChanValidatePropose = refChanValidateTranFn @e rce , refChanNotifyRely = \r u -> do - debug "refChanNotifyRely MOTHERFUCKER!" + trace "refChanNotifyRely!" refChanNotifyRelyFn @e rce r u case u of Notify rr s -> do @@ -893,8 +883,6 @@ runPeer opts = Exception.handle (\e -> myException e $ knownPeers @e pl >>= mapM \pip -> fmap (, pip) <$> find (KnownPeerKey pip) (view peerOwnNonce) - let proto1 = view sockType p - case Map.lookup thatNonce pdkv of -- TODO: prefer-local-peer-with-same-nonce-over-remote-peer @@ -948,7 +936,7 @@ runPeer opts = Exception.handle (\e -> myException e let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do withPeerM env mx `U.withException` \e -> case fromException e of - Just (e' :: AsyncCancelled) -> pure () + Just (_ :: AsyncCancelled) -> pure () Nothing -> do err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) @@ -1018,8 +1006,8 @@ runPeer opts = Exception.handle (\e -> myException e chunks <- S.toList_ $ do deepScan ScanDeep (const none) h (liftIO . getBlock sto) $ \ha -> do unless (ha == h) do - blk <- liftIO $ getBlock sto ha - maybe1 blk none S.yield + blk1 <- liftIO $ getBlock sto ha + maybe1 blk1 none S.yield let box = deserialiseOrFail @(SignedBox (RefChanHeadBlock e) e) (LBS.concat chunks) @@ -1058,7 +1046,7 @@ runPeer opts = Exception.handle (\e -> myException e pa <- toPeerAddr p checkBlockAnnounce conf denv no pa (view biHash bi) - subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent pip nonce) -> do + subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent{}) -> do -- debug $ "Got peer announce!" <+> pretty pip emitToPeer penv PeerAnnounceEventKey pe @@ -1087,19 +1075,20 @@ runPeer opts = Exception.handle (\e -> myException e let rpcSa = getRpcSocketName conf rpcmsg <- newMessagingUnix True 1.0 rpcSa + let rpcctx = RPC2Context { rpcConfig = fromPeerConfig conf - , rpcMessaging = rpcmsg - , rpcPokeAnswer = pokeAnsw - , rpcPeerEnv = penv - , rpcLocalMultiCast = localMulticast - , rpcStorage = AnyStorage s - , rpcBrains = SomeBrains brains - , rpcByPassInfo = liftIO (getStat byPass) - , rpcDoFetch = liftIO . fetchHash penv denv - , rpcDoRefChanHeadPost = refChanHeadPostAction - , rpcDoRefChanPropose = refChanProposeAction - , rpcDoRefChanNotify = refChanNotifyAction - } + , rpcMessaging = rpcmsg + , rpcPokeAnswer = pokeAnsw + , rpcPeerEnv = penv + , rpcLocalMultiCast = localMulticast + , rpcStorage = AnyStorage s + , rpcBrains = SomeBrains brains + , rpcByPassInfo = liftIO (getStat byPass) + , rpcDoFetch = liftIO . fetchHash penv denv + , rpcDoRefChanHeadPost = refChanHeadPostAction + , rpcDoRefChanPropose = refChanProposeAction + , rpcDoRefChanNotify = refChanNotifyAction + } m1 <- async $ runMessagingUnix rpcmsg diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 2112802c..f87c4491 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -494,7 +494,7 @@ instance (ForGossip e p (PeerM e IO)) => HasGossip e p (PeerM e IO) where instance (ForGossip e p (ResponseM e m), HasGossip e p m) => HasGossip e p (ResponseM e m) where gossip msg = do - that <- thatPeer (Proxy @p) + that <- thatPeer @p forKnownPeers $ \pip _ -> do unless (that == pip) do request @e pip msg diff --git a/hbs2-tests/test/TestRawTx.hs b/hbs2-tests/test/TestRawTx.hs index 68cb0263..528d0cba 100644 --- a/hbs2-tests/test/TestRawTx.hs +++ b/hbs2-tests/test/TestRawTx.hs @@ -50,7 +50,7 @@ main = do <> header "Raw tx test" ) krData <- BS.readFile $ credentialsFile options - creds <- pure (parseCredentials (AsCredFile krData)) `orDie` "bad keyring file" + creds <- pure (parseCredentials @HBS2Basic (AsCredFile krData)) `orDie` "bad keyring file" let pubk = view peerSignPk creds let privk = view peerSignSk creds bs <- pure (fromBase58 $ BS8.pack $ tx options) `orDie` "transaction is not in Base58 format" @@ -67,6 +67,6 @@ main = do case statusCode (getResponseStatus resp) of 200 -> do - let r = LBS.unpack $ getResponseBody resp + let r = LBS.unpack $ getResponseBody resp print r s -> print $ "error: status " <> show s diff --git a/hbs2-tests/test/TestTCPNet.hs b/hbs2-tests/test/TestTCPNet.hs index 70113749..5d1aab25 100644 --- a/hbs2-tests/test/TestTCPNet.hs +++ b/hbs2-tests/test/TestTCPNet.hs @@ -75,7 +75,7 @@ pingPongHandler :: forall e m proto . ( MonadIO m pingPongHandler _ req = do - that <- thatPeer (Proxy @proto) + that <- thatPeer @proto own <- ownPeer @e case req of @@ -130,7 +130,7 @@ instance HasDeferred (PingPong L4Proto) L4Proto (ResponseM L4Proto (PingPongM L deferred m = do self <- lift $ asks (view ppSelf) bus <- lift $ asks (view ppFab) - who <- thatPeer (Proxy @(PingPong L4Proto)) + who <- thatPeer @(PingPong L4Proto) void $ liftIO $ async $ runPingPong self bus (runResponseM who m) main :: IO () diff --git a/hbs2-tests/test/TestUNIX.hs b/hbs2-tests/test/TestUNIX.hs index f5a061fd..0cf6917f 100644 --- a/hbs2-tests/test/TestUNIX.hs +++ b/hbs2-tests/test/TestUNIX.hs @@ -49,7 +49,7 @@ pingPongHandlerS :: forall e m . ( MonadIO m pingPongHandlerS tv n msg = do - that <- thatPeer (Proxy @(PingPong e)) + that <- thatPeer @(PingPong e) UIO.atomically $ UIO.modifyTVar tv ((that,msg):)