From 71e325f8bc9e9d3e1d0737ebe83d006c76cbceb6 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 28 Jul 2023 15:36:50 +0300 Subject: [PATCH] PR 3nmxU5Ro8b dialog-proto --- .fixme/log | 5 +- Makefile | 20 + hbs2-core/hbs2-core.cabal | 16 + hbs2-core/lib/Dialog/Client.hs | 193 +++++++++ hbs2-core/lib/Dialog/Core.hs | 442 +++++++++++++++++++++ hbs2-core/lib/Dialog/Helpers/List.hs | 19 + hbs2-core/lib/Dialog/Helpers/Streaming.hs | 88 ++++ hbs2-core/lib/HBS2/Actors/Peer.hs | 18 +- hbs2-core/lib/HBS2/Net/Messaging.hs | 2 + hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 14 + hbs2-core/lib/HBS2/Net/Proto/Dialog.hs | 157 ++++++++ hbs2-core/lib/HBS2/OrDie.hs | 3 + hbs2-core/test/DialogSpec.hs | 59 +++ hbs2-core/test/Main.hs | 2 + hbs2-peer/app/PeerMain.hs | 18 +- hbs2-peer/app/PeerMain/DialogCliCommand.hs | 269 +++++++++++++ hbs2-peer/app/PeerMain/PeerDialog.hs | 39 ++ hbs2-peer/app/RPC.hs | 2 + hbs2-peer/hbs2-peer.cabal | 7 + 19 files changed, 1366 insertions(+), 7 deletions(-) create mode 100644 Makefile create mode 100644 hbs2-core/lib/Dialog/Client.hs create mode 100644 hbs2-core/lib/Dialog/Core.hs create mode 100644 hbs2-core/lib/Dialog/Helpers/List.hs create mode 100644 hbs2-core/lib/Dialog/Helpers/Streaming.hs create mode 100644 hbs2-core/lib/HBS2/Net/Proto/Dialog.hs create mode 100644 hbs2-core/test/DialogSpec.hs create mode 100644 hbs2-peer/app/PeerMain/DialogCliCommand.hs create mode 100644 hbs2-peer/app/PeerMain/PeerDialog.hs diff --git a/.fixme/log b/.fixme/log index 1de2b341..30abe5a9 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,5 +1,2 @@ -(fixme-set "workflow" "test" "8ey8Fnr4c4") -fixme-del "7TVJE7H1YY" -fixme-del "DoshYTWtk5" -(fixme-set "workflow" "done" "DgtWWrbatG") +(fixme-set "workflow" "test" "3nmxU5Ro8b") \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..b8d14084 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +SHELL := bash +.ONESHELL: +.SHELLFLAGS := -eu -o pipefail -c +.DELETE_ON_ERROR: +MAKEFLAGS += --warn-undefined-variables +MAKEFLAGS += --no-builtin-rules + +ifeq ($(origin .RECIPEPREFIX), undefined) + $(error This Make does not support .RECIPEPREFIX. Please use GNU Make 4.0 or later) +endif +.RECIPEPREFIX = > + +.PHONY: build +build: +> nix develop -c cabal build all + +.PHONY: test-core +test-core: +> nix develop -c cabal run hbs2-core:test + diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 974c200d..5503512c 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -51,8 +51,10 @@ common shared-properties , ImportQualifiedPost , LambdaCase , MultiParamTypeClasses + , OverloadedLabels , OverloadedStrings , QuasiQuotes + , RankNTypes , RecordWildCards , ScopedTypeVariables , StandaloneDeriving @@ -60,6 +62,7 @@ common shared-properties , TypeApplications , TypeFamilies , TemplateHaskell + , ViewPatterns @@ -96,6 +99,7 @@ library , HBS2.Net.Proto.BlockChunks , HBS2.Net.Proto.BlockInfo , HBS2.Net.Proto.Definition + , HBS2.Net.Proto.Dialog , HBS2.Net.Proto.EncryptionHandshake , HBS2.Net.Proto.Event.PeerExpired , HBS2.Net.Proto.Peer @@ -112,6 +116,10 @@ library , HBS2.Storage , HBS2.System.Logger.Simple , HBS2.System.Logger.Simple.Class + , Dialog.Core + , Dialog.Client + , Dialog.Helpers.List + , Dialog.Helpers.Streaming -- other-modules: @@ -120,6 +128,7 @@ library , aeson , async , attoparsec + , base16-bytestring , base58-bytestring , binary , bytestring @@ -128,11 +137,13 @@ library , clock , containers , cryptonite + , data-default , deepseq , directory , fast-logger , filelock , filepath + , generic-lens , hashable , interpolatedstring-perl6 , iproute @@ -146,6 +157,7 @@ library , network-simple , network-byte-order , prettyprinter + , mwc-random , random , random-shuffle , resourcet @@ -156,6 +168,7 @@ library , split , stm , stm-chans + , string-conversions , streaming , string-conversions , suckless-conf @@ -166,6 +179,7 @@ library , uniplate , unordered-containers , unliftio + , unliftio-core hs-source-dirs: lib @@ -181,6 +195,7 @@ test-suite test -- , TestUniqProtoId , FakeMessaging , HasProtocol + , DialogSpec -- other-extensions: @@ -207,6 +222,7 @@ test-suite test , tasty , tasty-quickcheck , tasty-hunit + , tasty-quickcheck , transformers , uniplate , vector diff --git a/hbs2-core/lib/Dialog/Client.hs b/hbs2-core/lib/Dialog/Client.hs new file mode 100644 index 00000000..5d8a987d --- /dev/null +++ b/hbs2-core/lib/Dialog/Client.hs @@ -0,0 +1,193 @@ +{-# LANGUAGE OverloadedLabels #-} +{-# LANGUAGE StrictData #-} +{-# LANGUAGE ImpredicativeTypes #-} +module Dialog.Client where + +-- import System.Clock +-- import System.Timeout +import Codec.Serialise +import Control.Arrow +import Control.Exception qualified as Exception +import Control.Monad +import Control.Monad.Cont +import Control.Monad.Error.Class +import Control.Monad.Except (ExceptT(..), runExcept, runExceptT) +import Control.Monad.IO.Unlift +import Control.Monad.State.Class as State +import Control.Monad.State.Strict (evalState, evalStateT) +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Builder +import Data.ByteString.Lazy qualified as BSL +import Data.Default +import Data.Generics.Labels +import Data.Generics.Product.Fields +import Data.List qualified as List +import Data.Map.Strict as Map +import Data.String.Conversions (cs, ConvertibleStrings) +import Data.Time +import GHC.Generics(Generic) +import Lens.Micro.Platform +import Streaming as S +import Streaming.Prelude qualified as S +import UnliftIO.Exception +import UnliftIO.STM +import UnliftIO.Timeout + +import Dialog.Core +import Dialog.Helpers.Streaming + +--- + +dQuery_ :: MonadUnliftIO m + => RequestParams + -> DialogClient m peer + -> peer + -> Frames + -> m () +dQuery_ _par dcli peer rq = + withClientQuery dcli & \dialf -> + dialf peer rq' \_flow -> pure () + where + rq' = rq & #unFrames %~ ([serialiseS routerSignature] <>) + +-- +dQuery1 :: (MonadUnliftIO m) + => RequestParams + -> DialogClient m peer + -> peer + -> Frames + -> m Frames + +dQuery1 par dcli peer rq = dQuery' par dcli peer rq \flow -> + either (throwIO . DQuery1Error) (pure . view _2) =<< headEither flow + +data DQuery1Error = DQuery1Error RequestResult + deriving (Show) + +instance Exception DQuery1Error + +-- +dQuery' :: MonadUnliftIO m + => RequestParams + -> DialogClient m peer + -> peer + -> Frames + -> (Stream (Of (ResponseHeader, Frames)) m RequestResult -> m r) + -> m r + +dQuery' par dcli peer rq go = + withClientQuery dcli & \dialf -> do + dialf peer rq' \flow -> go $ + flow + & withEffectsMay RequestTimeout (timeout' (requestParamsTimeout par)) + & S.map decodeHeader + & stopAfterLeftMay (either + (\(merr, xs) -> Left (Nothing, RequestErrorBadResponse merr xs)) + processResponseHeader + ) + + where + + processResponseHeader :: (ResponseHeader, Frames) -> + Either + (Maybe (ResponseHeader, Frames), RequestResult) + (ResponseHeader, Frames) + + processResponseHeader rhxs@(rh, xs) = case ((responseStatusCode . respStatus) rh) of + Success200 -> Left (Just rhxs, RequestDone) + SuccessMore -> Right rhxs + r@BadRequest400 -> Left (Nothing, (RequestFailure r xs)) + r@Forbidden403 -> Left (Nothing, (RequestFailure r xs)) + r@NotFound404 -> Left (Nothing, (RequestFailure r xs)) + + rq' = rq & #unFrames %~ ([serialiseS routerSignature] <>) + +timeout' :: MonadUnliftIO m => NominalDiffTime -> m a -> m (Maybe a) +timeout' = timeout . round . (* 10^6) . nominalDiffTimeToSeconds + +-- +decodeHeader :: Frames -> Either (BadResponse, Frames) (ResponseHeader, Frames) +decodeHeader = evalState do + ex <- runExceptT cutFrameDecode' + xs <- State.get + pure $ ex + & left ((, xs) . maybe ResponseInsufficientFrames ResponseParseError) + & right (, xs) + +data RequestParams = RequestParams + { requestParamsTimeout :: NominalDiffTime + } + deriving (Generic) + +instance Default RequestParams where + def = RequestParams + { requestParamsTimeout = 5 + } + +data DialogClient m p = DialogClient + { withClientQuery :: ClientQuery m p + } + +type ClientQuery m p = forall r . + p + -> Frames + -> (Stream (Of Frames) m RequestResult -> m r) + -> m r + +withClient :: forall m p i r . MonadUnliftIO m + => DClient m p i -> (DialogClient m p -> m r) -> m r +withClient dclient go = do + callerID <- newCallerID + + requestIDtvar <- newTVarIO 1 + + -- У обработчика получателя - своё окружение, в которое мы добавляем + -- обработчики ответов на запросы по requestID + requestResponseEnv <- newRequestResponseEnv + + let withClientQuery' :: ClientQuery m p + withClientQuery' = \pid xs handleStream -> do + requestID <- atomically $ stateTVar requestIDtvar (id &&& succ) + + ch <- newTQueueIO + let useResponse = RequestResponseHandler @m do + atomically . writeTQueue ch + let + -- flow :: Stream (Of Frames) m RequestResult + flow = S.repeatM (atomically (readTQueue ch)) + + bracket_ + (setupRepHandler requestResponseEnv requestID useResponse) + (clearRepHandler requestResponseEnv requestID) + (do + + clientSendProtoRequest dclient pid do + xs & addEnvelope + [ (BSL.toStrict . serialise) callerID + , (BSL.toStrict . serialise) requestID + ] + + handleStream flow + ) + + -- Установить в окружении обработчик получателя с callerID + let callerHandler = CallerHandler $ unFrames >>> \case + + requestIDRaw:xs -> do + case deserialiseOrFail (BSL.fromStrict requestIDRaw) of + Left _ -> + -- Если не нашли, ничего не предпринимать + -- На этот вопрос уже не ждут ответа + pure () + Right requestID -> do + mh <- findRepHandler requestResponseEnv requestID + forM_ mh \(RequestResponseHandler h) -> h (Frames xs) + + _ -> pure () + + bracket_ + (setupCallerEnv (clientCallerEnv dclient) callerID callerHandler) + (clearCallerEnv (clientCallerEnv dclient) callerID) + (go (DialogClient {withClientQuery = withClientQuery'})) + diff --git a/hbs2-core/lib/Dialog/Core.hs b/hbs2-core/lib/Dialog/Core.hs new file mode 100644 index 00000000..d259e89f --- /dev/null +++ b/hbs2-core/lib/Dialog/Core.hs @@ -0,0 +1,442 @@ +{-# LANGUAGE StrictData #-} +-- {-# LANGUAGE OverloadedLists #-} +-- {-# LANGUAGE UndecidableInstances #-} +module Dialog.Core where + +-- import Data.ByteString.Builder as Builder +-- import Data.ByteString.Builder.Internal as Builder +-- import GHC.IsList +import Codec.Serialise +import Control.Arrow +import Control.Monad +import Control.Monad.Error.Class +import Control.Monad.Except (Except(..), ExceptT(..), runExcept, runExceptT) +import Control.Monad.IO.Class +import Control.Monad.State.Class as State +import Control.Monad.State.Strict (evalStateT) +import Control.Monad.Trans.Class +import Control.Monad.Writer qualified as W +import Data.Binary.Get as Get +import Data.Binary.Put as Put +import Data.Bits +import Data.Bool +import Data.ByteArray (ByteArrayAccess) +import Data.ByteArray.Sized as BAS +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Lazy qualified as BSL +import Data.Foldable +import Data.Foldable as F +import Data.Function +import Data.Generics.Labels +import Data.Generics.Product.Fields +import Data.List qualified as List +import Data.List.NonEmpty qualified as NE +import Data.Map.Strict as Map +import Data.Maybe +import Data.String.Conversions as X (cs) +import Data.Text (Text) +import Data.Word +import GHC.Exts +import GHC.Generics (Generic) +import Lens.Micro.Platform +import Numeric.Natural +import System.Random.MWC +import UnliftIO.Exception +import UnliftIO.STM + +-- import Prettyprinter +-- import HBS2.Base58 +import Data.ByteString.Base16 qualified as B16 + +import Dialog.Helpers.List + +type Frames = Frames' ByteString +newtype Frames' a = Frames { unFrames :: [a] } + deriving stock (Generic,Eq) + deriving newtype (Functor, Foldable, Semigroup, Monoid + -- , IsList + ) + + +instance Show Frames where + show (Frames xs) = "Frames " <> show (BS.length <$> xs) + -- <> " " <> show (fmap B16.encode xs) + <> " " <> (show . fmap (limitSize 42)) xs + + where + limitSize n as = bool as (BS.take (n-3) as <> "...") (BS.length as > n) + +framesBodyPart :: Traversal' Frames [ByteString] +framesBodyPart = #unFrames . tailAfterP (== "") + +tailAfterP :: forall a . (a -> Bool) -> Traversal' [a] [a] +tailAfterP p focus = fix \go -> \case + x:xs -> (x :) <$> bool go focus (p x) xs + xs -> pure xs + +--- + +-- encodeFrames :: Frames -> ByteString +encodeFrames :: Foldable t => t ByteString -> ByteString +encodeFrames = F.toList >>> BSL.toStrict . runPut . \case + + [] -> pure () + + xss -> flip fix xss \go -> \case + [] -> pure () + bs:xs -> do + let (flip shiftR 1 -> n1, ns) = unfoldSizeBytes @Word64 . flip shiftL 1 . fromIntegral . BS.length $ bs + + putWord8 $ n1 + & (bool (sbit 7) id (List.null xs)) + & (bool (sbit 6) id (List.null ns)) + + forM_ (markMore ns) \(n, isMoreBytesInSize) -> do + putWord8 $ n & bool (zbit 7) (sbit 7) isMoreBytesInSize + + putByteString bs + + go xs + + where + + markMore as = zip as ((True <$ List.drop 1 as) <> [False]) + + unfoldSizeBytes :: (Bits n, Integral n) => n -> (Word8, [Word8]) + unfoldSizeBytes = (\(a NE.:| as) -> (a, as)) . NE.unfoldr \w -> + ( (flip shiftR 1 . flip shiftL 1 . fromIntegral) w + , let w' = shiftR w 7 + in bool Nothing (Just w') (w' > 0) + ) + +decodeFrames :: MonadError String m => ByteString -> m Frames +decodeFrames = \case + "" -> pure mempty + + bs' -> (bs' &) $ BSL.fromStrict >>> either (throwError . view _3) (pure . Frames . view _3) + <$> runGetOrFail do + + fix \go -> do + + j <- getWord8 + + size <- + flip fix (6, j) \fu (b, j') -> do + let n = (fromIntegral . clearLeftBits (8-b)) j' + if (tbit b j') + then (n +) . flip shiftL b <$> (fu . (7, ) =<< getWord8) + else pure n + + bs <- getByteString size + + let moreFrames = tbit 7 j + + if moreFrames + then (bs :) <$> go + else pure [bs] + + where + clearLeftBits n = flip shiftR n . flip shiftL n + tbit = flip testBit + + +devDialogCore :: IO () +devDialogCore = do + display (Frames []) + display (Frames [""]) + display (Frames [BS.replicate 32 0x55]) + display (Frames [BS.replicate 32 0x55, ""]) + display (Frames [BS.replicate 32 0x55, "\3\3"]) + display (Frames [BS.replicate 33 0x55, "\3\3"]) + display (Frames [BS.replicate 63 0x55]) + display (Frames [BS.replicate 64 0x55]) + -- display (Frames [BS.replicate 65 0x55]) + display (Frames ["\8\8\8","\4\4\4"]) + display (Frames ["","\1"]) + where + display a = do + putStrLn . cs . show . B16.encode . encodeFrames $ a + putStrLn "" + + + + +sbit :: (Bits n) => Int -> n -> n +sbit = flip setBit + +zbit :: (Bits n) => Int -> n -> n +zbit = flip clearBit + +--- + +decodeFramesFail :: (MonadFail m) => ByteString -> m Frames +decodeFramesFail = errorToFail . decodeFrames + +--- + +errorToFailT :: (MonadFail m) => ExceptT String m a -> m a +errorToFailT = either fail pure <=< runExceptT + +errorToFail :: MonadFail m => Except String a -> m a +errorToFail = either fail pure . runExcept + +errorShowToFail :: (MonadFail m, Show s) => Except s a -> m a +errorShowToFail = either (fail . show) pure . runExcept + +-- + +data CallerID = CallerID + { unCallerIDV :: Word8 + , unCallerID :: Word32 + } + deriving stock (Generic, Eq, Ord) + +instance Serialise CallerID + +newCallerID :: forall m. MonadIO m => m CallerID +newCallerID = liftIO $ withSystemRandomST \g -> + CallerID <$> (uniformM g) <*> (uniformM g) + +--- + +newtype CallerHandler m = CallerHandler + { unCallerHandler :: Frames -> m () + } + +newtype CallerEnv m = CallerEnv + { unCallerEnv :: TVar (Map CallerID (CallerHandler m)) } + +newCallerEnv :: MonadIO m => m (CallerEnv m') +newCallerEnv = CallerEnv <$> newTVarIO mempty + +--- + +newtype RequestResponseHandler m = RequestResponseHandler + { unRequestResponseHandler :: Frames -> m () + } + +newtype RequestResponseEnv m = RequestResponseEnv + { unRequestResponseEnv :: TVar (Map RequestID (RequestResponseHandler m)) + } + +newRequestResponseEnv :: MonadIO m => m (RequestResponseEnv m') +newRequestResponseEnv = + RequestResponseEnv <$> newTVarIO mempty + +--- + +data DClient m p i = DClient + { clientCallerEnv :: CallerEnv m + , clientSendProtoRequest :: p -> Frames -> m () + , clientGetKnownPeers :: m [(p, i)] + } + +--- + +newtype RequestID = RequestID { unRequestID :: Word32 } + deriving stock (Generic, Eq, Ord) + deriving newtype (Serialise, Num, Enum) + -- deriving via TODO_GenericVLQ Put Get + +data RequestResult + = RequestDone + -- | RequestSuccessIncomplete + | RequestTimeout + | RequestFailure ResponseStatusCode Frames + | RequestErrorBadResponse BadResponse Frames + deriving stock (Generic, Eq, Show) + +data BadResponse + = ResponseErrorNoResponseHeader + | ResponseInsufficientFrames + | ResponseParseError DeserialiseFailure + deriving stock (Generic, Eq, Show) + +--- + +setupCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> CallerHandler m' -> m () +setupCallerEnv env callerID repHandleEnv = + (atomically . modifyTVar' (unCallerEnv env)) + (at callerID .~ Just repHandleEnv) + +clearCallerEnv :: MonadIO m => CallerEnv m' -> CallerID -> m () +clearCallerEnv env callerID = + (atomically . modifyTVar' (unCallerEnv env)) + (at callerID .~ Nothing) + +findCallerHandler :: MonadIO m => CallerEnv m' -> CallerID -> m (Maybe (CallerHandler m')) +findCallerHandler CallerEnv{..} callerID = + (atomically (readTVar unCallerEnv)) <&> (preview (ix callerID)) + +--- + +setupRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> RequestResponseHandler m' -> m () +setupRepHandler RequestResponseEnv{..} requestID useResponse = + (atomically . modifyTVar' unRequestResponseEnv) + (at requestID .~ Just useResponse) + +clearRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m () +clearRepHandler RequestResponseEnv{..} requestID = + (atomically . modifyTVar' unRequestResponseEnv) + (at requestID .~ Nothing) + +findRepHandler :: MonadIO m => RequestResponseEnv m' -> RequestID -> m (Maybe (RequestResponseHandler m')) +findRepHandler RequestResponseEnv{..} requestID = + (atomically (readTVar unRequestResponseEnv)) <&> (preview (ix requestID)) + +--- + +data DialogRequestEnv m p pd = DialogRequestEnv + { dreqEnvPeer :: p + , dreqEnvGetPeerData :: m pd + } + +-- data DialogRequestError +-- = DialogRequestFailure String +-- deriving stock (Show) +-- instance Exception DialogRequestError + +--- + +newtype DialogRequestRouter m = DialogRequestRouter + { unDialogRequestRouter :: + Map [ByteString] -- path + + -- handler :: Input -> m (Either ErrorMessage (HowToReply -> ResponseContinuation)) + (Frames -> Either Text ((Frames -> m ()) -> m ())) + } + + deriving (Semigroup, Monoid) + +dialogRequestRoutes + :: ListBuilder + ([ByteString], Frames -> Either Text ((Frames -> m ()) -> m ())) + -> DialogRequestRouter m +dialogRequestRoutes = DialogRequestRouter . Map.fromList . buildList + +hand :: Monad m => a -> b -> ListBuilderT m (a, b) +hand = curry li + +--- + +dpath :: Text -> [ByteString] -> Frames +dpath path = Frames . (cs path :) + +--- + +addEnvelope :: Monoid a => [a] -> Frames' a -> Frames' a +addEnvelope en = over #unFrames ((en <> [mempty]) <>) + +splitEnvelope :: (Monoid a, Eq a) => Frames' a -> ([a], Frames' a) +splitEnvelope = fmap (Frames . List.drop 1) . List.break (== mempty) . unFrames + +data ResponseHeader = ResponseHeader + { respStatus :: ResponseStatus + , respSeqNumber :: Int + } + deriving (Generic, Show, Eq) + +instance Serialise ResponseHeader + +data ResponseStatus = ResponseStatus + { responseStatusCode :: ResponseStatusCode + , responseStatusMessage :: Text + } + deriving (Generic, Show, Eq) + +instance Serialise ResponseStatus + +data ResponseStatusCode + = Success200 + | SuccessMore + | BadRequest400 + | Forbidden403 + | NotFound404 + deriving (Generic, Show, Eq) + +instance Serialise ResponseStatusCode + +routerSignature :: Word8 +routerSignature = 1 + +routeDialogRequest :: forall m p pd . + Monad m + => DialogRequestRouter m + -> DialogRequestEnv m p pd + -> (Frames -> m ()) + -> Frames + -> m () +routeDialogRequest router drenv rawReplyToPeer frames = do + erun <- pure $ runExcept $ flip evalStateT req do + + signature <- cutFrameDecode + (ResponseStatus BadRequest400 "No signature in request") + + when (signature /= routerSignature) $ throwError + (ResponseStatus BadRequest400 "Wrong signature in request") + + route <- cutFrameOr + (ResponseStatus BadRequest400 "No route in request") + + h <- fromJustThrowError + (ResponseStatus NotFound404 "Route not found") + (unDialogRequestRouter router ^? ix (BS8.split '/' route)) + + lift . ExceptT . pure + -- Если не может разобрать параметры запроса, + -- то самим ответить этому пиру '404' + . left (ResponseStatus BadRequest400) + . h + -- передать оставшуюся часть запроса в хэндлер + =<< get + + case erun of + Left rs -> replyToPeer (Frames [serialiseS (ResponseHeader rs 0)]) + Right run -> + -- передать хэндлеру продолжение чтобы ответить этому пиру + run replyToPeer + + where + (backPath, req) = splitEnvelope frames + + replyToPeer :: Frames -> m () + replyToPeer = rawReplyToPeer . over #unFrames (backPath <>) + +cutFrameDecode :: (Serialise b, MonadState Frames m, MonadError e m) => e -> m b +cutFrameDecode e = + State.gets unFrames >>= \case + x:xs -> + (either (const (throwError e)) pure . deserialiseOrFailS) x + <* State.put (Frames xs) + _ -> throwError e + +cutFrameDecode' + :: (Serialise b, MonadState Frames m, MonadError (Maybe DeserialiseFailure) m) + => m b +cutFrameDecode' = + State.gets unFrames >>= \case + x:xs -> + (either (throwError . Just) pure . deserialiseOrFailS) x + <* State.put (Frames xs) + _ -> throwError Nothing + +cutFrameOr :: (MonadState (Frames' b) m, MonadError e m) => e -> m b +cutFrameOr e = + State.gets unFrames >>= \case + x:xs -> x <$ State.put (Frames xs) + _ -> throwError e + +serialiseS :: Serialise a => a -> ByteString +serialiseS = BSL.toStrict . serialise + +deserialiseOrFailS :: Serialise a => ByteString -> Either DeserialiseFailure a +deserialiseOrFailS = deserialiseOrFail . BSL.fromStrict + +fromMaybeM :: Applicative m => m a -> Maybe a -> m a +fromMaybeM ma = maybe ma pure + +fromJustThrowError :: MonadError e m => e -> Maybe a -> m a +fromJustThrowError = fromMaybeM . throwError + diff --git a/hbs2-core/lib/Dialog/Helpers/List.hs b/hbs2-core/lib/Dialog/Helpers/List.hs new file mode 100644 index 00000000..b086b2e8 --- /dev/null +++ b/hbs2-core/lib/Dialog/Helpers/List.hs @@ -0,0 +1,19 @@ +module Dialog.Helpers.List where + +import Control.Monad.Trans.Writer.CPS qualified as W +import Data.Functor.Identity +import Data.Monoid + +type ListBuilder a = ListBuilderT Identity a + +type ListBuilderT m a = W.WriterT (Endo [a]) m () + +buildList :: ListBuilder a -> [a] +buildList = runIdentity . buildListT + +buildListT :: Monad m => ListBuilderT m a -> m [a] +buildListT = fmap (flip appEndo []) . W.execWriterT + +li :: Monad m => a -> ListBuilderT m a +li = W.tell . Endo . (:) + diff --git a/hbs2-core/lib/Dialog/Helpers/Streaming.hs b/hbs2-core/lib/Dialog/Helpers/Streaming.hs new file mode 100644 index 00000000..a38be2b6 --- /dev/null +++ b/hbs2-core/lib/Dialog/Helpers/Streaming.hs @@ -0,0 +1,88 @@ +module Dialog.Helpers.Streaming where + +import Control.Monad.Fix +import Data.ByteString qualified as BS +import Data.Int +import Data.Map.Strict qualified as Map +import Data.Maybe +import Data.Set qualified as Set +import Streaming as S +import Streaming.Internal +import Streaming.Prelude (cons) +import Streaming.Prelude qualified as S +import UnliftIO.Async +import UnliftIO.STM +import Prelude hiding (cons) + +withEffects + :: (Functor m, Functor f, s ~ Stream f m r) + => (forall a. m a -> m a) + -> s + -> s +withEffects trans = fix \go -> \case + Return r -> Return r + Effect m -> Effect (trans (fmap go m)) + Step f -> Step (fmap go f) +{-# INLINEABLE withEffects #-} + +withEffectsMay + :: (Monad m, Functor f, s ~ Stream f m r) + => r + -> (forall a. m a -> m (Maybe a)) + -> s + -> s +withEffectsMay d trans = fix \go -> \case + Return r -> Return r + Effect m -> Effect (fromMaybe (Return d) <$> trans (fmap go m)) + Step f -> Step (fmap go f) +{-# INLINEABLE withEffectsMay #-} + +stopOnLeft + :: (Monad m) + => (a -> Either r b) + -> Stream (Of a) m r + -> Stream (Of b) m r +stopOnLeft f = fix \go -> \case + Return r -> Return r + Effect m -> Effect (go <$> m) + Step (a :> s) -> either Return (\b -> Step (b :> go s)) (f a) +{-# INLINEABLE stopOnLeft #-} + +stopAfterLeftMay + :: (Monad m) + => (a -> Either (Maybe b, r) b) + -> Stream (Of a) m r + -> Stream (Of b) m r +stopAfterLeftMay f = fix \go -> \case + Return r -> Return r + Effect m -> Effect (go <$> m) + Step (a :> s) -> either + (\(mb, r) -> maybe + (Return r) + (\b -> Step (b :> Return r)) + mb) + (\b -> Step (b :> go s)) + (f a) +{-# INLINEABLE stopAfterLeftMay #-} + +stopAfter + :: (Monad m) + => (a -> Maybe r) + -> Stream (Of a) m r + -> Stream (Of a) m r +stopAfter f = fix \go -> \case + Return r -> Return r + Effect m -> Effect (go <$> m) + Step (a :> s) -> Step (a :> (maybe (go s) Return (f a))) +{-# INLINEABLE stopAfter #-} + +headEither + :: (Monad m) + => Stream (Of a) m r + -> m (Either r a) +headEither = fix \go -> \case + Return r -> pure (Left r) + Effect ms -> go =<< ms + Step (a :> _) -> pure (Right a) +{-# INLINEABLE headEither #-} + diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index ef0983d5..6fb6737b 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -43,10 +43,10 @@ import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Control.Concurrent.STM.TVar import Control.Concurrent.STM -import Data.Hashable (hash) import UnliftIO (MonadUnliftIO(..)) import Crypto.Saltine.Core.SecretBox qualified as SBox -- Симметричное шифрование с nonce без подписи import Crypto.Saltine.Core.Box qualified as Encrypt -- Асимметричное шифрование без подписи +import Control.Monad.IO.Unlift import Codec.Serialise (serialise, deserialiseOrFail) @@ -128,6 +128,22 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) , handle = h } +-- makeResponse' :: forall e p m . ( MonadIO m +-- , Response e p m +-- , HasProtocol e p +-- , Messaging (Fabriq e) e (AnyMessage (Encoded e) e) +-- ) +-- => (Encoded e -> Maybe p) +-- -> (p -> Encoded e) +-- -> (p -> m ()) +-- -> AnyProtocol e m + +-- makeResponse' dec enc h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) +-- , protoDecode = dec +-- , protoEncode = enc +-- , handle = h +-- } + data PeerEnv e = PeerEnv { _envSelf :: Peer e diff --git a/hbs2-core/lib/HBS2/Net/Messaging.hs b/hbs2-core/lib/HBS2/Net/Messaging.hs index 67bd56dd..b093b3d6 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging.hs @@ -1,4 +1,5 @@ {-# Language FunctionalDependencies #-} +{-# Language UndecidableInstances #-} {-# Language AllowAmbiguousTypes #-} module HBS2.Net.Messaging where @@ -7,6 +8,7 @@ import HBS2.Net.Proto import Control.Monad.IO.Class newtype From a = From (Peer a) +deriving instance Show (Peer a) => Show (From a) newtype To a = To (Peer a) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index d6376143..0b540944 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -14,6 +14,7 @@ import HBS2.Net.Proto import HBS2.Net.Proto.BlockAnnounce import HBS2.Net.Proto.BlockChunks import HBS2.Net.Proto.BlockInfo +import HBS2.Net.Proto.Dialog import HBS2.Net.Proto.EncryptionHandshake import HBS2.Net.Proto.Peer import HBS2.Net.Proto.PeerAnnounce @@ -27,6 +28,7 @@ import Control.Monad import Data.Functor import Data.ByteString.Lazy (ByteString) import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as BSL import Codec.Serialise (deserialiseOrFail,serialise) import Crypto.Saltine.Core.Box qualified as Crypto @@ -177,6 +179,18 @@ instance HasProtocol L4Proto (RefChanNotify L4Proto) where -- возьмем пока 10 секунд requestPeriodLim = ReqLimPerMessage 10 +instance HasProtocol L4Proto (DialReq L4Proto) where + type instance ProtocolId (DialReq L4Proto) = 96000 + type instance Encoded L4Proto = ByteString + decode = dialReqDecode . BSL.toStrict + encode = BSL.fromStrict . dialReqEncode + +instance HasProtocol L4Proto (DialResp L4Proto) where + type instance ProtocolId (DialResp L4Proto) = 96001 + type instance Encoded L4Proto = ByteString + decode = dialRespDecode . BSL.toStrict + encode = BSL.fromStrict . dialRespEncode + instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just defCookieTimeoutSec diff --git a/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs new file mode 100644 index 00000000..76523679 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/Dialog.hs @@ -0,0 +1,157 @@ +{-# Language UndecidableInstances #-} +{-# LANGUAGE StrictData #-} + +module HBS2.Net.Proto.Dialog +( module HBS2.Net.Proto.Dialog +, module Dialog.Core +, module Dialog.Client +) where + +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Data.Types +import HBS2.Net.Auth.Credentials +import HBS2.Net.Proto +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated hiding (at) +import HBS2.System.Logger.Simple + +import Codec.Serialise (deserialiseOrFail) +import Control.Arrow +import Control.Monad +import Control.Monad.Error.Class +import Control.Monad.IO.Unlift +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Builder +import Data.ByteString.Lazy qualified as BSL +import Data.List qualified as List +import Data.Map.Strict as Map +import Lens.Micro.Platform +import Streaming as S +import Streaming.Prelude qualified as S +import UnliftIO.Exception +import UnliftIO.STM + +import Dialog.Client +import Dialog.Core + +--- + +newtype DialReq e = DialReq { unDialReq :: Frames } + deriving stock (Generic) + +dialReqDecode :: MonadFail m => ByteString -> m (DialReq e) +dialReqDecode = fmap DialReq . decodeFramesFail + +dialReqEncode :: DialReq e -> ByteString +dialReqEncode = \case + DialReq xs -> encodeFrames xs + + +newtype DialResp e = DialResp { unDialResp :: Frames } + deriving stock (Generic) + +dialRespDecode :: MonadFail m => ByteString -> m (DialResp e) +dialRespDecode = fmap DialResp . decodeFramesFail + +dialRespEncode :: DialResp e -> ByteString +dialRespEncode = \case + DialResp xs -> encodeFrames xs + +--- + +data DialogProtoEnv m e = DialogProtoEnv + { dialogProtoEnvCallerEnv :: CallerEnv m + } + +newDialogProtoEnv :: + ( MonadIO m + , Ord (Peer e) + ) => m (DialogProtoEnv m' e) +newDialogProtoEnv = do + dialogProtoEnvCallerEnv <- newCallerEnv + pure DialogProtoEnv {..} + +-- Adapters should share the same env + +data DialReqProtoAdapter e (m :: * -> *) s = DialReqProtoAdapter + { dialReqProtoAdapterRouter :: DialogRequestRouter m + -- , dialReqProtoAdapterEnv :: DialogProtoEnv e + } + +data DialRespProtoAdapter e (m :: * -> *) s = DialRespProtoAdapter + { dialRespProtoAdapterEnv :: DialogProtoEnv m e + } + +--- + +-- | Обрабатывается на стороне сервера +dialReqProto :: forall e s m . + ( MonadIO m + , Response e (DialReq e) m + , Request e (DialResp e) m + -- , Sessions e (KnownPeer e) m + , e ~ L4Proto + ) + => DialReqProtoAdapter e m s + -> DialReq e + -> m () +dialReqProto DialReqProtoAdapter{..} = unDialReq >>> \frames -> do + peer <- thatPeer dialReqProtoProxy + + let dialReqEnv :: DialogRequestEnv m (Peer e) (Maybe (PeerData e)) + dialReqEnv = DialogRequestEnv + { dreqEnvPeer = peer + , dreqEnvGetPeerData = pure Nothing -- undefined -- find (KnownPeerKey peer) id + } + + let replyToPeer :: Frames -> m () + replyToPeer = request peer . DialResp @e + + routeDialogRequest dialReqProtoAdapterRouter dialReqEnv replyToPeer frames + + where + dialReqProtoProxy = Proxy @(DialReq e) + +--- + +-- | Обрабатывает ответы сервера на стороне клиента +dialRespProto :: forall e s m . + ( MonadIO m + , Response e (DialResp e) m + , e ~ L4Proto + ) + => DialRespProtoAdapter e m s + -> DialResp e + -> m () +dialRespProto DialRespProtoAdapter{..} = unDialResp >>> unFrames >>> \case + "": xs -> do + -- Ответили прямо нам сюда. Нужно как-то отреагировать на xs + pure () + + callerIDRaw: xs -> do + + -- Найти в окружении пира, соответствующего callerID, и продолжение для ответа ему + -- Если нашли, передать xs в это продолжение (переслать ответ обратно спрашивавшему) + case deserialiseOrFail (BSL.fromStrict callerIDRaw) of + Left _ -> + -- Если не нашли, ничего не предпринимать + -- Клиент отключился + pure () + + Right callerID -> do + let env = dialogProtoEnvCallerEnv dialRespProtoAdapterEnv + mh <- findCallerHandler env callerID + forM_ mh \(CallerHandler h) -> h (Frames xs) + + pure () + + _ -> do + -- Прислали пустой ответ неизвестно кому? -- Никак не реагировать. + pure () + + where + dialRespProtoProxy = Proxy @(DialResp e) + diff --git a/hbs2-core/lib/HBS2/OrDie.hs b/hbs2-core/lib/HBS2/OrDie.hs index 0833c496..12376b91 100644 --- a/hbs2-core/lib/HBS2/OrDie.hs +++ b/hbs2-core/lib/HBS2/OrDie.hs @@ -8,6 +8,9 @@ class OrDie m a where type family OrDieResult a :: Type orDie :: m a -> String -> m (OrDieResult a) +orDieM :: (Monad m, OrDie m a) => a -> String -> m (OrDieResult a) +orDieM a msg = pure a `orDie` msg + instance MonadIO m => OrDie m (Maybe a) where type instance OrDieResult (Maybe a) = a orDie mv err = mv >>= \case diff --git a/hbs2-core/test/DialogSpec.hs b/hbs2-core/test/DialogSpec.hs new file mode 100644 index 00000000..1a8e8913 --- /dev/null +++ b/hbs2-core/test/DialogSpec.hs @@ -0,0 +1,59 @@ +module DialogSpec where + +import Test.QuickCheck +import Test.Tasty +import Test.Tasty.HUnit +import Test.Tasty.QuickCheck as TastyQ + +import Control.Concurrent.Async +import Control.Monad +import Control.Monad +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import GHC.Generics (Generic) +import Lens.Micro.Platform +import System.IO + +import Dialog.Core +import Dialog.Helpers.List + +newtype BSA = BSA { unBSA :: ByteString } + deriving (Generic, Show) + +instance Arbitrary BSA where + arbitrary = BSA <$> randomSizedByteString + + -- shrink = \case + -- BSA bs | BS.length bs > 1 -> + -- let (bs1, bs2) = BS.splitAt (BS.length bs `div` 2) bs + -- in [BSA bs1, BSA bs2] + -- _ -> [] + + shrink = \case + BSA (BS.uncons -> Just (x, xs)) -> [BSA xs] + _ -> [] + +deriving via [BSA] instance Arbitrary Frames + +randomByteString :: Int -> Gen ByteString +randomByteString n = + vectorOf n arbitrary <&> BS.pack +{-# NOINLINE randomByteString #-} + +randomSizedByteString :: Gen ByteString +randomSizedByteString = do + let low = 0 + let high = 2^13 + size <- choose (low, high) + randomByteString size +{-# NOINLINE randomSizedByteString #-} + +property' name = li . (name, ) . property + +testDialog :: TestTree +testDialog = testGroup "dialog" $ buildList do + li . TastyQ.testProperties "props" $ buildList do + + property' "roundtrip encode Frames" \ xs -> + (decodeFrames . encodeFrames) xs == Right xs + diff --git a/hbs2-core/test/Main.hs b/hbs2-core/test/Main.hs index a5eb12fc..728c3393 100644 --- a/hbs2-core/test/Main.hs +++ b/hbs2-core/test/Main.hs @@ -2,6 +2,7 @@ module Main where import TestFakeMessaging import TestActors +import DialogSpec -- import TestUniqProtoId import TestCrypto @@ -17,6 +18,7 @@ main = , testCase "testActorsBasic" testActorsBasic -- , testCase "testUniqProtoId" testUniqProtoId , testCrypto + , testDialog ] diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 1524bf29..2c37bdaa 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -23,6 +23,7 @@ import HBS2.Net.Messaging.TCP import HBS2.Net.PeerLocator import HBS2.Net.Proto as Proto import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Dialog import HBS2.Net.Proto.EncryptionHandshake import HBS2.Net.Proto.Event.PeerExpired import HBS2.Net.Proto.Peer @@ -54,6 +55,8 @@ import RefLog qualified import RefLog (reflogWorker) import HttpWorker import ProxyMessaging +import PeerMain.DialogCliCommand +import PeerMain.PeerDialog import PeerMeta import CLI.RefChan import RefChan @@ -98,8 +101,8 @@ import Text.InterpolatedString.Perl6 (qc) import UnliftIO.Exception qualified as U -- import UnliftIO.STM import UnliftIO.Async as U -import Control.Monad.Trans.Resource +import Control.Monad.Trans.Resource import Streaming.Prelude qualified as S import Streaming qualified as S @@ -145,7 +148,6 @@ instance HasCfgKey PeerTrace1Key FeatureSwitch where instance HasCfgKey PeerListenKey (Maybe String) where key = "listen" - instance HasCfgKey PeerKeyFileKey (Maybe String) where key = "key" @@ -240,6 +242,8 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ <> command "peers" (info pPeers (progDesc "show known peers")) <> command "pexinfo" (info pPexInfo (progDesc "show pex")) <> command "log" (info pLog (progDesc "set logging level")) + + <> command "dial" (info pDialog (progDesc "dialog commands")) ) confOpt = strOption ( long "config" <> short 'c' <> help "config" ) @@ -705,6 +709,10 @@ runPeer opts = U.handle (\e -> myException e } + -- dialReqProtoAdapter <- do + -- dialReqProtoAdapterRouter <- pure dialogRoutes + -- pure DialReqProtoAdapter {..} + env <- ask pnonce <- peerNonce @e @@ -990,6 +998,7 @@ runPeer opts = U.handle (\e -> myException e , makeResponse (refChanUpdateProto False pc refChanAdapter) , makeResponse (refChanRequestProto False refChanAdapter) , makeResponse (refChanNotifyProto False refChanAdapter) + -- , makeResponse (dialReqProto dialReqProtoAdapter) ] void $ liftIO $ waitAnyCancel workers @@ -1175,9 +1184,14 @@ runPeer opts = U.handle (\e -> myException e , rpcOnRefChanPropose = refChanProposeAction } + dialReqProtoAdapter <- do + dialReqProtoAdapterRouter <- pure dialogRoutes + pure DialReqProtoAdapter {..} + rpc <- async $ runRPC udp1 do runProto @e [ makeResponse (rpcHandler arpc) + , makeResponse (dialReqProto dialReqProtoAdapter) ] menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) diff --git a/hbs2-peer/app/PeerMain/DialogCliCommand.hs b/hbs2-peer/app/PeerMain/DialogCliCommand.hs new file mode 100644 index 00000000..d896752f --- /dev/null +++ b/hbs2-peer/app/PeerMain/DialogCliCommand.hs @@ -0,0 +1,269 @@ +{-# LANGUAGE StrictData #-} + +module PeerMain.DialogCliCommand where + +import Data.Generics.Labels +import Data.Generics.Product.Fields +import HBS2.Actors.Peer +import HBS2.Base58 +import HBS2.Clock +import HBS2.Data.Types.Refs (RefLogKey(..)) +import HBS2.Defaults +import HBS2.Events +import HBS2.Hash +import HBS2.Merkle +import HBS2.Net.Auth.Credentials +import HBS2.Net.IP.Addr +import HBS2.Net.Messaging +import HBS2.Net.Messaging.TCP +import HBS2.Net.Messaging.UDP +import HBS2.Net.PeerLocator +import HBS2.Net.Proto +import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Dialog +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.PeerAnnounce +import HBS2.Net.Proto.PeerExchange +import HBS2.Net.Proto.PeerMeta +import HBS2.Net.Proto.RefLog +import HBS2.Net.Proto.Sessions +import HBS2.Net.Proto.Types +import HBS2.OrDie +import HBS2.Prelude +import HBS2.Prelude.Plated +import HBS2.Storage.Simple +import HBS2.System.Logger.Simple hiding (info) +import HBS2.System.Logger.Simple qualified as Log + +import BlockDownload +import BlockHttpDownload +import Bootstrap +import Brains +import CheckMetrics +import DownloadQ +import HttpWorker +import PeerConfig +import PeerInfo +import PeerMeta +import PeerTypes +import ProxyMessaging +import RefLog (reflogWorker) +import RefLog qualified +import RPC + +import Control.Monad +import Control.Monad.IO.Unlift +import Control.Monad.Reader +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Crypto.Saltine.Core.Box qualified as Encrypt +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.Default +import Data.Function +import Data.Functor +import Data.List qualified as List +import Data.Map.Strict qualified as Map +import Data.Maybe +import Data.Monoid qualified as Monoid +import Data.Set qualified as Set +import Data.String.Conversions as X (cs) +import Data.Void (absurd, Void) +import Lens.Micro.Platform +import Network.Socket +import Options.Applicative +import Streaming as S +import Streaming.Prelude qualified as S +import System.Directory +import UnliftIO.Async +import UnliftIO.Concurrent +import UnliftIO.Exception as U +import UnliftIO.Resource + +-- import System.FilePath.Posix +import System.IO +import System.Exit + + +pDialog :: Parser (IO ()) +pDialog = hsubparser $ mempty + <> command "ping" (info pPing (progDesc "ping hbs2 node via dialog inteface") ) + <> command "debug" (info pDebug (progDesc "debug call different dialog inteface routes") ) + +confOpt :: Parser FilePath +confOpt = strOption ( long "config" <> short 'c' <> help "config" ) + +data OptInitial (f :: * -> *) a b = OptInitial { unOptInitial :: f a } + deriving (Generic, Show) + +data OptResolved (f :: * -> *) a b = OptResolved { unOptResolved :: b } + deriving (Generic, Show) + +type DialOptInitial = DialOpt OptInitial +type DialOptResolved = DialOpt OptResolved + +data DialOpt (f :: (* -> *) -> * -> * -> *) = DialOpt + { dialOptConf :: f Maybe FilePath PeerConfig + , dialOptAddr :: f Maybe String (Peer L4Proto) + } + deriving (Generic) + +deriving instance Show DialOptInitial + +pDialCommon :: Parser DialOptInitial +pDialCommon = do + + dialOptConf <- OptInitial <$> optional do + strOption ( long "config" <> short 'c' <> help "config" ) + + dialOptAddr <- OptInitial <$> optional do + strOption ( short 'r' <> long "dial" <> help "addr:port" ) + + pure DialOpt {..} + +resolveDialOpt :: DialOptInitial -> IO DialOptResolved +resolveDialOpt dopt = do + config <- peerConfigRead (dopt ^. #dialOptConf . #unOptInitial) + + let dialConf = cfgValue @PeerRpcKey config :: Maybe String + + saddr <- (dopt ^. #dialOptAddr . #unOptInitial <|> dialConf) + `orDieM` "Dial endpoint not set" + + as <- parseAddrUDP (cs saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) + peer <- (headMay $ List.sortBy (compare `on` addrPriority) as) + `orDieM` "Can't parse Dial endpoint" + + pure DialOpt + { dialOptConf = OptResolved config + , dialOptAddr = OptResolved peer + } + +pPing :: Parser (IO ()) +pPing = do + dopt <- pDialCommon + pure $ withDial dopt \peer dclient -> + withClient dclient \cli -> do + liftIO . print =<< do + dQuery1 def cli peer (dpath "ping" []) + + +pDebug :: Parser (IO ()) +pDebug = do + dopt <- pDialCommon + + pure $ withDial dopt \peer dclient -> + withClient dclient \cli -> do + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "ping" + liftIO . print =<< do + dQuery' def cli peer (dpath "ping" []) \flow -> do + S.print flow + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "ping1" + liftIO . print =<< do + dQuery1 def cli peer (dpath "ping" []) + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "undefined-route" + liftIO . print =<< do + dQuery' def cli peer (dpath "undefined-rout" []) \flow -> do + S.print flow + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "debug/timeout" + liftIO . print =<< do + dQuery' (def & #requestParamsTimeout .~ 0.1) + cli peer (dpath "debug/timeout" []) \flow -> do + S.print flow + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "debug/no-response-header" + liftIO . print =<< do + dQuery' def cli peer (dpath "debug/no-response-header" []) \flow -> do + S.print flow + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "debug/wrong-header" + liftIO . print =<< do + dQuery' def cli peer (dpath "debug/wrong-header" []) \flow -> do + S.print flow + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "undefined-route-1" + (U.handleAny \e -> liftIO (print e)) do + liftIO . print =<< do + dQuery1 def cli peer (dpath "undefined-route-1" []) + + threadDelay 100 + liftIO $ putStrLn "" + liftIO $ putStrLn "spec" + liftIO . print =<< do + dQuery' def cli peer (dpath "spec" []) \flow -> do + S.print flow + + +evalContT' :: ContT r m Void -> m r +evalContT' = flip runContT absurd + +withDial :: forall e i . + ( e ~ L4Proto + ) + => DialOptInitial + -> ( Peer e + -> DClient (ResponseM e (RpcM (ResourceT IO))) (Peer e) i + -> (ResponseM e (RpcM (ResourceT IO))) () + ) + -> IO () +withDial dopt' cmd = do + dopt <- resolveDialOpt dopt' + setLoggingOff @DEBUG + hSetBuffering stdout LineBuffering + + runResourceT do + udp1 <- newMessagingUDP False Nothing `orDie` "Can't start Dial" + + evalContT' do + + dialProtoEnv :: DialogProtoEnv (ResponseM L4Proto (RpcM (ResourceT IO))) L4Proto + <- newDialogProtoEnv + + amessaging <- ContT $ withAsync do + runMessagingUDP udp1 + + aprotos <- ContT $ withAsync $ runRPC udp1 do + + runProto @e + [ makeResponse do + dialRespProto (DialRespProtoAdapter dialProtoEnv) + ] + + aclient <- ContT $ withAsync $ + runRPC udp1 do + let p = dopt ^. #dialOptAddr . #unOptResolved + runResponseM p $ + cmd p + DClient + { clientCallerEnv = dialogProtoEnvCallerEnv dialProtoEnv + , clientSendProtoRequest = \peer frames -> do + request peer (DialReq @e frames) + + -- , clientGetKnownPeers :: m [(p, i)] + , clientGetKnownPeers = pure [] + } + + ContT \_ -> waitAnyCancel [amessaging, aprotos, aclient] + + pure () + diff --git a/hbs2-peer/app/PeerMain/PeerDialog.hs b/hbs2-peer/app/PeerMain/PeerDialog.hs new file mode 100644 index 00000000..5da04012 --- /dev/null +++ b/hbs2-peer/app/PeerMain/PeerDialog.hs @@ -0,0 +1,39 @@ +module PeerMain.PeerDialog where + +import Control.Monad +import Control.Monad.IO.Class +import Data.Bool +import Data.ByteString qualified as BS +import Data.Map qualified as Map + +import Dialog.Core +import HBS2.Net.Proto.Types + + +dialogRoutes :: forall m . MonadIO m => DialogRequestRouter m +dialogRoutes = dialogRequestRoutes do + + hand ["ping"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus Success200 "") 0), "pong"]) + + hand ["spec"] \req -> Right \reply -> do + let xs = Map.keys (unDialogRequestRouter (dialogRoutes @m)) + + forM_ (zip (zip [1..] xs) ((True <$ drop 1 xs) <> [False])) \((j,x),isMore) -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus (bool Success200 SuccessMore isMore) "") j) + , BS.intercalate "/" x + ]) + + + hand ["debug", "no-response-header"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "one"]) + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 1), "two"]) + reply (Frames []) + + hand ["debug", "wrong-header"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "correct-header"]) + reply (Frames ["wrong-header"]) + + hand ["debug", "timeout"] \req -> Right \reply -> do + reply (Frames [serialiseS (ResponseHeader (ResponseStatus SuccessMore "") 0), "false more"]) + diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index 549f3168..cf992a70 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -20,6 +20,7 @@ import HBS2.System.Logger.Simple qualified as Log import PeerConfig +import Control.Monad.IO.Unlift import Codec.Serialise (serialise,deserialiseOrFail) import Control.Applicative import Control.Concurrent.STM @@ -163,6 +164,7 @@ newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } , MonadIO , MonadReader RPCEnv , MonadTrans + , MonadUnliftIO ) runRPC :: ( MonadIO m diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index cccd1203..0f951bdf 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -29,6 +29,7 @@ common common-deps , directory , dns , filepath + , generic-lens , hashable , microlens-platform , mtl @@ -68,9 +69,11 @@ common common-deps , http-types , wai-extra , unliftio + , unliftio-core , unix , heaps , psqueues + , string-conversions common shared-properties ghc-options: @@ -108,7 +111,9 @@ common shared-properties , LambdaCase , MultiParamTypeClasses , OverloadedStrings + , OverloadedLabels , QuasiQuotes + , RankNTypes , RecordWildCards , RecursiveDo , ScopedTypeVariables @@ -130,6 +135,8 @@ executable hbs2-peer , EncryptionKeys , Bootstrap , PeerInfo + , PeerMain.DialogCliCommand + , PeerMain.PeerDialog , PeerMeta , RPC , PeerTypes