wip, messaging+pipes

This commit is contained in:
Dmitry Zuikov 2024-03-23 10:15:46 +03:00
parent 9cde97cb05
commit a892f4ab36
5 changed files with 267 additions and 3 deletions

View File

@ -105,6 +105,7 @@ library
, HBS2.Net.Messaging.UDP , HBS2.Net.Messaging.UDP
, HBS2.Net.Messaging.TCP , HBS2.Net.Messaging.TCP
, HBS2.Net.Messaging.Unix , HBS2.Net.Messaging.Unix
, HBS2.Net.Messaging.Pipe
, HBS2.Net.Messaging.Stream , HBS2.Net.Messaging.Stream
, HBS2.Net.Messaging.Encrypted.RandomPrefix , HBS2.Net.Messaging.Encrypted.RandomPrefix
, HBS2.Net.Messaging.Encrypted.ByPass , HBS2.Net.Messaging.Encrypted.ByPass
@ -196,6 +197,7 @@ library
, time , time
, transformers , transformers
, uniplate , uniplate
, unix
, unordered-containers , unordered-containers
, unliftio , unliftio
, unliftio-core , unliftio-core

View File

@ -0,0 +1,100 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE NumericUnderscores #-}
module HBS2.Net.Messaging.Pipe where
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Types
import HBS2.Actors.Peer.Types
import HBS2.Net.Messaging
import Control.Concurrent.STM qualified as STM
import Control.Monad.Reader
import Data.ByteString.Builder qualified as B
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Hashable
import Network.ByteOrder hiding (ByteString)
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.IO
import UnliftIO
-- define new transport protocol type
data PIPE = PIPE
deriving (Eq,Ord,Show,Generic)
-- address for the new protocol
newtype PipeAddr = PipeAddr Handle
deriving newtype (Eq,Show)
-- the protocol work data
data MessagingPipe =
MessagingPipe
{ pipeIn :: Handle
, pipeOut :: Handle
, inQ :: TQueue ByteString
}
remotePeer :: MessagingPipe -> Peer PIPE
remotePeer = PeerPIPE . PipeAddr . pipeOut
localPeer :: MessagingPipe -> Peer PIPE
localPeer = PeerPIPE . PipeAddr . pipeIn
newMessagingPipe :: MonadIO m => (Handle, Handle) -> m MessagingPipe
newMessagingPipe (pIn,pOut) = do
MessagingPipe pIn pOut
<$> newTQueueIO
instance Hashable PipeAddr where
hashWithSalt salt (PipeAddr pip) = hashWithSalt salt ("pipe-addr", fd)
where
fd = unsafePerformIO (handleToFd pip <&> fromIntegral @_ @Word)
instance HasPeer PIPE where
newtype instance Peer PIPE = PeerPIPE { _fromPeerPipe :: PipeAddr }
deriving stock (Eq,Show,Generic)
deriving newtype (Hashable)
instance Pretty (Peer PIPE) where
pretty (PeerPIPE p) = parens ("pipe" <+> viaShow p)
-- Messaging definition for protocol
instance Messaging MessagingPipe PIPE ByteString where
sendTo bus _ _ msg = liftIO do
LBS.hPutStr (pipeOut bus) (B.toLazyByteString frame <> msg)
hFlush (pipeOut bus)
where
frame = B.word32BE (fromIntegral $ LBS.length msg)
receive bus _ = do
msg <- liftIO $ atomically $ peekTQueue q >> STM.flushTQueue q
for msg $ \m -> pure (From (PeerPIPE (PipeAddr who)), m)
where
q = inQ bus
who = pipeIn bus
runMessagingPipe :: MonadIO m => MessagingPipe -> m ()
runMessagingPipe bus = liftIO do
fix \next -> do
frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict
piece <- LBS.hGet who (fromIntegral frame)
atomically (writeTQueue (inQ bus) piece)
next
where
who = pipeIn bus
instance (MonadIO m, Messaging MessagingPipe PIPE (Encoded PIPE))
=> HasFabriq PIPE (ReaderT MessagingPipe m) where
getFabriq = asks Fabriq
instance MonadIO m => HasOwnPeer PIPE (ReaderT MessagingPipe m) where
ownPeer = asks localPeer

View File

