diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 1493aeec..b56ef460 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -105,6 +105,7 @@ library , HBS2.Net.Messaging.UDP , HBS2.Net.Messaging.TCP , HBS2.Net.Messaging.Unix + , HBS2.Net.Messaging.Pipe , HBS2.Net.Messaging.Stream , HBS2.Net.Messaging.Encrypted.RandomPrefix , HBS2.Net.Messaging.Encrypted.ByPass @@ -196,6 +197,7 @@ library , time , transformers , uniplate + , unix , unordered-containers , unliftio , unliftio-core diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs new file mode 100644 index 00000000..4a132315 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Messaging/Pipe.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE NumericUnderscores #-} +module HBS2.Net.Messaging.Pipe where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Types +import HBS2.Actors.Peer.Types +import HBS2.Net.Messaging + +import Control.Concurrent.STM qualified as STM +import Control.Monad.Reader +import Data.ByteString.Builder qualified as B +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.Hashable +import Network.ByteOrder hiding (ByteString) +import System.IO.Unsafe (unsafePerformIO) +import System.Posix.IO +import UnliftIO + +-- define new transport protocol type +data PIPE = PIPE + deriving (Eq,Ord,Show,Generic) + + +-- address for the new protocol +newtype PipeAddr = PipeAddr Handle + deriving newtype (Eq,Show) + +-- the protocol work data +data MessagingPipe = + MessagingPipe + { pipeIn :: Handle + , pipeOut :: Handle + , inQ :: TQueue ByteString + } + +remotePeer :: MessagingPipe -> Peer PIPE +remotePeer = PeerPIPE . PipeAddr . pipeOut + +localPeer :: MessagingPipe -> Peer PIPE +localPeer = PeerPIPE . PipeAddr . pipeIn + +newMessagingPipe :: MonadIO m => (Handle, Handle) -> m MessagingPipe +newMessagingPipe (pIn,pOut) = do + MessagingPipe pIn pOut + <$> newTQueueIO + +instance Hashable PipeAddr where + hashWithSalt salt (PipeAddr pip) = hashWithSalt salt ("pipe-addr", fd) + where + fd = unsafePerformIO (handleToFd pip <&> fromIntegral @_ @Word) + +instance HasPeer PIPE where + newtype instance Peer PIPE = PeerPIPE { _fromPeerPipe :: PipeAddr } + deriving stock (Eq,Show,Generic) + deriving newtype (Hashable) + +instance Pretty (Peer PIPE) where + pretty (PeerPIPE p) = parens ("pipe" <+> viaShow p) + +-- Messaging definition for protocol +instance Messaging MessagingPipe PIPE ByteString where + + sendTo bus _ _ msg = liftIO do + LBS.hPutStr (pipeOut bus) (B.toLazyByteString frame <> msg) + hFlush (pipeOut bus) + + where + frame = B.word32BE (fromIntegral $ LBS.length msg) + + receive bus _ = do + msg <- liftIO $ atomically $ peekTQueue q >> STM.flushTQueue q + for msg $ \m -> pure (From (PeerPIPE (PipeAddr who)), m) + + where + q = inQ bus + who = pipeIn bus + +runMessagingPipe :: MonadIO m => MessagingPipe -> m () +runMessagingPipe bus = liftIO do + fix \next -> do + frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict + piece <- LBS.hGet who (fromIntegral frame) + atomically (writeTQueue (inQ bus) piece) + next + + where + who = pipeIn bus + +instance (MonadIO m, Messaging MessagingPipe PIPE (Encoded PIPE)) + => HasFabriq PIPE (ReaderT MessagingPipe m) where + getFabriq = asks Fabriq + +instance MonadIO m => HasOwnPeer PIPE (ReaderT MessagingPipe m) where + ownPeer = asks localPeer + + diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 424e10a5..6c6c7722 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -947,3 +947,51 @@ executable test-playground , resourcet , text-icu >= 0.8.0.3 + +executable test-pipe-mess + import: shared-properties + default-language: Haskell2010 + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestPipeMessaging.hs + build-depends: + base, hbs2-core + , async + , bytestring + , cache + , containers + , directory + , hashable + , microlens-platform + , mtl + , network-byte-order + , prettyprinter + , QuickCheck + , quickcheck-instances + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-quickcheck + , tasty-hunit + , tasty-quickcheck + , transformers + , uniplate + , vector + , saltine + , simple-logger + , string-conversions + , filepath + , temporary + , unliftio + , unordered-containers + , unix + , timeit + + + diff --git a/hbs2-tests/test/PrototypeGenericService.hs b/hbs2-tests/test/PrototypeGenericService.hs index ea8e62c0..74f1e20a 100644 --- a/hbs2-tests/test/PrototypeGenericService.hs +++ b/hbs2-tests/test/PrototypeGenericService.hs @@ -60,9 +60,9 @@ type instance Output Method2 = () instance MonadIO m => HandleMethod m Method2 where handleMethod _ = pure () -instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m) - => HasDeferred UNIX (ServiceProto api UNIX) m where - deferred _ m = void (async m) +-- instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m) +-- => HasDeferred UNIX (ServiceProto api UNIX) m where +-- deferred m = void (async m) main :: IO () main = do diff --git a/hbs2-tests/test/TestPipeMessaging.hs b/hbs2-tests/test/TestPipeMessaging.hs new file mode 100644 index 00000000..88a0f04a --- /dev/null +++ b/hbs2-tests/test/TestPipeMessaging.hs @@ -0,0 +1,114 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE NumericUnderscores #-} +module Main where + +import HBS2.Prelude.Plated + +import HBS2.Net.Messaging +import HBS2.Net.Messaging.Pipe +import HBS2.Net.Proto.Service +import HBS2.Actors.Peer + +import HBS2.System.Logger.Simple.ANSI + +import Data.ByteString.Lazy (ByteString) +import System.Posix.IO +import UnliftIO +import Control.Monad.Trans.Cont +import Control.Monad.Reader +import Codec.Serialise +import Data.Fixed + +import System.TimeIt + +-- protocol's data +data Ping = + Ping Int + | Pong Int + deriving stock (Eq,Show,Generic) + +instance Pretty Ping where + pretty = viaShow + +instance Serialise Ping + +-- API definition +type MyServiceMethods1 = '[ Ping ] + +-- API endpoint definition +type instance Input Ping = Ping +type instance Output Ping = Maybe Ping + +-- API handler +instance MonadIO m => HandleMethod m Ping where + handleMethod = \case + Ping n -> pure (Just (Pong n)) + Pong _ -> pure Nothing + +-- Codec for protocol +instance HasProtocol PIPE (ServiceProto MyServiceMethods1 PIPE) where + type instance ProtocolId (ServiceProto MyServiceMethods1 PIPE) = 0xDEADF00D1 + type instance Encoded PIPE = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +-- Some "deferred" implementation for our monad +-- note -- plain asyncs may cause to resource leak +instance (MonadUnliftIO m, HasProtocol PIPE (ServiceProto api PIPE)) + => HasDeferred (ServiceProto api PIPE) PIPE m where + deferred m = void (async m) + +mainLoop :: IO () +mainLoop = do + + flip runContT pure do + + -- pipe for server + (i1,o1) <- liftIO $ createPipe + >>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o + + -- pipe for client + (i2,o2) <- liftIO $ createPipe + >>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o + + -- interwire client and server by pipes + server <- newMessagingPipe (i2,o1) + client <- newMessagingPipe (i1,o2) + + -- run messaging workers + void $ ContT $ withAsync $ runMessagingPipe server + void $ ContT $ withAsync $ runMessagingPipe client + + -- make server protocol responder + void $ ContT $ withAsync $ flip runReaderT server do + runProto @PIPE + [ makeResponse (makeServer @MyServiceMethods1) + ] + + -- make client's "caller" + caller <- lift $ makeServiceCaller @MyServiceMethods1 @PIPE (localPeer client) + + -- make client's endpoint worker + void $ ContT $ withAsync $ runReaderT (runServiceClient caller) client + + let n = 20_000 + + (a, _) <- timeItT do + for_ [1..n] $ \i -> do + void $ callService @Ping caller (Ping i) + + debug $ "sent" <+> pretty n <+> "messages in" <+> pretty (realToFrac a :: Fixed E3) <> "sec" + <> line + <> "rps:" <+> pretty (realToFrac n / realToFrac a :: Fixed E2) + +main :: IO () +main = do + + setLogging @DEBUG defLog + mainLoop + `finally` do + setLoggingOff @DEBUG + +