mirror of https://github.com/voidlizard/hbs2
wip, messaging+pipes
This commit is contained in:
parent
75ec2b25a4
commit
989892dcaa
|
@ -105,6 +105,7 @@ library
|
|||
, HBS2.Net.Messaging.UDP
|
||||
, HBS2.Net.Messaging.TCP
|
||||
, HBS2.Net.Messaging.Unix
|
||||
, HBS2.Net.Messaging.Pipe
|
||||
, HBS2.Net.Messaging.Stream
|
||||
, HBS2.Net.Messaging.Encrypted.RandomPrefix
|
||||
, HBS2.Net.Messaging.Encrypted.ByPass
|
||||
|
@ -196,6 +197,7 @@ library
|
|||
, time
|
||||
, transformers
|
||||
, uniplate
|
||||
, unix
|
||||
, unordered-containers
|
||||
, unliftio
|
||||
, unliftio-core
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
{-# LANGUAGE PolyKinds #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
module HBS2.Net.Messaging.Pipe where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Net.Proto.Types
|
||||
import HBS2.Actors.Peer.Types
|
||||
import HBS2.Net.Messaging
|
||||
|
||||
import Control.Concurrent.STM qualified as STM
|
||||
import Control.Monad.Reader
|
||||
import Data.ByteString.Builder qualified as B
|
||||
import Data.ByteString.Lazy (ByteString)
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Hashable
|
||||
import Network.ByteOrder hiding (ByteString)
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
import System.Posix.IO
|
||||
import UnliftIO
|
||||
|
||||
-- define new transport protocol type
|
||||
data PIPE = PIPE
|
||||
deriving (Eq,Ord,Show,Generic)
|
||||
|
||||
|
||||
-- address for the new protocol
|
||||
newtype PipeAddr = PipeAddr Handle
|
||||
deriving newtype (Eq,Show)
|
||||
|
||||
-- the protocol work data
|
||||
data MessagingPipe =
|
||||
MessagingPipe
|
||||
{ pipeIn :: Handle
|
||||
, pipeOut :: Handle
|
||||
, inQ :: TQueue ByteString
|
||||
}
|
||||
|
||||
remotePeer :: MessagingPipe -> Peer PIPE
|
||||
remotePeer = PeerPIPE . PipeAddr . pipeOut
|
||||
|
||||
localPeer :: MessagingPipe -> Peer PIPE
|
||||
localPeer = PeerPIPE . PipeAddr . pipeIn
|
||||
|
||||
newMessagingPipe :: MonadIO m => (Handle, Handle) -> m MessagingPipe
|
||||
newMessagingPipe (pIn,pOut) = do
|
||||
MessagingPipe pIn pOut
|
||||
<$> newTQueueIO
|
||||
|
||||
instance Hashable PipeAddr where
|
||||
hashWithSalt salt (PipeAddr pip) = hashWithSalt salt ("pipe-addr", fd)
|
||||
where
|
||||
fd = unsafePerformIO (handleToFd pip <&> fromIntegral @_ @Word)
|
||||
|
||||
instance HasPeer PIPE where
|
||||
newtype instance Peer PIPE = PeerPIPE { _fromPeerPipe :: PipeAddr }
|
||||
deriving stock (Eq,Show,Generic)
|
||||
deriving newtype (Hashable)
|
||||
|
||||
instance Pretty (Peer PIPE) where
|
||||
pretty (PeerPIPE p) = parens ("pipe" <+> viaShow p)
|
||||
|
||||
-- Messaging definition for protocol
|
||||
instance Messaging MessagingPipe PIPE ByteString where
|
||||
|
||||
sendTo bus _ _ msg = liftIO do
|
||||
LBS.hPutStr (pipeOut bus) (B.toLazyByteString frame <> msg)
|
||||
hFlush (pipeOut bus)
|
||||
|
||||
where
|
||||
frame = B.word32BE (fromIntegral $ LBS.length msg)
|
||||
|
||||
receive bus _ = do
|
||||
msg <- liftIO $ atomically $ peekTQueue q >> STM.flushTQueue q
|
||||
for msg $ \m -> pure (From (PeerPIPE (PipeAddr who)), m)
|
||||
|
||||
where
|
||||
q = inQ bus
|
||||
who = pipeIn bus
|
||||
|
||||
runMessagingPipe :: MonadIO m => MessagingPipe -> m ()
|
||||
runMessagingPipe bus = liftIO do
|
||||
fix \next -> do
|
||||
frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict
|
||||
piece <- LBS.hGet who (fromIntegral frame)
|
||||
atomically (writeTQueue (inQ bus) piece)
|
||||
next
|
||||
|
||||
where
|
||||
who = pipeIn bus
|
||||
|
||||
instance (MonadIO m, Messaging MessagingPipe PIPE (Encoded PIPE))
|
||||
=> HasFabriq PIPE (ReaderT MessagingPipe m) where
|
||||
getFabriq = asks Fabriq
|
||||
|
||||
instance MonadIO m => HasOwnPeer PIPE (ReaderT MessagingPipe m) where
|
||||
ownPeer = asks localPeer
|
||||
|
||||
|
|
@ -947,3 +947,51 @@ executable test-playground
|
|||
, resourcet
|
||||
, text-icu >= 0.8.0.3
|
||||
|
||||
|
||||
executable test-pipe-mess
|
||||
import: shared-properties
|
||||
default-language: Haskell2010
|
||||
|
||||
-- other-extensions:
|
||||
|
||||
type: exitcode-stdio-1.0
|
||||
hs-source-dirs: test
|
||||
main-is: TestPipeMessaging.hs
|
||||
build-depends:
|
||||
base, hbs2-core
|
||||
, async
|
||||
, bytestring
|
||||
, cache
|
||||
, containers
|
||||
, directory
|
||||
, hashable
|
||||
, microlens-platform
|
||||
, mtl
|
||||
, network-byte-order
|
||||
, prettyprinter
|
||||
, QuickCheck
|
||||
, quickcheck-instances
|
||||
, random
|
||||
, safe
|
||||
, serialise
|
||||
, stm
|
||||
, streaming
|
||||
, tasty
|
||||
, tasty-quickcheck
|
||||
, tasty-hunit
|
||||
, tasty-quickcheck
|
||||
, transformers
|
||||
, uniplate
|
||||
, vector
|
||||
, saltine
|
||||
, simple-logger
|
||||
, string-conversions
|
||||
, filepath
|
||||
, temporary
|
||||
, unliftio
|
||||
, unordered-containers
|
||||
, unix
|
||||
, timeit
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -60,9 +60,9 @@ type instance Output Method2 = ()
|
|||
instance MonadIO m => HandleMethod m Method2 where
|
||||
handleMethod _ = pure ()
|
||||
|
||||
instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m)
|
||||
=> HasDeferred UNIX (ServiceProto api UNIX) m where
|
||||
deferred _ m = void (async m)
|
||||
-- instance (HasProtocol UNIX (ServiceProto api UNIX), MonadUnliftIO m)
|
||||
-- => HasDeferred UNIX (ServiceProto api UNIX) m where
|
||||
-- deferred m = void (async m)
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
{-# LANGUAGE PolyKinds #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
module Main where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
|
||||
import HBS2.Net.Messaging
|
||||
import HBS2.Net.Messaging.Pipe
|
||||
import HBS2.Net.Proto.Service
|
||||
import HBS2.Actors.Peer
|
||||
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
import Data.ByteString.Lazy (ByteString)
|
||||
import System.Posix.IO
|
||||
import UnliftIO
|
||||
import Control.Monad.Trans.Cont
|
||||
import Control.Monad.Reader
|
||||
import Codec.Serialise
|
||||
import Data.Fixed
|
||||
|
||||
import System.TimeIt
|
||||
|
||||
-- protocol's data
|
||||
data Ping =
|
||||
Ping Int
|
||||
| Pong Int
|
||||
deriving stock (Eq,Show,Generic)
|
||||
|
||||
instance Pretty Ping where
|
||||
pretty = viaShow
|
||||
|
||||
instance Serialise Ping
|
||||
|
||||
-- API definition
|
||||
type MyServiceMethods1 = '[ Ping ]
|
||||
|
||||
-- API endpoint definition
|
||||
type instance Input Ping = Ping
|
||||
type instance Output Ping = Maybe Ping
|
||||
|
||||
-- API handler
|
||||
instance MonadIO m => HandleMethod m Ping where
|
||||
handleMethod = \case
|
||||
Ping n -> pure (Just (Pong n))
|
||||
Pong _ -> pure Nothing
|
||||
|
||||
-- Codec for protocol
|
||||
instance HasProtocol PIPE (ServiceProto MyServiceMethods1 PIPE) where
|
||||
type instance ProtocolId (ServiceProto MyServiceMethods1 PIPE) = 0xDEADF00D1
|
||||
type instance Encoded PIPE = ByteString
|
||||
decode = either (const Nothing) Just . deserialiseOrFail
|
||||
encode = serialise
|
||||
|
||||
-- Some "deferred" implementation for our monad
|
||||
-- note -- plain asyncs may cause to resource leak
|
||||
instance (MonadUnliftIO m, HasProtocol PIPE (ServiceProto api PIPE))
|
||||
=> HasDeferred (ServiceProto api PIPE) PIPE m where
|
||||
deferred m = void (async m)
|
||||
|
||||
mainLoop :: IO ()
|
||||
mainLoop = do
|
||||
|
||||
flip runContT pure do
|
||||
|
||||
-- pipe for server
|
||||
(i1,o1) <- liftIO $ createPipe
|
||||
>>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o
|
||||
|
||||
-- pipe for client
|
||||
(i2,o2) <- liftIO $ createPipe
|
||||
>>= \(i,o) -> (,) <$> fdToHandle i <*> fdToHandle o
|
||||
|
||||
-- interwire client and server by pipes
|
||||
server <- newMessagingPipe (i2,o1)
|
||||
client <- newMessagingPipe (i1,o2)
|
||||
|
||||
-- run messaging workers
|
||||
void $ ContT $ withAsync $ runMessagingPipe server
|
||||
void $ ContT $ withAsync $ runMessagingPipe client
|
||||
|
||||
-- make server protocol responder
|
||||
void $ ContT $ withAsync $ flip runReaderT server do
|
||||
runProto @PIPE
|
||||
[ makeResponse (makeServer @MyServiceMethods1)
|
||||
]
|
||||
|
||||
-- make client's "caller"
|
||||
caller <- lift $ makeServiceCaller @MyServiceMethods1 @PIPE (localPeer client)
|
||||
|
||||
-- make client's endpoint worker
|
||||
void $ ContT $ withAsync $ runReaderT (runServiceClient caller) client
|
||||
|
||||
let n = 20_000
|
||||
|
||||
(a, _) <- timeItT do
|
||||
for_ [1..n] $ \i -> do
|
||||
void $ callService @Ping caller (Ping i)
|
||||
|
||||
debug $ "sent" <+> pretty n <+> "messages in" <+> pretty (realToFrac a :: Fixed E3) <> "sec"
|
||||
<> line
|
||||
<> "rps:" <+> pretty (realToFrac n / realToFrac a :: Fixed E2)
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
||||
setLogging @DEBUG defLog
|
||||
mainLoop
|
||||
`finally` do
|
||||
setLoggingOff @DEBUG
|
||||
|
||||
|
Loading…
Reference in New Issue