@ -947,3 +947,51 @@ executable test-playground
, resourcet , resourcet
, text-icu >= 0.8.0.3 , text-icu >= 0.8.0.3
executable test-pipe-mess
import: shared-properties
default-language: Haskell2010
-- other-extensions:
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: TestPipeMessaging.hs
build-depends:
base, hbs2-core
, async
, bytestring
, cache
, containers
, directory
, hashable
, microlens-platform
, mtl
, network-byte-order
, prettyprinter
, QuickCheck
, quickcheck-instances
, random
, safe
, serialise
, stm
, streaming
, tasty
, tasty-quickcheck
, tasty-hunit
, tasty-quickcheck
, transformers
, uniplate
, vector
, saltine
, simple-logger
, string-conversions
, filepath
, temporary
, unliftio
, unordered-containers
, unix
, timeit

View File

@ -60,9 +60,9 @@ type instance Output Method2 = ()
instance MonadIO m => HandleMethod m Method2 where instance MonadIO m => HandleMethod m Method2 where
handleMethod _ = pure () handleMethod _ = pure ()
instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m) -- instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m)
=> HasDeferred UNIX (ServiceProto api UNIX) m where -- => HasDeferred UNIX (ServiceProto api UNIX) m where
deferred _ m = void (async m) -- deferred m = void (async m)
main :: IO () main :: IO ()
main = do main = do

View File

@ -0,0 +1,114 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE NumericUnderscores #-}
module Main where
import HBS2.Prelude.Plated
import HBS2.Net.Messaging
import HBS2.Net.Messaging.Pipe
import HBS2.Net.Proto.Service
import HBS2.Actors.Peer
import HBS2.System.Logger.Simple.ANSI
import Data.ByteString.Lazy (ByteString)
import System.Posix.IO
import UnliftIO
import Control.Monad.Trans.Cont
import Control.Monad.Reader
import Codec.Serialise
import Data.Fixed
import System.TimeIt
-- protocol's data
data Ping =
Ping Int
| Pong Int
deriving stock (Eq,Show,Generic)
instance Pretty Ping where
pretty = viaShow
instance Serialise Ping
-- API definition
type MyServiceMethods1 = '[ Ping ]
-- API endpoint definition
type instance Input Ping = Ping
type instance Output Ping = Maybe Ping
-- API handler
instance MonadIO m => HandleMethod m Ping where
handleMethod = \case
Ping n -> pure (Just (Pong n))
Pong _ -> pure Nothing
-- Codec for protocol
instance HasProtocol PIPE (ServiceProto MyServiceMethods1 PIPE) where
type instance ProtocolId (ServiceProto MyServiceMethods1 PIPE) = 0xDEADF00D1
type instance Encoded PIPE = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
-- Some "deferred" implementation for our monad
-- note -- plain asyncs may cause to resource leak
instance (MonadUnliftIO m, HasProtocol PIPE (ServiceProto api PIPE))
=> HasDeferred (ServiceProto api PIPE) PIPE m where
deferred m = void (async m)
mainLoop :: IO ()
mainLoop = do
flip runContT pure do
-- pipe for server
(i1,o1) <- liftIO $ createPipe
>>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o
-- pipe for client
(i2,o2) <- liftIO $ createPipe
>>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o
-- interwire client and server by pipes
server <- newMessagingPipe (i2,o1)
client <- newMessagingPipe (i1,o2)
-- run messaging workers
void $ ContT $ withAsync $ runMessagingPipe server
void $ ContT $ withAsync $ runMessagingPipe client
-- make server protocol responder
void $ ContT $ withAsync $ flip runReaderT server do
runProto @PIPE
[ makeResponse (makeServer @MyServiceMethods1)
]
-- make client's "caller"
caller <- lift $ makeServiceCaller @MyServiceMethods1 @PIPE (localPeer client)
-- make client's endpoint worker
void $ ContT $ withAsync $ runReaderT (runServiceClient caller) client
let n = 20_000
(a, _) <- timeItT do
for_ [1..n] $ \i -> do
void $ callService @Ping caller (Ping i)
debug $ "sent" <+> pretty n <+> "messages in" <+> pretty (realToFrac a :: Fixed E3) <> "sec"
<> line
<> "rps:" <+> pretty (realToFrac n / realToFrac a :: Fixed E2)
main :: IO ()
main = do
setLogging @DEBUG defLog
mainLoop
`finally` do
setLoggingOff @DEBUG