mirror of https://github.com/voidlizard/hbs2
service-protocol-proof-of-concept
This commit is contained in:
parent
902125da75
commit
3c545b01fa
|
@ -107,6 +107,7 @@ library
|
||||||
, HBS2.Net.Proto.BlockInfo
|
, HBS2.Net.Proto.BlockInfo
|
||||||
, HBS2.Net.Proto.Definition
|
, HBS2.Net.Proto.Definition
|
||||||
, HBS2.Net.Proto.Dialog
|
, HBS2.Net.Proto.Dialog
|
||||||
|
, HBS2.Net.Proto.Service
|
||||||
, HBS2.Net.Proto.EncryptionHandshake
|
, HBS2.Net.Proto.EncryptionHandshake
|
||||||
, HBS2.Net.Proto.Event.PeerExpired
|
, HBS2.Net.Proto.Event.PeerExpired
|
||||||
, HBS2.Net.Proto.Peer
|
, HBS2.Net.Proto.Peer
|
||||||
|
@ -241,7 +242,10 @@ test-suite test
|
||||||
, saltine
|
, saltine
|
||||||
, simple-logger
|
, simple-logger
|
||||||
, string-conversions
|
, string-conversions
|
||||||
|
, filepath
|
||||||
|
, temporary
|
||||||
|
, unliftio
|
||||||
|
, resourcet
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,227 @@
|
||||||
|
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||||
|
{-# LANGUAGE UndecidableInstances #-}
|
||||||
|
{-# LANGUAGE TypeOperators #-}
|
||||||
|
{-# LANGUAGE PolyKinds #-}
|
||||||
|
module HBS2.Net.Proto.Service where
|
||||||
|
|
||||||
|
import HBS2.Actors.Peer
|
||||||
|
import HBS2.Net.Messaging.Unix
|
||||||
|
import HBS2.Net.Proto
|
||||||
|
import HBS2.Prelude.Plated
|
||||||
|
|
||||||
|
import Codec.Serialise
|
||||||
|
import Control.Monad.Reader
|
||||||
|
import Control.Monad.Trans.Resource
|
||||||
|
import Data.ByteString.Lazy (ByteString)
|
||||||
|
import Data.Kind
|
||||||
|
import Data.List qualified as List
|
||||||
|
import GHC.TypeLits
|
||||||
|
import Lens.Micro.Platform
|
||||||
|
import UnliftIO.Async
|
||||||
|
import UnliftIO qualified as UIO
|
||||||
|
import UnliftIO (TVar,TQueue,atomically)
|
||||||
|
import System.Random (randomIO)
|
||||||
|
import Data.Word
|
||||||
|
import Data.HashMap.Strict (HashMap)
|
||||||
|
import Data.HashMap.Strict qualified as HashMap
|
||||||
|
|
||||||
|
class (Monad m, Serialise (Output a), Serialise (Input a)) => HandleMethod m a where
|
||||||
|
type family Input a :: Type
|
||||||
|
type family Output a :: Type
|
||||||
|
handleMethod :: Input a -> m (Output a)
|
||||||
|
|
||||||
|
type family AllHandlers m (xs :: [Type]) :: Constraint where
|
||||||
|
AllHandlers m '[] = ()
|
||||||
|
AllHandlers m (x ': xs) = (HandleMethod m x, AllHandlers m xs)
|
||||||
|
|
||||||
|
data SomeHandler m = forall a . HandleMethod m a => SomeHandler ( Input a -> m (Output a) )
|
||||||
|
|
||||||
|
class Monad m => EnumAll (xs :: [Type]) t m where
|
||||||
|
enumMethods :: [t]
|
||||||
|
|
||||||
|
instance (Monad m, HandleMethod m ()) => EnumAll '[] (Int, SomeHandler m) m where
|
||||||
|
enumMethods = [(0, SomeHandler @m @() (\_ -> pure ()))]
|
||||||
|
|
||||||
|
instance (Monad m, EnumAll xs (Int, SomeHandler m) m, HandleMethod m x) => EnumAll (x ': xs) (Int, SomeHandler m) m where
|
||||||
|
enumMethods = (0, wtf) : shift (enumMethods @xs @(Int, SomeHandler m) @m)
|
||||||
|
where
|
||||||
|
wtf = SomeHandler @m @x (handleMethod @m @x)
|
||||||
|
shift = map (\(i, h) -> (i + 1, h))
|
||||||
|
|
||||||
|
instance Monad m => HandleMethod m () where
|
||||||
|
type Input () = ()
|
||||||
|
type Output () = ()
|
||||||
|
handleMethod _ = pure ()
|
||||||
|
|
||||||
|
data ServiceError =
|
||||||
|
ErrorMethodNotFound
|
||||||
|
| ErrorInvalidRequest
|
||||||
|
| ErrorInvalidResponse
|
||||||
|
deriving stock (Eq,Ord,Generic,Show,Typeable)
|
||||||
|
|
||||||
|
instance Serialise ServiceError
|
||||||
|
|
||||||
|
data ServiceProto api e =
|
||||||
|
ServiceRequest { reqNum :: Word64, reqData :: ByteString }
|
||||||
|
| ServiceResponse { reqNum :: Word64, reqResp :: Either ServiceError ByteString }
|
||||||
|
deriving stock (Generic,Show)
|
||||||
|
|
||||||
|
instance Serialise (ServiceProto api e)
|
||||||
|
|
||||||
|
dispatch :: forall api e m . ( EnumAll api (Int, SomeHandler m) m
|
||||||
|
, MonadIO m
|
||||||
|
, HasProtocol e (ServiceProto api e)
|
||||||
|
)
|
||||||
|
=> ServiceProto api e
|
||||||
|
-> m (ServiceProto api e)
|
||||||
|
|
||||||
|
dispatch (ServiceResponse n _) = do
|
||||||
|
pure $ ServiceResponse n (Left ErrorInvalidRequest)
|
||||||
|
|
||||||
|
dispatch (ServiceRequest rn lbs) = do
|
||||||
|
let ha = enumMethods @api @(Int, SomeHandler m) @m
|
||||||
|
let (n, bss) = deserialise @(Int, ByteString) lbs
|
||||||
|
maybe1 (List.lookup n ha) methodNotFound $ \(SomeHandler fn) -> do
|
||||||
|
case deserialiseOrFail bss of
|
||||||
|
Left{} -> pure $ ServiceResponse rn (Left ErrorInvalidRequest)
|
||||||
|
Right v -> ServiceResponse rn . Right . serialise <$> fn v
|
||||||
|
where
|
||||||
|
methodNotFound = pure (ServiceResponse rn (Left ErrorMethodNotFound))
|
||||||
|
|
||||||
|
type family FindMethodIndex (n :: Nat) (x :: Type) (xs :: [Type]) :: Maybe Nat where
|
||||||
|
FindMethodIndex _ x '[] = 'Nothing
|
||||||
|
FindMethodIndex n x (x ': xs) = 'Just n
|
||||||
|
FindMethodIndex n x (y ': xs) = FindMethodIndex (n + 1) x xs
|
||||||
|
|
||||||
|
type family FromJust (a :: Maybe k) :: k where
|
||||||
|
FromJust ('Just a) = a
|
||||||
|
FromJust 'Nothing = TypeError ('Text "Element not found")
|
||||||
|
|
||||||
|
findMethodIndex :: forall x xs. KnownNat (FromJust (FindMethodIndex 0 x xs)) => Integer
|
||||||
|
findMethodIndex = natVal (Proxy :: Proxy (FromJust (FindMethodIndex 0 x xs)))
|
||||||
|
|
||||||
|
|
||||||
|
makeRequest :: forall api method e . ( KnownNat (FromJust (FindMethodIndex 0 method api))
|
||||||
|
, Serialise (Input method)
|
||||||
|
)
|
||||||
|
=> Word64 -> Input method -> ServiceProto api e
|
||||||
|
makeRequest rnum input = ServiceRequest rnum (serialise (fromIntegral idx :: Int, serialise input))
|
||||||
|
where
|
||||||
|
idx = findMethodIndex @method @api
|
||||||
|
|
||||||
|
makeRequestR :: forall api method e m . ( KnownNat (FromJust (FindMethodIndex 0 method api))
|
||||||
|
, Serialise (Input method)
|
||||||
|
, MonadIO m
|
||||||
|
)
|
||||||
|
=> Input method -> m (ServiceProto api e)
|
||||||
|
makeRequestR input = do
|
||||||
|
rnum <- liftIO $ randomIO
|
||||||
|
pure $ ServiceRequest rnum (serialise (fromIntegral idx :: Int, serialise input))
|
||||||
|
where
|
||||||
|
idx = findMethodIndex @method @api
|
||||||
|
|
||||||
|
|
||||||
|
makeServer :: forall api e m . ( MonadIO m
|
||||||
|
, EnumAll api (Int, SomeHandler m) m
|
||||||
|
, Response e (ServiceProto api e) m
|
||||||
|
, HasProtocol e (ServiceProto api e)
|
||||||
|
, Pretty (Peer e)
|
||||||
|
)
|
||||||
|
=> ServiceProto api e
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
makeServer msg = dispatch @api @e msg >>= response
|
||||||
|
|
||||||
|
data ServiceCaller api e =
|
||||||
|
ServiceCaller
|
||||||
|
{ callPeer :: Peer e
|
||||||
|
, callInQ :: TQueue (ServiceProto api e)
|
||||||
|
, callWaiters :: TVar (HashMap Word64 (TQueue (ServiceProto api e)))
|
||||||
|
}
|
||||||
|
|
||||||
|
makeServiceCaller :: forall api e m . MonadIO m => Peer e -> m (ServiceCaller api e)
|
||||||
|
makeServiceCaller p = ServiceCaller p <$> UIO.newTQueueIO
|
||||||
|
<*> UIO.newTVarIO mempty
|
||||||
|
|
||||||
|
runServiceClient :: forall api e m . ( MonadIO m
|
||||||
|
, MonadUnliftIO m
|
||||||
|
, HasProtocol e (ServiceProto api e)
|
||||||
|
-- FIXME: remove-this-debug-shit
|
||||||
|
, Show (Peer e)
|
||||||
|
, Pretty (Peer e)
|
||||||
|
, PeerMessaging e
|
||||||
|
, HasOwnPeer e m
|
||||||
|
, HasFabriq e m
|
||||||
|
, HasTimeLimits e (ServiceProto api e) m
|
||||||
|
)
|
||||||
|
=> ServiceCaller api e
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
runServiceClient caller = do
|
||||||
|
proto <- async $ runProto @e [ makeResponse (makeClient @api caller) ]
|
||||||
|
link proto
|
||||||
|
forever do
|
||||||
|
req <- getRequest caller
|
||||||
|
request @e (callPeer caller) req
|
||||||
|
|
||||||
|
wait proto
|
||||||
|
|
||||||
|
notifyServiceCaller :: forall api e m . MonadIO m
|
||||||
|
=> ServiceCaller api e
|
||||||
|
-> ServiceProto api e
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
notifyServiceCaller caller msg = do
|
||||||
|
waiter <- UIO.readTVarIO (callWaiters caller) <&> HashMap.lookup (reqNum msg)
|
||||||
|
maybe1 waiter none $ \q -> atomically $ UIO.writeTQueue q msg
|
||||||
|
|
||||||
|
|
||||||
|
getRequest :: forall api e m . MonadIO m
|
||||||
|
=> ServiceCaller api e
|
||||||
|
-> m (ServiceProto api e)
|
||||||
|
|
||||||
|
getRequest caller = atomically $ UIO.readTQueue (callInQ caller)
|
||||||
|
|
||||||
|
callService :: forall method api e m . ( MonadIO m
|
||||||
|
, HasProtocol e (ServiceProto api e)
|
||||||
|
, KnownNat (FromJust (FindMethodIndex 0 method api))
|
||||||
|
, Serialise (Input method)
|
||||||
|
, Serialise (Output method)
|
||||||
|
)
|
||||||
|
=> ServiceCaller api e
|
||||||
|
-> Input method
|
||||||
|
-> m (Either ServiceError (Output method))
|
||||||
|
|
||||||
|
callService caller input = do
|
||||||
|
req <- makeRequestR @api @method @e @m input
|
||||||
|
|
||||||
|
resp <- UIO.newTQueueIO
|
||||||
|
|
||||||
|
atomically $ do
|
||||||
|
UIO.modifyTVar (callWaiters caller) (HashMap.insert (reqNum req) resp)
|
||||||
|
UIO.writeTQueue (callInQ caller) req
|
||||||
|
|
||||||
|
msg <- atomically $ UIO.readTQueue resp
|
||||||
|
|
||||||
|
case msg of
|
||||||
|
ServiceResponse _ (Right bs) -> do
|
||||||
|
case deserialiseOrFail @(Output method) bs of
|
||||||
|
Left _ -> pure (Left ErrorInvalidResponse)
|
||||||
|
Right x -> pure (Right x)
|
||||||
|
|
||||||
|
|
||||||
|
ServiceResponse _ (Left wtf) -> pure (Left wtf)
|
||||||
|
|
||||||
|
_ -> pure (Left ErrorInvalidResponse)
|
||||||
|
|
||||||
|
|
||||||
|
makeClient :: forall api e m . ( MonadIO m
|
||||||
|
, HasProtocol e (ServiceProto api e)
|
||||||
|
, Pretty (Peer e)
|
||||||
|
)
|
||||||
|
=> ServiceCaller api e
|
||||||
|
-> ServiceProto api e
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
makeClient = notifyServiceCaller
|
||||||
|
|
|
@ -3,8 +3,9 @@ module Main where
|
||||||
import TestFakeMessaging
|
import TestFakeMessaging
|
||||||
import TestActors
|
import TestActors
|
||||||
import DialogSpec
|
import DialogSpec
|
||||||
|
import PrototypeGenericService
|
||||||
-- import TestUniqProtoId
|
-- import TestUniqProtoId
|
||||||
import TestCrypto
|
-- import TestCrypto
|
||||||
|
|
||||||
import Test.Tasty
|
import Test.Tasty
|
||||||
import Test.Tasty.HUnit
|
import Test.Tasty.HUnit
|
||||||
|
@ -16,9 +17,8 @@ main =
|
||||||
[
|
[
|
||||||
testCase "testFakeMessaging1" testFakeMessaging1
|
testCase "testFakeMessaging1" testFakeMessaging1
|
||||||
, testCase "testActorsBasic" testActorsBasic
|
, testCase "testActorsBasic" testActorsBasic
|
||||||
-- , testCase "testUniqProtoId" testUniqProtoId
|
|
||||||
, testCrypto
|
|
||||||
, testDialog
|
, testDialog
|
||||||
|
, testCase "protoGenericService" protoGenericService
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -263,6 +263,55 @@ executable test-unix
|
||||||
, vector
|
, vector
|
||||||
|
|
||||||
|
|
||||||
|
executable test-proto-service
|
||||||
|
import: shared-properties
|
||||||
|
import: common-deps
|
||||||
|
default-language: Haskell2010
|
||||||
|
|
||||||
|
ghc-options:
|
||||||
|
-- -prof
|
||||||
|
-- -fprof-auto
|
||||||
|
|
||||||
|
other-modules:
|
||||||
|
|
||||||
|
-- other-extensions:
|
||||||
|
|
||||||
|
-- type: exitcode-stdio-1.0
|
||||||
|
hs-source-dirs: test
|
||||||
|
main-is: PrototypeGenericService.hs
|
||||||
|
|
||||||
|
build-depends:
|
||||||
|
base, hbs2-core, hbs2-storage-simple
|
||||||
|
, async
|
||||||
|
, attoparsec
|
||||||
|
, bytestring
|
||||||
|
, cache
|
||||||
|
, clock
|
||||||
|
, containers
|
||||||
|
, data-default
|
||||||
|
, data-textual
|
||||||
|
, directory
|
||||||
|
, hashable
|
||||||
|
, microlens-platform
|
||||||
|
, mtl
|
||||||
|
, mwc-random
|
||||||
|
, network
|
||||||
|
, network-ip
|
||||||
|
, prettyprinter
|
||||||
|
, QuickCheck
|
||||||
|
, random
|
||||||
|
, safe
|
||||||
|
, serialise
|
||||||
|
, stm
|
||||||
|
, streaming
|
||||||
|
, tasty
|
||||||
|
, tasty-hunit
|
||||||
|
, text
|
||||||
|
, transformers
|
||||||
|
, uniplate
|
||||||
|
, vector
|
||||||
|
|
||||||
|
|
||||||
test-suite test-tcp
|
test-suite test-tcp
|
||||||
import: shared-properties
|
import: shared-properties
|
||||||
import: common-deps
|
import: common-deps
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||||
|
{-# LANGUAGE UndecidableInstances #-}
|
||||||
|
{-# LANGUAGE PolyKinds #-}
|
||||||
|
|
||||||
|
module Main where
|
||||||
|
|
||||||
|
import HBS2.Actors.Peer
|
||||||
|
import HBS2.Clock
|
||||||
|
import HBS2.Net.Messaging.Unix
|
||||||
|
import HBS2.Net.Proto
|
||||||
|
import HBS2.Prelude.Plated
|
||||||
|
import HBS2.Net.Proto.Service
|
||||||
|
|
||||||
|
import HBS2.System.Logger.Simple
|
||||||
|
|
||||||
|
import Codec.Serialise
|
||||||
|
import Control.Monad.Reader
|
||||||
|
import Data.ByteString.Lazy (ByteString)
|
||||||
|
import System.FilePath.Posix
|
||||||
|
import System.IO
|
||||||
|
import System.IO.Temp
|
||||||
|
import UnliftIO.Async
|
||||||
|
import Data.List
|
||||||
|
|
||||||
|
import Test.Tasty.HUnit
|
||||||
|
|
||||||
|
data Method1
|
||||||
|
data Method2
|
||||||
|
|
||||||
|
type MyServiceMethods1 = '[ Method1, Method2 ]
|
||||||
|
|
||||||
|
instance HasProtocol UNIX (ServiceProto MyServiceMethods1 UNIX) where
|
||||||
|
type instance ProtocolId (ServiceProto MyServiceMethods1 UNIX) = 1
|
||||||
|
type instance Encoded UNIX = ByteString
|
||||||
|
decode = either (const Nothing) Just . deserialiseOrFail
|
||||||
|
encode = serialise
|
||||||
|
|
||||||
|
instance MonadIO m => HandleMethod m Method1 where
|
||||||
|
type instance Input Method1 = String
|
||||||
|
type instance Output Method1 = String
|
||||||
|
handleMethod n = do
|
||||||
|
debug $ "SERVICE1. METHOD1" <+> pretty n
|
||||||
|
case n of
|
||||||
|
"JOPA" -> pure "KITA"
|
||||||
|
"PECHEN" -> pure "TRESKI"
|
||||||
|
_ -> pure "X3"
|
||||||
|
|
||||||
|
instance MonadIO m => HandleMethod m Method2 where
|
||||||
|
type instance Input Method2 = ()
|
||||||
|
type instance Output Method2 = ()
|
||||||
|
handleMethod _ = pure ()
|
||||||
|
|
||||||
|
|
||||||
|
instance Monad m => HasFabriq UNIX (ReaderT MessagingUnix m) where
|
||||||
|
getFabriq = asks Fabriq
|
||||||
|
|
||||||
|
instance Monad m => HasOwnPeer UNIX (ReaderT MessagingUnix m) where
|
||||||
|
ownPeer = asks msgUnixSelf
|
||||||
|
|
||||||
|
instance HasProtocol e (ServiceProto api e) => HasTimeLimits e (ServiceProto api e) IO where
|
||||||
|
tryLockForPeriod _ _ = pure True
|
||||||
|
|
||||||
|
|
||||||
|
main :: IO ()
|
||||||
|
main = do
|
||||||
|
|
||||||
|
setLogging @DEBUG (logPrefix "[debug] ")
|
||||||
|
setLogging @INFO (logPrefix "")
|
||||||
|
setLogging @ERROR (logPrefix "[err] ")
|
||||||
|
setLogging @WARN (logPrefix "[warn] ")
|
||||||
|
setLogging @NOTICE (logPrefix "[notice] ")
|
||||||
|
setLogging @TRACE (logPrefix "[trace] ")
|
||||||
|
|
||||||
|
liftIO $ hSetBuffering stdout LineBuffering
|
||||||
|
liftIO $ hSetBuffering stderr LineBuffering
|
||||||
|
|
||||||
|
withSystemTempDirectory "test-unix-socket" $ \tmp -> do
|
||||||
|
|
||||||
|
let soname = tmp </> "unix.socket"
|
||||||
|
|
||||||
|
server <- newMessagingUnixOpts [MUFork] True 1.0 soname
|
||||||
|
client1 <- newMessagingUnix False 1.0 soname
|
||||||
|
|
||||||
|
m1 <- async $ runMessagingUnix server
|
||||||
|
|
||||||
|
pause @'Seconds 0.10
|
||||||
|
|
||||||
|
m2 <- async $ runMessagingUnix client1
|
||||||
|
|
||||||
|
p1 <- async $ flip runReaderT server do
|
||||||
|
runProto @UNIX
|
||||||
|
[ makeResponse (makeServer @MyServiceMethods1)
|
||||||
|
]
|
||||||
|
|
||||||
|
caller <- makeServiceCaller @MyServiceMethods1 @UNIX (msgUnixSelf server)
|
||||||
|
|
||||||
|
p2 <- async $ runReaderT (runServiceClient caller) client1
|
||||||
|
|
||||||
|
link p1
|
||||||
|
link p2
|
||||||
|
|
||||||
|
results <- forConcurrently ["JOPA", "PECHEN", "WTF?"] $ \r -> do
|
||||||
|
answ <- callService @Method1 caller r
|
||||||
|
pure (r, answ)
|
||||||
|
|
||||||
|
debug $ "GOT RESPONSES (Method1): " <+> viaShow results
|
||||||
|
|
||||||
|
assertBool "assert1" (sortOn fst results == [("JOPA",Right "KITA"),("PECHEN",Right "TRESKI"),("WTF?",Right "X3")] )
|
||||||
|
|
||||||
|
r2 <- callService @Method2 caller ()
|
||||||
|
|
||||||
|
debug $ "GOT RESPONSE (Method2): " <+> viaShow r2
|
||||||
|
|
||||||
|
assertBool "assert2" (r2 == Right ())
|
||||||
|
|
||||||
|
cancel p1
|
||||||
|
pause @'Seconds 0.10
|
||||||
|
|
||||||
|
waitAnyCatchCancel [p1,p2,m1,m2]
|
||||||
|
|
||||||
|
setLoggingOff @DEBUG
|
||||||
|
setLoggingOff @INFO
|
||||||
|
setLoggingOff @ERROR
|
||||||
|
setLoggingOff @WARN
|
||||||
|
setLoggingOff @NOTICE
|
||||||
|
setLoggingOff @TRACE
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue