Tune refchan interface, implementation

This commit is contained in:
Snail 2025-01-28 15:20:11 +04:00 committed by voidlizard
parent 9fca167dd3
commit 2f2796603a
8 changed files with 117 additions and 58 deletions

View File

@ -2,6 +2,7 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
module HBS2.Data.Types.SignedBox where
import HBS2.Base58
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Types
import HBS2.Net.Auth.Credentials
@ -17,6 +18,11 @@ data SignedBox p s =
SignedBox (PubKey 'Sign s) ByteString (Signature s)
deriving stock (Generic)
instance (Pretty (AsBase58 (PubKey 'Sign s)), Pretty (AsBase58 (Signature s)))
=> Pretty (SignedBox p s) where
pretty (SignedBox k b s) =
"SignedBox" <+> pretty (AsBase58 k) <+> pretty (AsBase58 b) <+> pretty (AsBase58 s)
deriving stock instance
( Eq (PubKey 'Sign s)
, Eq (Signature s)

View File

@ -207,6 +207,9 @@ instance ( Serialise (PeerCredentials e)
-- FIXME: move-thouse-instances-to-appropriate-place-ASAP
instance Pretty (AsBase58 Sign.Signature) where
pretty (AsBase58 pk) = pretty $ B8.unpack $ toBase58 (Crypto.encode pk)
instance Pretty (AsBase58 Sign.PublicKey) where
pretty (AsBase58 pk) = pretty $ B8.unpack $ toBase58 (Crypto.encode pk)

View File

@ -81,6 +81,7 @@ import HBS2.Peer.Proto.LWWRef.Internal
import RPC2(RPC2Context(..))
import Codec.Serialise as Serialise
import Control.Arrow (left)
import Control.Concurrent (myThreadId)
import Control.Concurrent.STM
import Control.Exception as Exception
@ -99,6 +100,7 @@ import Data.Map qualified as Map
import Data.Maybe
import Data.Set qualified as Set
import Data.Set (Set)
import Data.Text qualified as T
import Data.Time.Clock.POSIX
import Data.Time.Format
import Lens.Micro.Platform as Lens
@ -1145,11 +1147,13 @@ runPeer opts = Exception.handle (\e -> myException e
let refChanProposeAction (puk, box) = do
debug $ "rpc.reChanPropose" <+> pretty (AsBase58 puk)
void $ liftIO $ withPeerM penv $ do
r <- liftIO $ fmap (left (T.pack . show @SomeException)) $ try $ withPeerM penv do
me <- ownPeer @e
runMaybeT do
proposed <- MaybeT $ makeProposeTran @e pc puk box
lift $ runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed)
proposed <- maybe (liftIO $ throwIO MakeProposeTranError) pure
=<< makeProposeTran @e pc puk box
runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed)
debug $ "rpc.reChanPropose ok" <+> pretty (AsBase58 puk) <+> pretty box
pure r
-- NOTE: moved-to-rpc
let refChanNotifyAction (puk, box) = do
@ -1248,6 +1252,9 @@ runPeer opts = Exception.handle (\e -> myException e
-- we want to clean up all resources
throwM GoAgainException
data MakeProposeTranError = MakeProposeTranError deriving (Show)
instance Exception MakeProposeTranError
emitToPeer :: ( MonadIO m
, EventEmitter e a (PeerM e IO)
)

View File

@ -20,6 +20,7 @@ import HBS2.Peer.RPC.Internal.Types
import PeerTypes
import Control.Arrow (left)
import Control.Monad.Reader
type RefChanContext m = (MonadIO m, HasRpcContext RefChanAPI RPC2Context m)
@ -70,7 +71,8 @@ instance RefChanContext m => HandleMethod m RpcRefChanPropose where
handleMethod (puk, box) = do
co <- getRpcContext @RefChanAPI
debug $ "rpc.refChanNotifyAction" <+> pretty (AsBase58 puk)
liftIO $ rpcDoRefChanPropose co (puk, box)
liftIO $ left RefChanAPIError <$> do
rpcDoRefChanPropose co (puk, box)
instance RefChanContext m => HandleMethod m RpcRefChanNotify where

View File

@ -39,6 +39,7 @@ import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.Maybe
import Data.Word
import Data.Text qualified as Text
import Lens.Micro.Platform
import Data.Hashable hiding (Hashed)
import Type.Reflection (someTypeRep)
@ -240,7 +241,7 @@ refChanUpdateProto :: forall e s m proto . ( MonadUnliftIO m
-> RefChanUpdate e
-> m ()
refChanUpdateProto self pc adapter msg = do
refChanUpdateProto self pc adapter msg = flip withException (\e -> liftIO (print (e :: SomeException))) do
-- авторизовать пира
peer <- thatPeer @proto
@ -251,9 +252,9 @@ refChanUpdateProto self pc adapter msg = do
let pk = view peerSignPk pc
let sk = view peerSignSk pc
void $ runMaybeT do
do
guard (auth || self)
guard' "auth || self" (auth || self)
-- TODO: process-each-message-only-once
-- где-то тут мы разбираемся, что такое сообщеине
@ -266,13 +267,13 @@ refChanUpdateProto self pc adapter msg = do
-- "блок".
-- так-то и количество proposers можно ограничить
guard =<< lift (refChanSubscribed adapter (getRefChanId msg))
guard' "refChanSubscribed" =<< refChanSubscribed adapter (getRefChanId msg)
let h0 = hashObject @HbSync (serialise msg)
debug $ "RefchanUpdate: ALREADY" <+> pretty h0
guard =<< liftIO (hasBlock sto h0 <&> isNothing)
guard' ("has block " <> (Text.pack . show . pretty) h0) =<< liftIO (hasBlock sto h0 <&> isNothing)
case msg of
Propose chan box -> do
@ -280,30 +281,35 @@ refChanUpdateProto self pc adapter msg = do
debug $ "RefChanUpdate/Propose" <+> pretty h0
deferred @proto do
-- do
-- проверили подпись пира
(peerKey, ProposeTran headRef abox) <- MaybeT $ pure $ unboxSignedBox0 box
(peerKey, ProposeTran headRef abox) <- unboxSignedBox0 box
& justOrThrowIO "unbox signed box"
-- проверили подпись автора
(authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 abox
(authorKey, _) <- unboxSignedBox0 abox
& justOrThrowIO "unbox signed abox"
-- итак, сначала достаём голову. как мы достаём голову?
let refchanKey = RefChanHeadKey @s chan
h <- MaybeT $ liftIO $ getRef sto refchanKey
h <- liftIO (getRef sto refchanKey)
& justMOrThrowIO "getref"
-- смотрим, что у нас такая же голова.
-- если нет -- значит, кто-то рассинхронизировался.
-- может быть, потом попробуем головы запросить
guard (HashRef h == headRef)
guard' "HashRef h == headRef" (HashRef h == headRef)
debug $ "OMG! Got trans" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey)
-- теперь достаём голову
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
headBlock <- getActualRefChanHead @e refchanKey
& justMOrThrowIO "getActualRefChanHead"
let pips = view refChanHeadPeers headBlock
guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey
guard' "checkACL" $ checkACL ACLUpdate headBlock (Just peerKey) authorKey
debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey)
@ -331,7 +337,8 @@ refChanUpdateProto self pc adapter msg = do
-- это правильно, так как транза содержит ссылку на RefChanId
-- следовательно, для другого рефчана будет другая транза
hash <- MaybeT $ liftIO $ putBlock sto (serialise msg)
hash <- liftIO (putBlock sto (serialise msg))
& justMOrThrowIO "putBlock"
ts <- liftIO getTimeCoarse
@ -340,7 +347,7 @@ refChanUpdateProto self pc adapter msg = do
let rcrk = RefChanRoundKey (HashRef hash)
rndHere <- lift $ find rcrk id
rndHere <- find rcrk id
defRound <- RefChanRound @e (HashRef hash) refchanKey ttl
<$> newTVarIO False
@ -349,14 +356,14 @@ refChanUpdateProto self pc adapter msg = do
<*> newTVarIO (HashMap.singleton peerKey ())
unless (isJust rndHere) do
lift $ update defRound rcrk id
lift $ emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk)
update defRound rcrk id
emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk)
-- не обрабатывать propose, если он уже в процессе
guard (isNothing rndHere)
guard' "isNothing rndHere" (isNothing rndHere)
-- FIXME: fixed-timeout-is-no-good
validated <- either id id <$> lift ( race (pause @'Seconds 5 >> pure False)
validated <- either id id <$> ( race (pause @'Seconds 5 >> pure False)
$ refChanValidatePropose adapter chan (HashRef hash)
)
-- почему так:
@ -371,16 +378,16 @@ refChanUpdateProto self pc adapter msg = do
atomically $ writeTVar (view refChanRoundClosed rnd) True
liftIO $ delBlock sto hash
guard validated
guard' "validated" validated
debug $ "TRANS VALIDATED" <+> pretty (AsBase58 chan) <+> pretty hash
lift $ gossip msg
gossip msg
-- проверить, что мы вообще авторизованы
-- рассылать ACCEPT
guard ( pk `HashMap.member` pips )
guard' "pk in pips" ( pk `HashMap.member` pips )
-- если нет - то и всё, просто перешлём
-- по госсипу исходную транзу
@ -393,12 +400,13 @@ refChanUpdateProto self pc adapter msg = do
-- -- и рассылаем всем
debug "GOSSIP ACCEPT TRANSACTION"
lift $ gossip accept
gossip accept
-- -- рассылаем ли себе? что бы был хоть один accept
lift $ refChanUpdateProto True pc adapter accept
refChanUpdateProto True pc adapter accept
Accept chan box -> deferred @proto do
-- Accept chan box -> do
-- что если получили ACCEPT раньше PROPOSE ?
-- что если PROPOSE еще обрабатывается?
@ -409,17 +417,21 @@ refChanUpdateProto self pc adapter msg = do
debug $ "RefChanUpdate/ACCEPT" <+> pretty h0
(peerKey, AcceptTran _ headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
(peerKey, headRef, hashRef) <- justOrThrowIO "accept unboxSignedBox0 box" do
(peerKey, AcceptTran _ headRef hashRef) <- unboxSignedBox0 box
Just (peerKey, headRef, hashRef)
let refchanKey = RefChanHeadKey @s chan
headBlock <- MaybeT $ getActualRefChanHead @e refchanKey
headBlock <- getActualRefChanHead @e refchanKey
& justMOrThrowIO "getActualRefChanHead"
h <- MaybeT $ liftIO $ getRef sto refchanKey
h <- liftIO (getRef sto refchanKey)
& justMOrThrowIO "getRef"
guard (HashRef h == headRef)
guard' "HashRef h == headRef" (HashRef h == headRef)
lift $ gossip msg
gossip msg
-- тут может так случиться, что propose еще нет
-- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы
@ -428,7 +440,7 @@ refChanUpdateProto self pc adapter msg = do
-- вот прямо тут надо ждать, пока придёт / завершится Propose
-- -- или до таймаута
let afterPropose = runMaybeT do
let afterPropose = do
here <- fix \next -> do
blk <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust
@ -441,36 +453,44 @@ refChanUpdateProto self pc adapter msg = do
unless here do
warn $ "No propose transaction saved yet!" <+> pretty hashRef
tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef)
tranBs <- liftIO (getBlock sto (fromHashRef hashRef))
& justMOrThrowIO "after propose getBlock"
tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just
tran <- deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just
& justOrThrowIO "after propose deserialiseOrFail RefChanUpdate"
proposed <- MaybeT $ pure $ case tran of
proposed <- justOrThrowIO "after propose case tran" $
case tran of
Propose _ pbox -> Just pbox
_ -> Nothing
(_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @s proposed
(_, ptran) <- unboxSignedBox0 @(ProposeTran e) @s proposed
& justOrThrowIO "after propose unboxSignedBox0 proposed"
debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0
-- compiler bug?
let (ProposeTran _ pbox) = ptran
(authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox
(authorKey, _) <- unboxSignedBox0 pbox
& justOrThrowIO "after propose unboxSignedBox0 pbox"
-- может, и не надо второй раз проверять
guard $ checkACL ACLUpdate headBlock (Just peerKey) authorKey
guard' "after propose checkACL" $
checkACL ACLUpdate headBlock (Just peerKey) authorKey
debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef
rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id
rcRound <- find (RefChanRoundKey @e hashRef) id
& justMOrThrowIO "after propose find RefChanRoundKey"
atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ())
-- TODO: garbage-collection-strongly-required
ha <- MaybeT $ liftIO $ putBlock sto (serialise msg)
ha <- liftIO (putBlock sto (serialise msg))
& justMOrThrowIO "after propose putBlock"
atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha))
-- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it?
@ -490,7 +510,7 @@ refChanUpdateProto self pc adapter msg = do
trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList
forM_ trans $ \t -> do
lift $ refChanWriteTran adapter t
refChanWriteTran adapter t
debug $ "WRITING TRANS" <+> pretty t
let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList
@ -509,8 +529,22 @@ refChanUpdateProto self pc adapter msg = do
-- все остановят.
let w = TimeoutSec (realToFrac $ view refChanHeadWaitAccept headBlock)
void $ lift $ race ( pause (2 * w) ) afterPropose
void $ race ( pause (2 * w) ) afterPropose
where
guard' :: Text -> Bool -> m ()
guard' msg p = unless p $ throwIO (RefchanUpdateProtoFailure msg)
justOrThrowIO :: Text -> Maybe a -> m a
justOrThrowIO msg = maybe (throwIO (RefchanUpdateProtoFailure msg)) pure
justMOrThrowIO :: Text -> m (Maybe a) -> m a
justMOrThrowIO msg = (justOrThrowIO msg =<<)
orThrowIO :: Monad m => m a -> m (Maybe a) -> m a
orThrowIO md = (maybe md pure =<<)
data RefchanUpdateProtoFailure = RefchanUpdateProtoFailure Text deriving (Show)
instance Exception RefchanUpdateProtoFailure
-- TODO: refchan-poll-proto
-- Запрашиваем refchan у всех.

View File

@ -10,6 +10,7 @@ import HBS2.Data.Types.SignedBox
import Data.ByteString.Lazy ( ByteString )
import Data.ByteString qualified as BS
import Codec.Serialise
import Control.Exception
-- NOTE: refchan-head-endpoints
data RpcRefChanHeadGet
@ -32,6 +33,11 @@ type RefChanAPI = '[ RpcRefChanHeadGet
, RpcRefChanNotify
]
data RefChanAPIError = RefChanAPIError Text
deriving (Generic, Show)
instance Exception RefChanAPIError
instance Serialise RefChanAPIError
type RefChanAPIProto = 0xDA2374630001
@ -56,7 +62,7 @@ type instance Input RpcRefChanGet = PubKey 'Sign 'HBS2Basic
type instance Output RpcRefChanGet = Maybe HashRef
type instance Input RpcRefChanPropose = (PubKey 'Sign 'HBS2Basic, SignedBox BS.ByteString 'HBS2Basic)
type instance Output RpcRefChanPropose = ()
type instance Output RpcRefChanPropose = (Either RefChanAPIError ())
type instance Input RpcRefChanNotify = (PubKey 'Sign 'HBS2Basic, SignedBox BS.ByteString 'HBS2Basic)
type instance Output RpcRefChanNotify = ()

View File

@ -78,7 +78,7 @@ postRefChanTx puk box = do
api <- getClientAPI @RefChanAPI @proto
callRpcWaitMay @RpcRefChanPropose (TimeoutSec 1) api (puk, box) >>= \case
Nothing -> throwIO RpcTimeoutError
Just e -> pure e
Just e -> either throwIO pure e
fetchRefChanHead :: forall proto m . ( MonadUnliftIO m
, HasClientAPI RefChanAPI proto m

View File

@ -20,6 +20,7 @@ import Data.Config.Suckless.Syntax
import Data.Config.Suckless.Parse
import Data.Kind
import Data.Text (Text)
import Control.Monad
import Control.Monad.Reader
import Data.ByteString ( ByteString )
@ -37,7 +38,7 @@ data RPC2Context =
, rpcByPassInfo :: IO ByPassStat
, rpcDoFetch :: HashRef -> IO ()
, rpcDoRefChanHeadPost :: HashRef -> IO ()
, rpcDoRefChanPropose :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO ()
, rpcDoRefChanPropose :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO (Either Text ())
, rpcDoRefChanNotify :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO ()
}