From 2f2796603a1106a49b6432867e910fa926ab451b Mon Sep 17 00:00:00 2001 From: Snail <> Date: Tue, 28 Jan 2025 15:20:11 +0400 Subject: [PATCH] Tune refchan interface, implementation --- hbs2-core/lib/HBS2/Data/Types/SignedBox.hs | 6 + hbs2-core/lib/HBS2/Net/Auth/Credentials.hs | 3 + hbs2-peer/app/PeerMain.hs | 17 ++- hbs2-peer/app/RPC2/RefChan.hs | 4 +- .../HBS2/Peer/Proto/RefChan/RefChanUpdate.hs | 110 ++++++++++++------ hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs | 8 +- hbs2-peer/lib/HBS2/Peer/RPC/Client/RefChan.hs | 2 +- hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs | 25 ++-- 8 files changed, 117 insertions(+), 58 deletions(-) diff --git a/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs b/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs index 62f4e061..77d805b1 100644 --- a/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs +++ b/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs @@ -2,6 +2,7 @@ {-# LANGUAGE AllowAmbiguousTypes #-} module HBS2.Data.Types.SignedBox where +import HBS2.Base58 import HBS2.Prelude.Plated import HBS2.Net.Proto.Types import HBS2.Net.Auth.Credentials @@ -17,6 +18,11 @@ data SignedBox p s = SignedBox (PubKey 'Sign s) ByteString (Signature s) deriving stock (Generic) +instance (Pretty (AsBase58 (PubKey 'Sign s)), Pretty (AsBase58 (Signature s))) + => Pretty (SignedBox p s) where + pretty (SignedBox k b s) = + "SignedBox" <+> pretty (AsBase58 k) <+> pretty (AsBase58 b) <+> pretty (AsBase58 s) + deriving stock instance ( Eq (PubKey 'Sign s) , Eq (Signature s) diff --git a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs index a0a6ca81..62ab29fa 100644 --- a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs +++ b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs @@ -207,6 +207,9 @@ instance ( Serialise (PeerCredentials e) -- FIXME: move-thouse-instances-to-appropriate-place-ASAP +instance Pretty (AsBase58 Sign.Signature) where + pretty (AsBase58 pk) = pretty $ B8.unpack $ toBase58 (Crypto.encode pk) + instance Pretty (AsBase58 Sign.PublicKey) where pretty (AsBase58 pk) = pretty $ B8.unpack $ toBase58 (Crypto.encode pk) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 29686bd1..3c2ff57c 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -81,6 +81,7 @@ import HBS2.Peer.Proto.LWWRef.Internal import RPC2(RPC2Context(..)) import Codec.Serialise as Serialise +import Control.Arrow (left) import Control.Concurrent (myThreadId) import Control.Concurrent.STM import Control.Exception as Exception @@ -99,6 +100,7 @@ import Data.Map qualified as Map import Data.Maybe import Data.Set qualified as Set import Data.Set (Set) +import Data.Text qualified as T import Data.Time.Clock.POSIX import Data.Time.Format import Lens.Micro.Platform as Lens @@ -1145,11 +1147,13 @@ runPeer opts = Exception.handle (\e -> myException e let refChanProposeAction (puk, box) = do debug $ "rpc.reChanPropose" <+> pretty (AsBase58 puk) - void $ liftIO $ withPeerM penv $ do - me <- ownPeer @e - runMaybeT do - proposed <- MaybeT $ makeProposeTran @e pc puk box - lift $ runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed) + r <- liftIO $ fmap (left (T.pack . show @SomeException)) $ try $ withPeerM penv do + me <- ownPeer @e + proposed <- maybe (liftIO $ throwIO MakeProposeTranError) pure + =<< makeProposeTran @e pc puk box + runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed) + debug $ "rpc.reChanPropose ok" <+> pretty (AsBase58 puk) <+> pretty box + pure r -- NOTE: moved-to-rpc let refChanNotifyAction (puk, box) = do @@ -1248,6 +1252,9 @@ runPeer opts = Exception.handle (\e -> myException e -- we want to clean up all resources throwM GoAgainException +data MakeProposeTranError = MakeProposeTranError deriving (Show) +instance Exception MakeProposeTranError + emitToPeer :: ( MonadIO m , EventEmitter e a (PeerM e IO) ) diff --git a/hbs2-peer/app/RPC2/RefChan.hs b/hbs2-peer/app/RPC2/RefChan.hs index 7a0078e5..e2ddcc9c 100644 --- a/hbs2-peer/app/RPC2/RefChan.hs +++ b/hbs2-peer/app/RPC2/RefChan.hs @@ -20,6 +20,7 @@ import HBS2.Peer.RPC.Internal.Types import PeerTypes +import Control.Arrow (left) import Control.Monad.Reader type RefChanContext m = (MonadIO m, HasRpcContext RefChanAPI RPC2Context m) @@ -70,7 +71,8 @@ instance RefChanContext m => HandleMethod m RpcRefChanPropose where handleMethod (puk, box) = do co <- getRpcContext @RefChanAPI debug $ "rpc.refChanNotifyAction" <+> pretty (AsBase58 puk) - liftIO $ rpcDoRefChanPropose co (puk, box) + liftIO $ left RefChanAPIError <$> do + rpcDoRefChanPropose co (puk, box) instance RefChanContext m => HandleMethod m RpcRefChanNotify where diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs index 0d693a6a..c95ec508 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs @@ -39,6 +39,7 @@ import Data.HashSet (HashSet) import Data.HashSet qualified as HashSet import Data.Maybe import Data.Word +import Data.Text qualified as Text import Lens.Micro.Platform import Data.Hashable hiding (Hashed) import Type.Reflection (someTypeRep) @@ -240,7 +241,7 @@ refChanUpdateProto :: forall e s m proto . ( MonadUnliftIO m -> RefChanUpdate e -> m () -refChanUpdateProto self pc adapter msg = do +refChanUpdateProto self pc adapter msg = flip withException (\e -> liftIO (print (e :: SomeException))) do -- авторизовать пира peer <- thatPeer @proto @@ -251,9 +252,9 @@ refChanUpdateProto self pc adapter msg = do let pk = view peerSignPk pc let sk = view peerSignSk pc - void $ runMaybeT do + do - guard (auth || self) + guard' "auth || self" (auth || self) -- TODO: process-each-message-only-once -- где-то тут мы разбираемся, что такое сообщеине @@ -266,13 +267,13 @@ refChanUpdateProto self pc adapter msg = do -- "блок". -- так-то и количество proposers можно ограничить - guard =<< lift (refChanSubscribed adapter (getRefChanId msg)) + guard' "refChanSubscribed" =<< refChanSubscribed adapter (getRefChanId msg) let h0 = hashObject @HbSync (serialise msg) debug $ "RefchanUpdate: ALREADY" <+> pretty h0 - guard =<< liftIO (hasBlock sto h0 <&> isNothing) + guard' ("has block " <> (Text.pack . show . pretty) h0) =<< liftIO (hasBlock sto h0 <&> isNothing) case msg of Propose chan box -> do @@ -280,30 +281,35 @@ refChanUpdateProto self pc adapter msg = do debug $ "RefChanUpdate/Propose" <+> pretty h0 deferred @proto do + -- do -- проверили подпись пира - (peerKey, ProposeTran headRef abox) <- MaybeT $ pure $ unboxSignedBox0 box + (peerKey, ProposeTran headRef abox) <- unboxSignedBox0 box + & justOrThrowIO "unbox signed box" -- проверили подпись автора - (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 abox + (authorKey, _) <- unboxSignedBox0 abox + & justOrThrowIO "unbox signed abox" -- итак, сначала достаём голову. как мы достаём голову? let refchanKey = RefChanHeadKey @s chan - h <- MaybeT $ liftIO $ getRef sto refchanKey + h <- liftIO (getRef sto refchanKey) + & justMOrThrowIO "getref" -- смотрим, что у нас такая же голова. -- если нет -- значит, кто-то рассинхронизировался. -- может быть, потом попробуем головы запросить - guard (HashRef h == headRef) + guard' "HashRef h == headRef" (HashRef h == headRef) debug $ "OMG! Got trans" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) -- теперь достаём голову - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + headBlock <- getActualRefChanHead @e refchanKey + & justMOrThrowIO "getActualRefChanHead" let pips = view refChanHeadPeers headBlock - guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey + guard' "checkACL" $ checkACL ACLUpdate headBlock (Just peerKey) authorKey debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) @@ -331,7 +337,8 @@ refChanUpdateProto self pc adapter msg = do -- это правильно, так как транза содержит ссылку на RefChanId -- следовательно, для другого рефчана будет другая транза - hash <- MaybeT $ liftIO $ putBlock sto (serialise msg) + hash <- liftIO (putBlock sto (serialise msg)) + & justMOrThrowIO "putBlock" ts <- liftIO getTimeCoarse @@ -340,7 +347,7 @@ refChanUpdateProto self pc adapter msg = do let rcrk = RefChanRoundKey (HashRef hash) - rndHere <- lift $ find rcrk id + rndHere <- find rcrk id defRound <- RefChanRound @e (HashRef hash) refchanKey ttl <$> newTVarIO False @@ -349,14 +356,14 @@ refChanUpdateProto self pc adapter msg = do <*> newTVarIO (HashMap.singleton peerKey ()) unless (isJust rndHere) do - lift $ update defRound rcrk id - lift $ emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk) + update defRound rcrk id + emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk) -- не обрабатывать propose, если он уже в процессе - guard (isNothing rndHere) + guard' "isNothing rndHere" (isNothing rndHere) -- FIXME: fixed-timeout-is-no-good - validated <- either id id <$> lift ( race (pause @'Seconds 5 >> pure False) + validated <- either id id <$> ( race (pause @'Seconds 5 >> pure False) $ refChanValidatePropose adapter chan (HashRef hash) ) -- почему так: @@ -371,16 +378,16 @@ refChanUpdateProto self pc adapter msg = do atomically $ writeTVar (view refChanRoundClosed rnd) True liftIO $ delBlock sto hash - guard validated + guard' "validated" validated debug $ "TRANS VALIDATED" <+> pretty (AsBase58 chan) <+> pretty hash - lift $ gossip msg + gossip msg -- проверить, что мы вообще авторизованы -- рассылать ACCEPT - guard ( pk `HashMap.member` pips ) + guard' "pk in pips" ( pk `HashMap.member` pips ) -- если нет - то и всё, просто перешлём -- по госсипу исходную транзу @@ -393,12 +400,13 @@ refChanUpdateProto self pc adapter msg = do -- -- и рассылаем всем debug "GOSSIP ACCEPT TRANSACTION" - lift $ gossip accept + gossip accept -- -- рассылаем ли себе? что бы был хоть один accept - lift $ refChanUpdateProto True pc adapter accept + refChanUpdateProto True pc adapter accept Accept chan box -> deferred @proto do + -- Accept chan box -> do -- что если получили ACCEPT раньше PROPOSE ? -- что если PROPOSE еще обрабатывается? @@ -409,17 +417,21 @@ refChanUpdateProto self pc adapter msg = do debug $ "RefChanUpdate/ACCEPT" <+> pretty h0 - (peerKey, AcceptTran _ headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box + (peerKey, headRef, hashRef) <- justOrThrowIO "accept unboxSignedBox0 box" do + (peerKey, AcceptTran _ headRef hashRef) <- unboxSignedBox0 box + Just (peerKey, headRef, hashRef) let refchanKey = RefChanHeadKey @s chan - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + headBlock <- getActualRefChanHead @e refchanKey + & justMOrThrowIO "getActualRefChanHead" - h <- MaybeT $ liftIO $ getRef sto refchanKey + h <- liftIO (getRef sto refchanKey) + & justMOrThrowIO "getRef" - guard (HashRef h == headRef) + guard' "HashRef h == headRef" (HashRef h == headRef) - lift $ gossip msg + gossip msg -- тут может так случиться, что propose еще нет -- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы @@ -428,7 +440,7 @@ refChanUpdateProto self pc adapter msg = do -- вот прямо тут надо ждать, пока придёт / завершится Propose -- -- или до таймаута - let afterPropose = runMaybeT do + let afterPropose = do here <- fix \next -> do blk <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust @@ -441,36 +453,44 @@ refChanUpdateProto self pc adapter msg = do unless here do warn $ "No propose transaction saved yet!" <+> pretty hashRef - tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) + tranBs <- liftIO (getBlock sto (fromHashRef hashRef)) + & justMOrThrowIO "after propose getBlock" - tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just + tran <- deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just + & justOrThrowIO "after propose deserialiseOrFail RefChanUpdate" - proposed <- MaybeT $ pure $ case tran of + proposed <- justOrThrowIO "after propose case tran" $ + case tran of Propose _ pbox -> Just pbox _ -> Nothing - (_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @s proposed + (_, ptran) <- unboxSignedBox0 @(ProposeTran e) @s proposed + & justOrThrowIO "after propose unboxSignedBox0 proposed" debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0 -- compiler bug? let (ProposeTran _ pbox) = ptran - (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox + (authorKey, _) <- unboxSignedBox0 pbox + & justOrThrowIO "after propose unboxSignedBox0 pbox" -- может, и не надо второй раз проверять - guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey + guard' "after propose checkACL" $ + checkACL ACLUpdate headBlock (Just peerKey) authorKey debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef - rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id + rcRound <- find (RefChanRoundKey @e hashRef) id + & justMOrThrowIO "after propose find RefChanRoundKey" atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) -- TODO: garbage-collection-strongly-required - ha <- MaybeT $ liftIO $ putBlock sto (serialise msg) + ha <- liftIO (putBlock sto (serialise msg)) + & justMOrThrowIO "after propose putBlock" atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha)) -- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it? @@ -490,7 +510,7 @@ refChanUpdateProto self pc adapter msg = do trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList forM_ trans $ \t -> do - lift $ refChanWriteTran adapter t + refChanWriteTran adapter t debug $ "WRITING TRANS" <+> pretty t let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList @@ -509,8 +529,22 @@ refChanUpdateProto self pc adapter msg = do -- все остановят. let w = TimeoutSec (realToFrac $ view refChanHeadWaitAccept headBlock) - void $ lift $ race ( pause (2 * w) ) afterPropose + void $ race ( pause (2 * w) ) afterPropose + where + guard' :: Text -> Bool -> m () + guard' msg p = unless p $ throwIO (RefchanUpdateProtoFailure msg) + justOrThrowIO :: Text -> Maybe a -> m a + justOrThrowIO msg = maybe (throwIO (RefchanUpdateProtoFailure msg)) pure + + justMOrThrowIO :: Text -> m (Maybe a) -> m a + justMOrThrowIO msg = (justOrThrowIO msg =<<) + +orThrowIO :: Monad m => m a -> m (Maybe a) -> m a +orThrowIO md = (maybe md pure =<<) + +data RefchanUpdateProtoFailure = RefchanUpdateProtoFailure Text deriving (Show) +instance Exception RefchanUpdateProtoFailure -- TODO: refchan-poll-proto -- Запрашиваем refchan у всех. diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs index f8018cc6..90a0aa74 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/RefChan.hs @@ -10,6 +10,7 @@ import HBS2.Data.Types.SignedBox import Data.ByteString.Lazy ( ByteString ) import Data.ByteString qualified as BS import Codec.Serialise +import Control.Exception -- NOTE: refchan-head-endpoints data RpcRefChanHeadGet @@ -32,6 +33,11 @@ type RefChanAPI = '[ RpcRefChanHeadGet , RpcRefChanNotify ] +data RefChanAPIError = RefChanAPIError Text + deriving (Generic, Show) +instance Exception RefChanAPIError +instance Serialise RefChanAPIError + type RefChanAPIProto = 0xDA2374630001 @@ -56,7 +62,7 @@ type instance Input RpcRefChanGet = PubKey 'Sign 'HBS2Basic type instance Output RpcRefChanGet = Maybe HashRef type instance Input RpcRefChanPropose = (PubKey 'Sign 'HBS2Basic, SignedBox BS.ByteString 'HBS2Basic) -type instance Output RpcRefChanPropose = () +type instance Output RpcRefChanPropose = (Either RefChanAPIError ()) type instance Input RpcRefChanNotify = (PubKey 'Sign 'HBS2Basic, SignedBox BS.ByteString 'HBS2Basic) type instance Output RpcRefChanNotify = () diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Client/RefChan.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Client/RefChan.hs index c6e15b7a..109b6ac0 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Client/RefChan.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Client/RefChan.hs @@ -78,7 +78,7 @@ postRefChanTx puk box = do api <- getClientAPI @RefChanAPI @proto callRpcWaitMay @RpcRefChanPropose (TimeoutSec 1) api (puk, box) >>= \case Nothing -> throwIO RpcTimeoutError - Just e -> pure e + Just e -> either throwIO pure e fetchRefChanHead :: forall proto m . ( MonadUnliftIO m , HasClientAPI RefChanAPI proto m diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs index f35a6cee..dfb23ede 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs @@ -20,6 +20,7 @@ import Data.Config.Suckless.Syntax import Data.Config.Suckless.Parse import Data.Kind +import Data.Text (Text) import Control.Monad import Control.Monad.Reader import Data.ByteString ( ByteString ) @@ -27,18 +28,18 @@ import UnliftIO data RPC2Context = RPC2Context - { rpcConfig :: [Syntax C] - , rpcMessaging :: MessagingUnix - , rpcPokeAnswer :: String - , rpcPeerEnv :: PeerEnv L4Proto - , rpcLocalMultiCast :: Peer L4Proto - , rpcStorage :: AnyStorage - , rpcBrains :: SomeBrains L4Proto - , rpcByPassInfo :: IO ByPassStat - , rpcDoFetch :: HashRef -> IO () - , rpcDoRefChanHeadPost :: HashRef -> IO () - , rpcDoRefChanPropose :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO () - , rpcDoRefChanNotify :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO () + { rpcConfig :: [Syntax C] + , rpcMessaging :: MessagingUnix + , rpcPokeAnswer :: String + , rpcPeerEnv :: PeerEnv L4Proto + , rpcLocalMultiCast :: Peer L4Proto + , rpcStorage :: AnyStorage + , rpcBrains :: SomeBrains L4Proto + , rpcByPassInfo :: IO ByPassStat + , rpcDoFetch :: HashRef -> IO () + , rpcDoRefChanHeadPost :: HashRef -> IO () + , rpcDoRefChanPropose :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO (Either Text ()) + , rpcDoRefChanNotify :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO () } instance (Monad m, Messaging MessagingUnix UNIX (Encoded UNIX)) => HasFabriq UNIX (ReaderT RPC2Context m) where