From 3c545b01fad030dcebdc1d5ae23743eddb7daa9f Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sun, 1 Oct 2023 10:56:11 +0300 Subject: [PATCH] service-protocol-proof-of-concept --- hbs2-core/hbs2-core.cabal | 6 +- hbs2-core/lib/HBS2/Net/Proto/Service.hs | 227 +++++++++++++++++++++ hbs2-core/test/Main.hs | 6 +- hbs2-tests/hbs2-tests.cabal | 49 +++++ hbs2-tests/test/PrototypeGenericService.hs | 128 ++++++++++++ 5 files changed, 412 insertions(+), 4 deletions(-) create mode 100644 hbs2-core/lib/HBS2/Net/Proto/Service.hs create mode 100644 hbs2-tests/test/PrototypeGenericService.hs diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 96d43cd5..af08fc2b 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -107,6 +107,7 @@ library , HBS2.Net.Proto.BlockInfo , HBS2.Net.Proto.Definition , HBS2.Net.Proto.Dialog + , HBS2.Net.Proto.Service , HBS2.Net.Proto.EncryptionHandshake , HBS2.Net.Proto.Event.PeerExpired , HBS2.Net.Proto.Peer @@ -241,7 +242,10 @@ test-suite test , saltine , simple-logger , string-conversions - + , filepath + , temporary + , unliftio + , resourcet diff --git a/hbs2-core/lib/HBS2/Net/Proto/Service.hs b/hbs2-core/lib/HBS2/Net/Proto/Service.hs new file mode 100644 index 00000000..eb04ba03 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/Service.hs @@ -0,0 +1,227 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE PolyKinds #-} +module HBS2.Net.Proto.Service where + +import HBS2.Actors.Peer +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto +import HBS2.Prelude.Plated + +import Codec.Serialise +import Control.Monad.Reader +import Control.Monad.Trans.Resource +import Data.ByteString.Lazy (ByteString) +import Data.Kind +import Data.List qualified as List +import GHC.TypeLits +import Lens.Micro.Platform +import UnliftIO.Async +import UnliftIO qualified as UIO +import UnliftIO (TVar,TQueue,atomically) +import System.Random (randomIO) +import Data.Word +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap + +class (Monad m, Serialise (Output a), Serialise (Input a)) => HandleMethod m a where + type family Input a :: Type + type family Output a :: Type + handleMethod :: Input a -> m (Output a) + +type family AllHandlers m (xs :: [Type]) :: Constraint where + AllHandlers m '[] = () + AllHandlers m (x ': xs) = (HandleMethod m x, AllHandlers m xs) + +data SomeHandler m = forall a . HandleMethod m a => SomeHandler ( Input a -> m (Output a) ) + +class Monad m => EnumAll (xs :: [Type]) t m where + enumMethods :: [t] + +instance (Monad m, HandleMethod m ()) => EnumAll '[] (Int, SomeHandler m) m where + enumMethods = [(0, SomeHandler @m @() (\_ -> pure ()))] + +instance (Monad m, EnumAll xs (Int, SomeHandler m) m, HandleMethod m x) => EnumAll (x ': xs) (Int, SomeHandler m) m where + enumMethods = (0, wtf) : shift (enumMethods @xs @(Int, SomeHandler m) @m) + where + wtf = SomeHandler @m @x (handleMethod @m @x) + shift = map (\(i, h) -> (i + 1, h)) + +instance Monad m => HandleMethod m () where + type Input () = () + type Output () = () + handleMethod _ = pure () + +data ServiceError = + ErrorMethodNotFound + | ErrorInvalidRequest + | ErrorInvalidResponse + deriving stock (Eq,Ord,Generic,Show,Typeable) + +instance Serialise ServiceError + +data ServiceProto api e = + ServiceRequest { reqNum :: Word64, reqData :: ByteString } + | ServiceResponse { reqNum :: Word64, reqResp :: Either ServiceError ByteString } + deriving stock (Generic,Show) + +instance Serialise (ServiceProto api e) + +dispatch :: forall api e m . ( EnumAll api (Int, SomeHandler m) m + , MonadIO m + , HasProtocol e (ServiceProto api e) + ) + => ServiceProto api e + -> m (ServiceProto api e) + +dispatch (ServiceResponse n _) = do + pure $ ServiceResponse n (Left ErrorInvalidRequest) + +dispatch (ServiceRequest rn lbs) = do + let ha = enumMethods @api @(Int, SomeHandler m) @m + let (n, bss) = deserialise @(Int, ByteString) lbs + maybe1 (List.lookup n ha) methodNotFound $ \(SomeHandler fn) -> do + case deserialiseOrFail bss of + Left{} -> pure $ ServiceResponse rn (Left ErrorInvalidRequest) + Right v -> ServiceResponse rn . Right . serialise <$> fn v + where + methodNotFound = pure (ServiceResponse rn (Left ErrorMethodNotFound)) + +type family FindMethodIndex (n :: Nat) (x :: Type) (xs :: [Type]) :: Maybe Nat where + FindMethodIndex _ x '[] = 'Nothing + FindMethodIndex n x (x ': xs) = 'Just n + FindMethodIndex n x (y ': xs) = FindMethodIndex (n + 1) x xs + +type family FromJust (a :: Maybe k) :: k where + FromJust ('Just a) = a + FromJust 'Nothing = TypeError ('Text "Element not found") + +findMethodIndex :: forall x xs. KnownNat (FromJust (FindMethodIndex 0 x xs)) => Integer +findMethodIndex = natVal (Proxy :: Proxy (FromJust (FindMethodIndex 0 x xs))) + + +makeRequest :: forall api method e . ( KnownNat (FromJust (FindMethodIndex 0 method api)) + , Serialise (Input method) + ) + => Word64 -> Input method -> ServiceProto api e +makeRequest rnum input = ServiceRequest rnum (serialise (fromIntegral idx :: Int, serialise input)) + where + idx = findMethodIndex @method @api + +makeRequestR :: forall api method e m . ( KnownNat (FromJust (FindMethodIndex 0 method api)) + , Serialise (Input method) + , MonadIO m + ) + => Input method -> m (ServiceProto api e) +makeRequestR input = do + rnum <- liftIO $ randomIO + pure $ ServiceRequest rnum (serialise (fromIntegral idx :: Int, serialise input)) + where + idx = findMethodIndex @method @api + + +makeServer :: forall api e m . ( MonadIO m + , EnumAll api (Int, SomeHandler m) m + , Response e (ServiceProto api e) m + , HasProtocol e (ServiceProto api e) + , Pretty (Peer e) + ) + => ServiceProto api e + -> m () + +makeServer msg = dispatch @api @e msg >>= response + +data ServiceCaller api e = + ServiceCaller + { callPeer :: Peer e + , callInQ :: TQueue (ServiceProto api e) + , callWaiters :: TVar (HashMap Word64 (TQueue (ServiceProto api e))) + } + +makeServiceCaller :: forall api e m . MonadIO m => Peer e -> m (ServiceCaller api e) +makeServiceCaller p = ServiceCaller p <$> UIO.newTQueueIO + <*> UIO.newTVarIO mempty + +runServiceClient :: forall api e m . ( MonadIO m + , MonadUnliftIO m + , HasProtocol e (ServiceProto api e) + -- FIXME: remove-this-debug-shit + , Show (Peer e) + , Pretty (Peer e) + , PeerMessaging e + , HasOwnPeer e m + , HasFabriq e m + , HasTimeLimits e (ServiceProto api e) m + ) + => ServiceCaller api e + -> m () + +runServiceClient caller = do + proto <- async $ runProto @e [ makeResponse (makeClient @api caller) ] + link proto + forever do + req <- getRequest caller + request @e (callPeer caller) req + + wait proto + +notifyServiceCaller :: forall api e m . MonadIO m + => ServiceCaller api e + -> ServiceProto api e + -> m () + +notifyServiceCaller caller msg = do + waiter <- UIO.readTVarIO (callWaiters caller) <&> HashMap.lookup (reqNum msg) + maybe1 waiter none $ \q -> atomically $ UIO.writeTQueue q msg + + +getRequest :: forall api e m . MonadIO m + => ServiceCaller api e + -> m (ServiceProto api e) + +getRequest caller = atomically $ UIO.readTQueue (callInQ caller) + +callService :: forall method api e m . ( MonadIO m + , HasProtocol e (ServiceProto api e) + , KnownNat (FromJust (FindMethodIndex 0 method api)) + , Serialise (Input method) + , Serialise (Output method) + ) + => ServiceCaller api e + -> Input method + -> m (Either ServiceError (Output method)) + +callService caller input = do + req <- makeRequestR @api @method @e @m input + + resp <- UIO.newTQueueIO + + atomically $ do + UIO.modifyTVar (callWaiters caller) (HashMap.insert (reqNum req) resp) + UIO.writeTQueue (callInQ caller) req + + msg <- atomically $ UIO.readTQueue resp + + case msg of + ServiceResponse _ (Right bs) -> do + case deserialiseOrFail @(Output method) bs of + Left _ -> pure (Left ErrorInvalidResponse) + Right x -> pure (Right x) + + + ServiceResponse _ (Left wtf) -> pure (Left wtf) + + _ -> pure (Left ErrorInvalidResponse) + + +makeClient :: forall api e m . ( MonadIO m + , HasProtocol e (ServiceProto api e) + , Pretty (Peer e) + ) + => ServiceCaller api e + -> ServiceProto api e + -> m () + +makeClient = notifyServiceCaller + diff --git a/hbs2-core/test/Main.hs b/hbs2-core/test/Main.hs index 728c3393..8f6b31ce 100644 --- a/hbs2-core/test/Main.hs +++ b/hbs2-core/test/Main.hs @@ -3,8 +3,9 @@ module Main where import TestFakeMessaging import TestActors import DialogSpec +import PrototypeGenericService -- import TestUniqProtoId -import TestCrypto +-- import TestCrypto import Test.Tasty import Test.Tasty.HUnit @@ -16,9 +17,8 @@ main = [ testCase "testFakeMessaging1" testFakeMessaging1 , testCase "testActorsBasic" testActorsBasic - -- , testCase "testUniqProtoId" testUniqProtoId - , testCrypto , testDialog + , testCase "protoGenericService" protoGenericService ] diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 90907ba1..71d38b3d 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -263,6 +263,55 @@ executable test-unix , vector +executable test-proto-service + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: PrototypeGenericService.hs + + build-depends: + base, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , transformers + , uniplate + , vector + + test-suite test-tcp import: shared-properties import: common-deps diff --git a/hbs2-tests/test/PrototypeGenericService.hs b/hbs2-tests/test/PrototypeGenericService.hs new file mode 100644 index 00000000..ca6fa30c --- /dev/null +++ b/hbs2-tests/test/PrototypeGenericService.hs @@ -0,0 +1,128 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PolyKinds #-} + +module Main where + +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Service + +import HBS2.System.Logger.Simple + +import Codec.Serialise +import Control.Monad.Reader +import Data.ByteString.Lazy (ByteString) +import System.FilePath.Posix +import System.IO +import System.IO.Temp +import UnliftIO.Async +import Data.List + +import Test.Tasty.HUnit + +data Method1 +data Method2 + +type MyServiceMethods1 = '[ Method1, Method2 ] + +instance HasProtocol UNIX (ServiceProto MyServiceMethods1 UNIX) where + type instance ProtocolId (ServiceProto MyServiceMethods1 UNIX) = 1 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +instance MonadIO m => HandleMethod m Method1 where + type instance Input Method1 = String + type instance Output Method1 = String + handleMethod n = do + debug $ "SERVICE1. METHOD1" <+> pretty n + case n of + "JOPA" -> pure "KITA" + "PECHEN" -> pure "TRESKI" + _ -> pure "X3" + +instance MonadIO m => HandleMethod m Method2 where + type instance Input Method2 = () + type instance Output Method2 = () + handleMethod _ = pure () + + +instance Monad m => HasFabriq UNIX (ReaderT MessagingUnix m) where + getFabriq = asks Fabriq + +instance Monad m => HasOwnPeer UNIX (ReaderT MessagingUnix m) where + ownPeer = asks msgUnixSelf + +instance HasProtocol e (ServiceProto api e) => HasTimeLimits e (ServiceProto api e) IO where + tryLockForPeriod _ _ = pure True + + +main :: IO () +main = do + + setLogging @DEBUG (logPrefix "[debug] ") + setLogging @INFO (logPrefix "") + setLogging @ERROR (logPrefix "[err] ") + setLogging @WARN (logPrefix "[warn] ") + setLogging @NOTICE (logPrefix "[notice] ") + setLogging @TRACE (logPrefix "[trace] ") + + liftIO $ hSetBuffering stdout LineBuffering + liftIO $ hSetBuffering stderr LineBuffering + + withSystemTempDirectory "test-unix-socket" $ \tmp -> do + + let soname = tmp "unix.socket" + + server <- newMessagingUnixOpts [MUFork] True 1.0 soname + client1 <- newMessagingUnix False 1.0 soname + + m1 <- async $ runMessagingUnix server + + pause @'Seconds 0.10 + + m2 <- async $ runMessagingUnix client1 + + p1 <- async $ flip runReaderT server do + runProto @UNIX + [ makeResponse (makeServer @MyServiceMethods1) + ] + + caller <- makeServiceCaller @MyServiceMethods1 @UNIX (msgUnixSelf server) + + p2 <- async $ runReaderT (runServiceClient caller) client1 + + link p1 + link p2 + + results <- forConcurrently ["JOPA", "PECHEN", "WTF?"] $ \r -> do + answ <- callService @Method1 caller r + pure (r, answ) + + debug $ "GOT RESPONSES (Method1): " <+> viaShow results + + assertBool "assert1" (sortOn fst results == [("JOPA",Right "KITA"),("PECHEN",Right "TRESKI"),("WTF?",Right "X3")] ) + + r2 <- callService @Method2 caller () + + debug $ "GOT RESPONSE (Method2): " <+> viaShow r2 + + assertBool "assert2" (r2 == Right ()) + + cancel p1 + pause @'Seconds 0.10 + + waitAnyCatchCancel [p1,p2,m1,m2] + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + +