diff --git a/.fixme/log b/.fixme/log index 30abe5a9..e69de29b 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,2 +0,0 @@ - -(fixme-set "workflow" "test" "3nmxU5Ro8b") \ No newline at end of file diff --git a/docs/devlog.md b/docs/devlog.md index 5039caf1..87e98a23 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1,3 +1,7 @@ +## 2023-07-30 + +какие-то косяки + ## 2023-07-25 кажется, git push --force что-то портит @@ -1304,6 +1308,6 @@ PR: tcp-pex PR: bus-crypt branch: iv/bus-crypt - Шифрование протокола общения нод. + Шифрование протокола общения нод. Обмен асимметричными публичными ключами выполняется на стадии хэндшейка в ping/pong. Для шифрования данных создаётся симметричный ключ по diffie-hellman. diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 5503512c..b337620d 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -92,6 +92,7 @@ library , HBS2.Net.Messaging.Fake , HBS2.Net.Messaging.UDP , HBS2.Net.Messaging.TCP + , HBS2.Net.Messaging.Unix , HBS2.Net.PeerLocator , HBS2.Net.PeerLocator.Static , HBS2.Net.Proto diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 6fb6737b..bf82a7e7 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -277,16 +277,6 @@ instance ( MonadIO m se <- asks (view envSessions) liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) -class HasProtocol e p => HasTimeLimits e p m where - tryLockForPeriod :: Peer e -> p -> m Bool - -instance {-# OVERLAPPABLE #-} - (MonadIO (t m), Monad m, MonadTrans t, HasProtocol e p, HasTimeLimits e p m) => HasTimeLimits e p (t m) where - tryLockForPeriod p m = lift (tryLockForPeriod p m) - -- pure True - -- liftIO $ print "LIMIT DOES NOT WORK" - -- pure True - instance (MonadIO m, HasProtocol e p, Hashable (Encoded e)) => HasTimeLimits e p (PeerM e m) where tryLockForPeriod peer msg = case requestPeriodLim @e @p of diff --git a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs index 9ac45496..0b865f95 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs @@ -1,6 +1,7 @@ {-# Language AllowAmbiguousTypes #-} module HBS2.Actors.Peer.Types where +import HBS2.Prelude import HBS2.Storage import HBS2.Net.Proto.Types import HBS2.Hash @@ -10,6 +11,17 @@ import Control.Monad.Trans.Maybe import Data.ByteString.Lazy (ByteString) +class HasProtocol e p => HasTimeLimits e p m where + tryLockForPeriod :: Peer e -> p -> m Bool + +instance {-# OVERLAPPABLE #-} + (MonadIO (t m), Monad m, MonadTrans t, HasProtocol e p, HasTimeLimits e p m) => HasTimeLimits e p (t m) where + tryLockForPeriod p m = lift (tryLockForPeriod p m) + -- pure True + -- liftIO $ print "LIMIT DOES NOT WORK" + -- pure True + + instance (IsKey HbSync) => Storage AnyStorage HbSync ByteString IO where putBlock (AnyStorage s) = putBlock s enqueueBlock (AnyStorage s) = enqueueBlock s diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs new file mode 100644 index 00000000..b1b948d8 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -0,0 +1,230 @@ +module HBS2.Net.Messaging.Unix where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Types +import HBS2.Net.Messaging +import HBS2.Clock + +import HBS2.System.Logger.Simple + +import Control.Monad.Trans.Resource +import Control.Monad +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.Function +import Data.Functor +import Data.Hashable +import Data.List qualified as List +import Network.ByteOrder hiding (ByteString) +import Network.Socket +import Network.Socket.ByteString +import Control.Concurrent.STM.TQueue (flushTQueue) +import Data.Set (Set) +import Data.Set qualified as Set +import UnliftIO + +data UNIX + +{- HLINT ignore "Use newtype instead of data" -} +data MessagingUnixOpts = + MUWatchdog Int + deriving (Eq,Ord,Show,Generic,Data) + +-- FIXME: use-bounded-queues +data MessagingUnix = + MessagingUnix + { msgUnixSockPath :: FilePath + , msgUnixServer :: Bool + , msgUnixRetryTime :: Timeout 'Seconds + , msgUnixSelf :: Peer UNIX + , msgUnixOpts :: Set MessagingUnixOpts + , msgUnixInbox :: TQueue ByteString + , msgUnixRecv :: TQueue (From UNIX, ByteString) + , msgUnixLast :: TVar TimeSpec + , msgUnixAccepts :: TVar Int + } + + + +newMessagingUnix :: MonadIO m + => Bool + -> Timeout 'Seconds + -> FilePath + -> m MessagingUnix + +newMessagingUnix server tsec path = do + newMessagingUnixOpts mempty server tsec path + +newMessagingUnixOpts :: MonadIO m + => [MessagingUnixOpts] + -> Bool + -> Timeout 'Seconds + -> FilePath + -> m MessagingUnix + +newMessagingUnixOpts opts server tsec path = do + let sa = SockAddrUnix path + now <- getTimeCoarse + MessagingUnix path + server + tsec + (PeerUNIX sa) + (Set.fromList opts) + <$> liftIO newTQueueIO + <*> liftIO newTQueueIO + <*> liftIO (newTVarIO now) + <*> liftIO (newTVarIO 0) + +instance HasPeer UNIX where + newtype instance Peer UNIX = PeerUNIX {fromPeerUnix :: SockAddr} + deriving stock (Eq,Ord,Show,Generic) + deriving newtype (Pretty) + +instance IsString (Peer UNIX) where + fromString p = PeerUNIX (SockAddrUnix p) + +-- FIXME: fix-code-dup? +instance Hashable (Peer UNIX) where + hashWithSalt salt p = case fromPeerUnix p of + SockAddrInet pn h -> hashWithSalt salt (4, fromIntegral pn, h) + SockAddrInet6 pn _ h _ -> hashWithSalt salt (6, fromIntegral pn, h) + SockAddrUnix s -> hashWithSalt salt ("unix", s) + + +data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable) + +instance Exception ReadTimeoutException + + +runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m () +runMessagingUnix env = do + + if msgUnixServer env then + runServer + else + runClient + + where + + runServer = forever $ handleAny cleanupAndRetry $ runResourceT do + + t0 <- getTimeCoarse + atomically $ writeTVar (msgUnixLast env) t0 + + sock <- liftIO $ socket AF_UNIX Stream defaultProtocol + + void $ allocate (pure sock) (`shutdown` ShutdownBoth) + + liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) + liftIO $ listen sock 1 + + watchdog <- async $ do + + let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] + + maybe1 mwd (forever (pause @'Seconds 60)) $ \wd -> do + + forever do + + pause $ TimeoutSec $ realToFrac $ min (wd `div` 2) 1 + + now <- getTimeCoarse + seen <- readTVarIO (msgUnixLast env) + acc <- readTVarIO (msgUnixAccepts env) + + trace $ "watchdog" <+> pretty now <+> pretty seen <+> pretty acc + + let diff = toNanoSeconds $ TimeoutTS (now - seen) + + when ( acc > 0 && diff >= toNanoSeconds (TimeoutSec $ realToFrac wd) ) do + throwIO ReadTimeoutException + + run <- async $ forever $ runResourceT do + (so, sa) <- liftIO $ accept sock + + atomically $ modifyTVar (msgUnixAccepts env) succ + + void $ allocate (pure so) close + + writer <- async $ forever do + msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ sendAll so $ bytestring32 (fromIntegral len) + liftIO $ sendAll so $ LBS.toStrict msg + + void $ allocate (pure writer) cancel + + link writer + + fix \next -> do + -- FIXME: timeout-hardcode + frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral + frame <- liftIO $ recv so frameLen + atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame) + now <- getTimeCoarse + atomically $ writeTVar (msgUnixLast env) now + next + + (_, r) <- waitAnyCatchCancel [run, watchdog] + + case r of + Left e -> throwIO e + Right{} -> pure () + + + runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT do + + sock <- liftIO $ socket AF_UNIX Stream defaultProtocol + + void $ allocate (pure sock) close + + let sa = SockAddrUnix (msgUnixSockPath env) + + let attemptConnect = do + result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env) + case result of + Right _ -> return () + Left (e :: SomeException) -> do + pause (msgUnixRetryTime env) + warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e + attemptConnect + + attemptConnect + + reader <- async $ forever do + -- Read response from server + frameLen <- liftIO $ recv sock 4 <&> word32 <&> fromIntegral + frame <- liftIO $ recv sock frameLen + atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame) + + forever do + msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ sendAll sock $ bytestring32 (fromIntegral len) + liftIO $ sendAll sock $ LBS.toStrict msg + + void $ waitAnyCatchCancel [reader] + + cleanupAndRetry e = liftIO do + warn $ "MessagingUnix. client seems gone. restaring server" <+> pretty (msgUnixSelf env) + err (viaShow e) + atomically $ writeTVar (msgUnixAccepts env) 0 + liftIO $ atomically $ void $ flushTQueue (msgUnixInbox env) + liftIO $ atomically $ void $ flushTQueue (msgUnixRecv env) + pause (msgUnixRetryTime env) + + logAndRetry :: SomeException -> IO () + logAndRetry e = do + warn $ "MessagingUnix. runClient failed, probably server is gone. Retrying:" <+> pretty (msgUnixSelf env) + err (viaShow e) + pause (msgUnixRetryTime env) + + +instance Messaging MessagingUnix UNIX ByteString where + + sendTo bus (To _) _ msg = liftIO do + atomically $ writeTQueue (msgUnixInbox bus) msg + + receive bus _ = liftIO do + atomically $ readTQueue (msgUnixRecv bus) <&> List.singleton + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 0b540944..e1223184 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -9,6 +9,7 @@ module HBS2.Net.Proto.Definition import HBS2.Clock import HBS2.Defaults import HBS2.Hash +import HBS2.Actors.Peer.Types import HBS2.Net.Auth.Credentials import HBS2.Net.Proto import HBS2.Net.Proto.BlockAnnounce @@ -22,6 +23,7 @@ import HBS2.Net.Proto.PeerExchange import HBS2.Net.Proto.PeerMeta import HBS2.Net.Proto.RefLog import HBS2.Net.Proto.RefChan +import HBS2.Net.Messaging.Unix (UNIX) import HBS2.Prelude import Control.Monad @@ -39,6 +41,8 @@ import Crypto.Saltine.Core.Box qualified as Encrypt type instance Encryption L4Proto = HBS2Basic +type instance Encryption UNIX = HBS2Basic + type instance PubKey 'Sign HBS2Basic = Sign.PublicKey type instance PrivKey 'Sign HBS2Basic = Sign.SecretKey type instance PubKey 'Encrypt HBS2Basic = Encrypt.PublicKey @@ -191,6 +195,22 @@ instance HasProtocol L4Proto (DialResp L4Proto) where decode = dialRespDecode . BSL.toStrict encode = BSL.fromStrict . dialRespEncode + +instance Serialise (RefChanValidate UNIX) => HasProtocol UNIX (RefChanValidate UNIX) where + type instance ProtocolId (RefChanValidate UNIX) = 0xFFFA0001 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +instance MonadIO m => HasNonces (RefChanValidate UNIX) m where + type instance Nonce (RefChanValidate UNIX) = BS.ByteString + newNonce = do + n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) + pure $ BS.take 8 n + +instance HasTimeLimits UNIX (RefChanValidate UNIX) IO where + tryLockForPeriod _ _ = pure True + instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just defCookieTimeoutSec diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 9a417947..4c7d8fc2 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -30,6 +30,7 @@ import Codec.Serialise import Control.Monad.Identity import Control.Monad.Trans.Maybe import Data.ByteString (ByteString) +import Data.ByteString qualified as BS import Data.ByteString.Lazy qualified as LBS import Data.Either import Data.HashMap.Strict (HashMap) @@ -258,12 +259,43 @@ data RefChanNotify e = instance ForRefChans e => Serialise (RefChanNotify e) +type RefChanValidateNonce e = Nonce (RefChanValidate e) + +data RefChanValidate e = + RefChanValidate + { rcvNonce :: Nonce (RefChanValidate e) + , rcvChan :: RefChanId e + , rcvData :: RefChanValidateData e + } + deriving stock (Generic) + +data RefChanValidateData e = + Validate HashRef + | Accepted HashRef + | Rejected HashRef + | Poke + deriving stock (Generic) + +instance Serialise (RefChanValidateData e) + +instance ( Serialise (PubKey 'Sign (Encryption e)) + , Serialise (Nonce (RefChanValidate e)) ) + => Serialise (RefChanValidate e) + +instance (ForRefChans e, Pretty (AsBase58 (Nonce (RefChanValidate e)))) => Pretty (RefChanValidate e) where + pretty (RefChanValidate n c d) = case d of + Validate r -> pretty "validate" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r + Accepted r -> pretty "accepted" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r + Rejected r -> pretty "rejected" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) <+> pretty r + Poke -> pretty "poke" <+> pretty (AsBase58 n) <+> pretty (AsBase58 c) + -- FIXME: rename data RefChanAdapter e m = RefChanAdapter { refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () , refChanSubscribed :: RefChanId e -> m Bool , refChanWriteTran :: HashRef -> m () + , refChanValidatePropose :: RefChanId e -> HashRef -> m Bool } class HasRefChanId e p | p -> e where @@ -279,6 +311,13 @@ instance HasRefChanId e (RefChanRequest e) where RefChanRequest c -> c RefChanResponse c _ -> c +instance HasRefChanId e (RefChanNotify e) where + getRefChanId = \case + Notify c _ -> c + +instance HasRefChanId e (RefChanValidate e) where + getRefChanId = rcvChan + refChanHeadProto :: forall e s m . ( MonadIO m , Request e (RefChanHead e) m , Request e (BlockAnnounce e) m @@ -338,6 +377,7 @@ refChanHeadProto self adapter msg = do refChanUpdateProto :: forall e s m . ( MonadIO m + , MonadUnliftIO m , Request e (RefChanUpdate e) m , Response e (RefChanUpdate e) m , HasDeferred e (RefChanUpdate e) m @@ -422,15 +462,36 @@ refChanUpdateProto self pc adapter msg = do let pips = view refChanHeadPeers headBlock - guard $ checkACL headBlock peerKey authorKey + guard $ checkACL headBlock (Just peerKey) authorKey debug $ "OMG!!! TRANS AUTHORIZED" <+> pretty (AsBase58 peerKey) <+> pretty (AsBase58 authorKey) + -- TODO: validate-transaction + -- итак, как нам валидировать транзакцию? + -- HTTP ? + -- TCP ? + -- UDP ? (кстати, годно и быстро) + -- CLI ? + -- получается, риалтайм: ждём не более X секунд валидации, + -- иначе не валидируем. + -- по хорошему, не блокироваться бы в запросе. + -- тут мы зависим от состояния конвейра, нас можно DDoS-ить + -- большим количеством запросов и транзакции будут отклоняться + -- при большом потоке. + -- но решается это.. тадам! PoW! подбором красивых хэшей + -- при увеличении нагрузки. + -- тогда, правда, в открытой системе работает паттерн -- DDoS + -- всех, кроме своих узлов, а свои узлы всё принимают. + + -- для начала: сделаем хук для валидации, которыйне будет делать ничего + -- если не смогли сохранить транзу, то и Accept разослать -- не сможем -- это правильно, так как транза содержит ссылку на RefChanId -- следовательно, для другого рефчана будет другая транза + hash <- MaybeT $ liftIO $ putBlock sto (serialise msg) + ts <- liftIO getTimeCoarse let toWait = TimeoutSec (fromIntegral $ 2 * view refChanHeadWaitAccept headBlock) @@ -450,14 +511,31 @@ refChanUpdateProto self pc adapter msg = do lift $ update defRound rcrk id lift $ emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk) + -- не обрабатывать propose, если он уже в процессе + guard (isNothing rndHere) + + -- FIXME: fixed-timeout-is-no-good + validated <- either id id <$> lift ( race (pause @'Seconds 5 >> pure False) + $ refChanValidatePropose adapter chan (HashRef hash) + ) + -- почему так: + -- мы можем тормозить в проверке транзакции, + -- другие пиры могут работать быстрее и от них + -- может прийти accept. + -- так что раунд всё равно нужно завести, + -- даже если транза не очень. + + unless validated do + maybe1 rndHere none $ \rnd -> do + atomically $ writeTVar (view refChanRoundClosed rnd) True + liftIO $ delBlock sto hash + + guard validated + + debug $ "TRANS VALIDATED" <+> pretty (AsBase58 chan) <+> pretty hash + lift $ gossip msg - -- FIXME: random-delay-to-avoid-race - -- выглядит не очень хорошо, 100ms - -- и не гарантирует от гонок - -- pause @'Seconds 0.25 - - -- FIXME: check-if-we-authorized -- проверить, что мы вообще авторизованы -- рассылать ACCEPT @@ -480,11 +558,21 @@ refChanUpdateProto self pc adapter msg = do Accept chan box -> deferred proto do + -- что если получили ACCEPT раньше PROPOSE ? + -- что если PROPOSE еще обрабатывается? + -- надо, короче, блокироваться и ждать тут Propose + -- но если блокироваться --- то конвейр вообще + -- может встать. что делать? + -- + debug $ "RefChanUpdate/ACCEPT" <+> pretty h0 (peerKey, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box let refchanKey = RefChanHeadKey @s chan + + headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + h <- MaybeT $ liftIO $ getRef sto refchanKey guard (HashRef h == headRef) @@ -495,68 +583,91 @@ refChanUpdateProto self pc adapter msg = do -- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы -- почти одновременно. ну или не успело записаться. и что делать? - here <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust + -- вот прямо тут надо ждать, пока придёт / завершится Propose + -- -- или до таймаута - unless here do - warn $ "No propose transaction saved yet!" <+> pretty hashRef + let afterPropose = runMaybeT do - tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) + here <- fix \next -> do + blk <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust + if blk then + pure blk + else do + pause @'Seconds 0.25 + next - tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just + unless here do + warn $ "No propose transaction saved yet!" <+> pretty hashRef - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) - proposed <- MaybeT $ pure $ case tran of - Propose _ pbox -> Just pbox - _ -> Nothing + tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just - (_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @e proposed + proposed <- MaybeT $ pure $ case tran of + Propose _ pbox -> Just pbox + _ -> Nothing - debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0 - -- compiler bug? - let (ProposeTran _ pbox) = ptran + (_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @e proposed - (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox + debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0 - -- может, и не надо второй раз проверять - guard $ checkACL headBlock peerKey authorKey + -- compiler bug? + let (ProposeTran _ pbox) = ptran - debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef + (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox - rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id + -- может, и не надо второй раз проверять + guard $ checkACL headBlock (Just peerKey) authorKey - atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) + debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef - -- TODO: garbage-collection-strongly-required - ha <- MaybeT $ liftIO $ putBlock sto (serialise msg) + rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id - atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha)) - -- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it? + atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) - accepts <- atomically $ readTVar (view refChanRoundAccepts rcRound) <&> HashMap.size + -- TODO: garbage-collection-strongly-required + ha <- MaybeT $ liftIO $ putBlock sto (serialise msg) - debug $ "ACCEPTS:" <+> pretty accepts + atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha)) + -- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it? - closed <- readTVarIO (view refChanRoundClosed rcRound) + accepts <- atomically $ readTVar (view refChanRoundAccepts rcRound) <&> HashMap.size - -- FIXME: round! - when (fromIntegral accepts >= view refChanHeadQuorum headBlock && not closed) do - debug $ "ROUND!" <+> pretty accepts <+> pretty hashRef + -- FIXME: why-accepts-quorum-on-failed-proposal? - trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList + debug $ "ACCEPTS:" <+> pretty accepts - forM_ trans $ \t -> do - lift $ refChanWriteTran adapter t - debug $ "WRITING TRANS" <+> pretty t + closed <- readTVarIO (view refChanRoundClosed rcRound) - let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList - votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys + -- FIXME: round! + when (fromIntegral accepts >= view refChanHeadQuorum headBlock && not closed) do + debug $ "ROUND!" <+> pretty accepts <+> pretty hashRef - when (pips `HashSet.isSubsetOf` votes) do - debug $ "CLOSING ROUND" <+> pretty hashRef <+> pretty (length trans) - atomically $ writeTVar (view refChanRoundClosed rcRound) True + trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList + + forM_ trans $ \t -> do + lift $ refChanWriteTran adapter t + debug $ "WRITING TRANS" <+> pretty t + + let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList + votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys + + debug $ "PIPS" <+> pretty (HashSet.toList pips & fmap AsBase58) + debug $ "VOTES" <+> pretty (HashSet.toList votes & fmap AsBase58) + + when (pips `HashSet.isSubsetOf` votes) do + debug $ "CLOSING ROUND" <+> pretty hashRef <+> pretty (length trans) + atomically $ writeTVar (view refChanRoundClosed rcRound) True + + -- мы не можем ждать / поллить в deferred потому, + -- что мы так забьем конвейр - там сейчас всего 8 + -- воркеров, и 8 параллельных ждущих запросов + -- все остановят. + + let w = TimeoutSec (realToFrac $ view refChanHeadWaitAccept headBlock) + void $ lift $ race ( pause (2 * w) ) afterPropose where proto = Proxy @(RefChanUpdate e) @@ -564,15 +675,15 @@ refChanUpdateProto self pc adapter msg = do checkACL :: forall e s . (Encryption e ~ s, ForRefChans e) => RefChanHeadBlock e - -> PubKey 'Sign s + -> Maybe (PubKey 'Sign s) -> PubKey 'Sign s -> Bool -checkACL theHead peerKey authorKey = match +checkACL theHead mbPeerKey authorKey = match where pips = view refChanHeadPeers theHead aus = view refChanHeadAuthors theHead - match = peerKey `HashMap.member` pips + match = maybe True (`HashMap.member` pips) mbPeerKey && authorKey `HashSet.member` aus -- TODO: refchan-poll-proto @@ -641,14 +752,18 @@ refChanRequestProto self adapter msg = do refChanNotifyProto :: forall e s m . ( MonadIO m , Request e (RefChanNotify e) m + , Response e (RefChanNotify e) m + , HasRefChanId e (RefChanNotify e) , HasDeferred e (RefChanNotify e) m , HasGossip e (RefChanNotify e) m , IsPeerAddr e m , Pretty (Peer e) + , Sessions e (RefChanHeadBlock e) m , Sessions e (KnownPeer e) m , HasStorage m , Signatures s , IsRefPubKey s + , ForRefChans e , Pretty (AsBase58 (PubKey 'Sign s)) , s ~ Encryption e ) @@ -657,11 +772,38 @@ refChanNotifyProto :: forall e s m . ( MonadIO m -> RefChanNotify e -> m () -refChanNotifyProto _ _ _ = do +refChanNotifyProto self adapter msg@(Notify rchan box) = do -- аутентифицируем -- проверяем ACL -- пересылаем всем - pure () + + peer <- thatPeer proto + + auth <- find (KnownPeerKey peer) id <&> isJust + + void $ runMaybeT do + + guard =<< lift (refChanSubscribed adapter rchan) + + guard (self || auth) + + (authorKey, bs) <- MaybeT $ pure $ unboxSignedBox0 box + + let refchanKey = RefChanHeadKey @s rchan + headBlock <- MaybeT $ getActualRefChanHead @e refchanKey + + guard $ checkACL headBlock Nothing authorKey + + -- теперь пересылаем по госсипу + lift $ gossip msg + + trace $ "refChanNotifyProto" <+> pretty (BS.length bs) + + -- тут надо заслать во внешнее приложение, + -- равно как и в остальных refchan-протоколах + + where + proto = Proxy @(RefChanNotify e) getActualRefChanHead :: forall e s m . ( MonadIO m @@ -788,3 +930,8 @@ instance ForRefChans e => Pretty (RefChanHeadBlock e) where +-- FIXME: reconnect-validator-client-after-restart +-- почему-то сейчас если рестартовать пира, +-- но не растартовать валидатор --- то не получится +-- повторно соединиться с валидатором. + diff --git a/hbs2-peer/app/CLI/RefChan.hs b/hbs2-peer/app/CLI/RefChan.hs index 83a63b25..c21ad6c4 100644 --- a/hbs2-peer/app/CLI/RefChan.hs +++ b/hbs2-peer/app/CLI/RefChan.hs @@ -22,6 +22,7 @@ import Data.Maybe pRefChan :: Parser (IO ()) pRefChan = hsubparser ( command "head" (info pRefChanHead (progDesc "head commands" )) <> command "propose" (info pRefChanPropose (progDesc "post propose transaction")) + <> command "notify" (info pRefChanNotify (progDesc "post notify message")) <> command "fetch" (info pRefChanFetch (progDesc "fetch and sync refchan value")) <> command "get" (info pRefChanGet (progDesc "get refchan value")) ) @@ -119,6 +120,23 @@ pRefChanPropose = do else do runRpcCommand opts (REFCHANPROPOSE (puk, serialise box)) +pRefChanNotify :: Parser (IO ()) +pRefChanNotify = do + opts <- pRpcCommon + kra <- strOption (long "author" <> short 'a' <> help "author credentials") + fn <- optional $ strOption (long "file" <> short 'f' <> help "file") + sref <- strArgument (metavar "REFCHAH-REF") + pure do + sc <- BS.readFile kra + puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" + creds <- pure (parseCredentials @(Encryption L4Proto) (AsCredFile sc)) `orDie` "bad keyring file" + + lbs <- maybe1 fn LBS.getContents LBS.readFile + + let box = makeSignedBox @L4Proto @BS.ByteString (view peerSignPk creds) (view peerSignSk creds) (LBS.toStrict lbs) + + runRpcCommand opts (REFCHANNOTIFY (puk, serialise box)) + pRefChanGet :: Parser (IO ()) pRefChanGet = do diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 2c37bdaa..df01d834 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -621,6 +621,7 @@ runPeer opts = U.handle (\e -> myException e { refChanOnHead = refChanOnHeadFn rce , refChanSubscribed = isPolledRef @e brains , refChanWriteTran = refChanWriteTranFn rce + , refChanValidatePropose = refChanValidateTranFn @e rce } let pexFilt pips = do @@ -1131,14 +1132,22 @@ runPeer opts = U.handle (\e -> myException e box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just proposed <- MaybeT $ makeProposeTran @e pc puk box - debug $ "PROPOSAL:" <+> pretty (LBS.length (serialise proposed)) - lift $ broadCastMessage (Propose @e puk proposed) + -- debug $ "PROPOSAL:" <+> pretty (LBS.length (serialise proposed)) + -- lift $ broadCastMessage (Propose @e puk proposed) -- FIXME: remove-this-debug-stuff -- или оставить? нода будет сама себе -- консенсус слать тогда. может, и оставить lift $ runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed) + let refChanNotifyAction (puk, lbs) = do + trace "refChanNotifyAction" + void $ liftIO $ async $ withPeerM penv $ do + me <- ownPeer @e + runMaybeT do + box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just + lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (Notify @e puk box) + let refChanGetAction puk = do trace $ "refChanGetAction" <+> pretty (AsBase58 puk) who <- thatPeer (Proxy @(RPC e)) @@ -1182,6 +1191,7 @@ runPeer opts = U.handle (\e -> myException e , rpcOnRefChanGetAnsw = dontHandle -- rpcOnRefChanGetAnsw , rpcOnRefChanPropose = refChanProposeAction + , rpcOnRefChanNotify = refChanNotifyAction } dialReqProtoAdapter <- do diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index cf992a70..12f806f9 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -69,6 +69,7 @@ data RPCCommand = | REFCHANFETCH (PubKey 'Sign (Encryption L4Proto)) | REFCHANGET (PubKey 'Sign (Encryption L4Proto)) | REFCHANPROPOSE (PubKey 'Sign (Encryption L4Proto), ByteString) + | REFCHANNOTIFY (PubKey 'Sign (Encryption L4Proto), ByteString) data RPC e = RPCDie @@ -99,6 +100,7 @@ data RPC e = | RPCRefChanGetAnsw (Maybe (Hash HbSync)) | RPCRefChanPropose (PubKey 'Sign (Encryption e), ByteString) + | RPCRefChanNotify (PubKey 'Sign (Encryption e), ByteString) deriving stock (Generic) @@ -155,6 +157,7 @@ data RpcAdapter e m = , rpcOnRefChanGetAnsw :: Maybe (Hash HbSync) -> m () , rpcOnRefChanPropose :: (PubKey 'Sign (Encryption e), ByteString) -> m () + , rpcOnRefChanNotify :: (PubKey 'Sign (Encryption e), ByteString) -> m () } newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } @@ -224,6 +227,7 @@ rpcHandler adapter = \case (RPCRefChanFetch s) -> rpcOnRefChanFetch adapter s (RPCRefChanPropose s) -> rpcOnRefChanPropose adapter s + (RPCRefChanNotify s) -> rpcOnRefChanNotify adapter s data RPCOpt = RPCOpt @@ -258,6 +262,7 @@ runRpcCommand opt = \case REFCHANFETCH s -> withRPC opt (RPCRefChanFetch s) REFCHANPROPOSE s -> withRPC opt (RPCRefChanPropose s) + REFCHANNOTIFY s -> withRPC opt (RPCRefChanNotify s) _ -> pure () @@ -323,6 +328,8 @@ withRPC o cmd = rpcClientMain o $ runResourceT do , rpcOnRefChanGetAnsw = (liftIO . putMVar rchangetMVar) , rpcOnRefChanPropose = dontHandle + + , rpcOnRefChanNotify = dontHandle } prpc <- async $ runRPC udp1 do @@ -426,6 +433,10 @@ withRPC o cmd = rpcClientMain o $ runResourceT do pause @'Seconds 0.25 exitSuccess + RPCRefChanNotify{} -> liftIO do + pause @'Seconds 0.25 + exitSuccess + _ -> pure () void $ liftIO $ waitAnyCancel [proto] diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index e71dee61..f996fb49 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -6,6 +6,7 @@ module RefChan ( , refChanWorkerEnvDownload , refChanOnHeadFn , refChanWriteTranFn + , refChanValidateTranFn , refChanWorker , refChanWorkerEnv , refChanNotifyOnUpdated @@ -15,18 +16,18 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 -import HBS2.Merkle import HBS2.Clock -import HBS2.Events -import HBS2.Net.Proto.Peer -import HBS2.Net.Proto.Sessions import HBS2.Data.Detect import HBS2.Data.Types.Refs -import HBS2.Net.Auth.Credentials -import HBS2.Net.Proto -import HBS2.Net.Proto.RefChan -import HBS2.Net.Proto.Definition() +import HBS2.Events import HBS2.Merkle +import HBS2.Net.Auth.Credentials +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto +import HBS2.Net.Proto.Definition() +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.RefChan +import HBS2.Net.Proto.Sessions import HBS2.Storage import HBS2.System.Logger.Simple @@ -36,23 +37,27 @@ import PeerConfig import BlockDownload import Brains +import Codec.Serialise +import Control.Concurrent.STM (flushTQueue) import Control.Exception () import Control.Monad.Except (throwError, runExceptT) import Control.Monad.Reader import Control.Monad.Trans.Maybe -import Control.Concurrent.STM (flushTQueue) import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Data.Coerce import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.HashSet (HashSet) import Data.HashSet qualified as HashSet +import Data.Heap () +-- import Data.Heap qualified as Heap import Data.List qualified as List import Data.Maybe +import Data.Text qualified as Text import Lens.Micro.Platform --- import Data.Heap qualified as Heap -import Data.Heap () -import Codec.Serialise import UnliftIO import Streaming.Prelude qualified as S @@ -66,13 +71,21 @@ instance Exception DataNotReady type OnDownloadComplete = HashRef -> IO () +data RefChanValidator = + RefChanValidator + { rcvInbox :: TQueue (RefChanValidate UNIX, MVar (RefChanValidate UNIX)) + , rcvAsync :: Async () + } + data RefChanWorkerEnv e = RefChanWorkerEnv - { _refChanWorkerEnvDEnv :: DownloadEnv e + { _refChanWorkerConf :: PeerConfig + , _refChanWorkerEnvDEnv :: DownloadEnv e , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete))) , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) , _refChanWorkerEnvWriteQ :: TQueue HashRef + , _refChanWorkerValidators :: TVar (HashMap (RefChanId e) RefChanValidator) } makeLenses 'RefChanWorkerEnv @@ -82,11 +95,12 @@ refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e) -> DownloadEnv e -> m (RefChanWorkerEnv e) -refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTQueueIO - +refChanWorkerEnv conf de = liftIO $ RefChanWorkerEnv @e conf de + <$> newTQueueIO + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTQueueIO + <*> newTVarIO mempty refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn env chan tran = do @@ -97,6 +111,34 @@ refChanWriteTranFn :: MonadIO m => RefChanWorkerEnv e -> HashRef -> m () refChanWriteTranFn env href = do atomically $ writeTQueue (view refChanWorkerEnvWriteQ env) href +refChanValidateTranFn :: forall e m . ( MonadUnliftIO m + , ForRefChans e, e ~ L4Proto + , HasNonces (RefChanValidate UNIX) m + ) + => RefChanWorkerEnv e + -> RefChanId e + -> HashRef + -> m Bool + +refChanValidateTranFn env chan htran = do + mbv <- readTVarIO (view refChanWorkerValidators env) <&> HashMap.lookup chan + -- отправить запрос в соответствующий... что? + -- ждать ответа + debug $ "VALIDATE TRAN" <+> pretty (AsBase58 chan) <+> pretty htran + + r <- maybe1 mbv (pure True) $ \(RefChanValidator q _) -> do + answ <- newEmptyMVar + nonce <- newNonce @(RefChanValidate UNIX) + atomically $ writeTQueue q (RefChanValidate nonce chan (Validate @UNIX htran), answ) + withMVar answ $ \msg -> case rcvData msg of + Accepted{} -> pure True + _ -> pure False + + + debug $ "TRANS VALIDATION RESULT: " <+> pretty htran <+> pretty r + + pure r + -- FIXME: leak-when-block-never-really-updated refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m () refChanNotifyOnUpdated env chan = do @@ -110,6 +152,7 @@ refChanAddDownload :: forall e m . ( m ~ PeerM e IO -> HashRef -> OnDownloadComplete -> m () + refChanAddDownload env chan r onComlete = do penv <- ask t <- getTimeCoarse @@ -144,6 +187,143 @@ readLog sto (HashRef h) = Left{} -> pure () Right (hrr :: [HashRef]) -> S.each hrr +data ValidateEnv = + ValidateEnv + { _validateClient :: Fabriq UNIX + , _validateSelf :: Peer UNIX + } + +newtype ValidateProtoM m a = PingPongM { fromValidateProto :: ReaderT ValidateEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadUnliftIO + , MonadReader ValidateEnv + , MonadTrans + ) + + +runValidateProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> ValidateProtoM m a -> m a +runValidateProtoM tran m = runReaderT (fromValidateProto m) (ValidateEnv (Fabriq tran) (msgUnixSelf tran)) + + +instance Monad m => HasFabriq UNIX (ValidateProtoM m) where + getFabriq = asks _validateClient + +instance Monad m => HasOwnPeer UNIX (ValidateProtoM m) where + ownPeer = asks _validateSelf + + +refChanValidateProto :: forall e m . ( MonadIO m + , Request e (RefChanValidate e) m + , Response e (RefChanValidate e) m + , e ~ UNIX + ) + => Cache (RefChanValidateNonce e) (MVar (RefChanValidate e)) + -> RefChanValidate e + -> m () + +refChanValidateProto waiters msg = do + debug $ "GOT ANSWER FROM VALIDATOR" <+> pretty msg + case rcvData msg of + Accepted h -> emitAnswer h msg + Rejected h -> emitAnswer h msg + _ -> none + + where + emitAnswer h m = liftIO do + debug $ "EMIT ANSWER" <+> pretty h + mbAnsw <- Cache.lookup waiters (rcvNonce m) + maybe1 mbAnsw none $ \answ -> do + putMVar answ m + + +refChanWorkerInitValidators :: forall e m . ( MonadIO m + , MonadUnliftIO m + -- , MyPeer e + -- , ForRefChans e + -- , ForRefChans UNIX + -- , m ~ PeerM e IO + , e ~ L4Proto + ) + => RefChanWorkerEnv e + -> m () + + +refChanWorkerInitValidators env = do + debug "refChanWorkerInitValidators" + + let (PeerConfig syn) = view refChanWorkerConf env + + let validators = [ mkV rc x | ListVal [ SymbolVal "validate" + , SymbolVal "refchan" + , LitStrVal rc + , ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ] + ] <- syn + ] & catMaybes + + + forM_ validators $ \(rc, sa) -> do + debug $ "** VALIDATOR FOR" <+> pretty (AsBase58 rc, sa) + + here <- readTVarIO (_refChanWorkerValidators env) <&> HashMap.member rc + + unless here do + q <- newTQueueIO + val <- async $ validatorThread rc sa q + let rcv = RefChanValidator q val + atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv) + + where + + mkV :: Text -> Text -> Maybe (RefChanId e, String) + mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) + + -- FIXME: make-thread-respawning + validatorThread chan sa q = liftIO do + client <- newMessagingUnix False 1.0 sa + msg <- async $ runMessagingUnix client + + -- FIXME: hardcoded-timeout + waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) + + runValidateProtoM client do + + poke <- async $ forever do + pause @'Seconds 10 + mv <- newEmptyMVar + nonce <- newNonce @(RefChanValidate UNIX) + atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv) + + z <- async $ runProto + [ makeResponse (refChanValidateProto waiters) + ] + + forever do + (req, answ) <- atomically $ readTQueue q + case rcvData req of + Validate href -> do + debug $ "DO REQUEST VALIDATE" <+> pretty href <+> pretty sa + liftIO $ Cache.insert waiters (rcvNonce req) answ + let pa = fromString sa + request pa req + + Poke{} -> do + debug "DO SEND POKE" + let pa = fromString sa + request pa req + + _ -> pure () + + + (_, r) <- waitAnyCatch [z,poke] + debug $ "SOMETHING WRONG:" <+> viaShow r + + cancel msg + warn $ "validatorThread is terminated for some reasons" <+> pretty (AsBase58 chan) + + refChanWorker :: forall e s m . ( MonadIO m , MonadUnliftIO m , MyPeer e @@ -154,12 +334,13 @@ refChanWorker :: forall e s m . ( MonadIO m , Signatures s , s ~ Encryption e , IsRefPubKey s - , Pretty (AsBase58 (PubKey 'Sign s)) + -- , Pretty (AsBase58 (PubKey 'Sign s)) , ForRefChans e , EventListener e (RefChanRound e) m , EventListener e (RefChanRequest e) m , Sessions e (RefChanRound e) m , m ~ PeerM e IO + , e ~ L4Proto ) => RefChanWorkerEnv e -> SomeBrains e @@ -174,6 +355,11 @@ refChanWorker env brains = do -- FIXME: resume-on-exception hw <- async (refChanHeadMon penv) + -- FIXME: insist-more-during-download + -- что-то частая ситуация, когда блоки + -- с трудом докачиваются. надо бы + -- разобраться. возможно переделать + -- механизм скачивания блоков downloads <- async monitorHeadDownloads polls <- async refChanPoll @@ -186,6 +372,8 @@ refChanWorker env brains = do sto <- getStorage + liftIO $ refChanWorkerInitValidators env + subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val @@ -218,7 +406,7 @@ refChanWorker env brains = do forever do -- FIXME: use-polling-function-and-respect-wait - pause @'Seconds 30 + pause @'Seconds 10 now <- getTimeCoarse xs <- readTVarIO rounds <&> HashSet.toList @@ -516,7 +704,7 @@ logMergeProcess env q = do hd <- MaybeT $ lift $ getHead menv headRef let quo = view refChanHeadQuorum hd & fromIntegral - guard $ checkACL hd pk ak + guard $ checkACL hd (Just pk) ak pure [(href, (quo,mempty))] Accept _ box -> do @@ -559,5 +747,3 @@ logMergeProcess env q = do updateRef sto chanKey nref - - diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 749b3a4f..90907ba1 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -161,6 +161,108 @@ executable test-udp , uniplate , vector + +executable refchan-dummy-validator + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: refchan-dummy-validator + main-is: DummyValidatorMain.hs + + build-depends: + base, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , optparse-applicative + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , transformers + , uniplate + , vector + , unliftio + + + +executable test-unix + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestUNIX.hs + + build-depends: + base, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , transformers + , uniplate + , vector + + test-suite test-tcp import: shared-properties import: common-deps @@ -598,4 +700,4 @@ executable create-raw-tx -- , transformers -- , uniplate -- , vector - -- , fast-logger \ No newline at end of file + -- , fast-logger diff --git a/hbs2-tests/refchan-dummy-validator/DummyValidatorMain.hs b/hbs2-tests/refchan-dummy-validator/DummyValidatorMain.hs new file mode 100644 index 00000000..4c6c283e --- /dev/null +++ b/hbs2-tests/refchan-dummy-validator/DummyValidatorMain.hs @@ -0,0 +1,167 @@ +module Main where + +import HBS2.Prelude +import HBS2.Base58 +import HBS2.OrDie +import HBS2.Net.Proto.Types +import HBS2.Actors.Peer +import HBS2.Net.Proto.RefChan +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto.Definition() +import HBS2.Net.Auth.Credentials() + +import HBS2.System.Logger.Simple + +import Control.Monad.Reader +import Data.Functor +import Data.List qualified as List +import Options.Applicative hiding (info) +import Options.Applicative qualified as O +import System.Directory +import UnliftIO + +tracePrefix :: SetLoggerEntry +tracePrefix = logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = logPrefix "[notice] " + + +data Verdict = DoAccept | DoReject + deriving (Eq,Ord,Show) + +instance Pretty Verdict where + pretty = viaShow + +withLogging :: MonadIO m => m a -> m () +withLogging m = do + setLogging @DEBUG debugPrefix + setLogging @INFO defLog + setLogging @ERROR errorPrefix + setLogging @WARN warnPrefix + setLogging @NOTICE noticePrefix + + m + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + +data MyEnv = + MyEnv + { mySelf :: Peer UNIX + , myFab :: Fabriq UNIX + , myChan :: RefChanId UNIX + } + + +newtype App m a = App { fromApp :: ReaderT MyEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader MyEnv + , MonadTrans + ) + +runApp :: (MonadIO m, PeerMessaging UNIX) => MyEnv -> App m a -> m a +runApp env m = runReaderT (fromApp m) env + +instance Monad m => HasFabriq UNIX (App m) where + getFabriq = asks myFab + +instance Monad m => HasOwnPeer UNIX (App m) where + ownPeer = asks mySelf + + +runMe :: String -> FilePath -> Verdict -> IO () +runMe chan' sa verdict = withLogging do + chan <- pure (fromStringMay @(RefChanId UNIX) chan') `orDie` "invalid REFCHAN" + + info $ "I'm dummy refchan validator" <+> pretty (AsBase58 chan) <+> pretty sa <+> pretty verdict + + here <- doesFileExist sa + + when here do + removeFile sa + + server <- newMessagingUnix True 1.0 sa + + abus <- async $ runMessagingUnix server + + let env = MyEnv (fromString sa) (Fabriq server) chan + + runApp env do + debug "BOO" + runProto $ List.singleton $ makeResponse (myProto chan) + + void $ waitAnyCatchCancel [abus] + err "WTF?" + + where + + myProto :: forall e m . ( MonadIO m + , Request e (RefChanValidate e) m + , Response e (RefChanValidate e) m + , e ~ UNIX + ) + => RefChanId e + -> RefChanValidate e + -> m () + + myProto chan msg = do + case rcvData msg of + Poke{} -> debug "poked" + Validate href -> do + debug $ "validate request" <+> pretty (AsBase58 (rcvChan msg)) <+> pretty href + + case verdict of + DoAccept -> do + debug $ "sending accept for" <+> brackets (pretty (AsBase58 (rcvNonce msg))) <+> pretty href + response (RefChanValidate (rcvNonce msg) chan (Accepted @UNIX href)) + + DoReject -> do + debug $ "sending reject for" <+> brackets (pretty (AsBase58 (rcvNonce msg))) <+> pretty href + response (RefChanValidate (rcvNonce msg) chan (Rejected @UNIX href)) + + _ -> pure () + + +main :: IO () +main = join . customExecParser (prefs showHelpOnError) $ + O.info (helper <*> parser) + ( fullDesc + <> header "refchan-dummy-validator" + <> progDesc "for test and demo purposed" + ) + where + parser :: Parser (IO ()) + parser = do + rchan <- strArgument ( metavar "REFCHAN" ) <&> fromString + soname <- strArgument ( metavar "UNIX-SOCKET" ) + + verdict <- accept <|> reject <|> pure DoAccept + + pure $ runMe rchan soname verdict + + accept = do + void $ flag' True ( long "accept" <> short 'y' ) + pure DoAccept + + reject = do + void $ flag' True ( long "reject" <> short 'n' ) + pure DoReject + + + diff --git a/hbs2-tests/test/TestUNIX.hs b/hbs2-tests/test/TestUNIX.hs new file mode 100644 index 00000000..ee168fb7 --- /dev/null +++ b/hbs2-tests/test/TestUNIX.hs @@ -0,0 +1,117 @@ +{-# Language TemplateHaskell #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.Clock +import HBS2.Net.Proto +import HBS2.Net.Messaging.Unix +import HBS2.Actors.Peer +import HBS2.OrDie + +import Codec.Serialise +import Control.Monad.Reader +import Control.Monad.Trans.Resource +import Data.ByteString.Lazy (ByteString) +import Lens.Micro.Platform +import Prettyprinter +import System.FilePath.Posix +import System.IO +import System.IO.Temp +import UnliftIO.Async + + +debug :: (MonadIO m) => Doc ann -> m () +debug p = liftIO $ hPrint stderr p + + +data PingPong e = Ping Int + | Pong Int + deriving stock (Eq,Generic,Show,Read) + + +instance Serialise (PingPong e) + + +instance HasProtocol UNIX (PingPong UNIX) where + type instance ProtocolId (PingPong UNIX) = 1 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +pingPongHandler :: forall e m . ( MonadIO m + , Response e (PingPong e) m + , HasProtocol e (PingPong e) + ) + => Int + -> PingPong e + -> m () + +pingPongHandler n = \case + + Ping c -> debug ("Ping" <+> pretty c) >> response (Pong @e c) + + Pong c | c < n -> debug ("Pong" <+> pretty c) >> response (Ping @e (succ c)) + | otherwise -> pure () + +data PPEnv = + PPEnv + { _ppSelf :: Peer UNIX + , _ppFab :: Fabriq UNIX + } + +makeLenses 'PPEnv + +newtype PingPongM m a = PingPongM { fromPingPong :: ReaderT PPEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader PPEnv + , MonadTrans + ) + +runPingPong :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> PingPongM m a -> m a +runPingPong tran m = runReaderT (fromPingPong m) (PPEnv (msgUnixSelf tran) (Fabriq tran)) + +instance Monad m => HasFabriq UNIX (PingPongM m) where + getFabriq = asks (view ppFab) + +instance Monad m => HasOwnPeer UNIX (PingPongM m) where + ownPeer = asks (view ppSelf) + +instance HasTimeLimits UNIX (PingPong UNIX) IO where + tryLockForPeriod _ _ = pure True + +main :: IO () +main = do + liftIO $ hSetBuffering stdout LineBuffering + liftIO $ hSetBuffering stderr LineBuffering + + withSystemTempDirectory "test-unix-socket" $ \tmp -> do + + let soname = tmp "unix.socket" + + server <- newMessagingUnix True 1.0 soname + + client <- newMessagingUnix False 1.0 soname + + m1 <- async $ runMessagingUnix server + m2 <- async $ runMessagingUnix client + + p1 <- async $ runPingPong server do + runProto @UNIX + [ makeResponse (pingPongHandler 100000) + ] + + p2 <- async $ runPingPong client do + request (msgUnixSelf server) (Ping @UNIX 0) + runProto @UNIX + [ makeResponse (pingPongHandler 100000) + ] + + (_,r) <- liftIO $ waitAnyCatchCancel [m1,m2,p1,p2] + + debug (viaShow r) + + +