From 41ace9c87b983a17a4ff667aef64c1c6a52f85ad Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Tue, 31 Oct 2023 07:33:29 +0300 Subject: [PATCH] Notify protocol --- hbs2-core/hbs2-core.cabal | 44 +++ hbs2-core/lib/HBS2/Net/Proto/Notify.hs | 382 +++++++++++++++++++++ hbs2-core/lib/HBS2/Prelude.hs | 11 + hbs2-core/test/notify-unix/Main.hs | 127 +++++++ hbs2-tests/test/PrototypeGenericService.hs | 2 +- 5 files changed, 565 insertions(+), 1 deletion(-) create mode 100644 hbs2-core/lib/HBS2/Net/Proto/Notify.hs create mode 100644 hbs2-core/test/notify-unix/Main.hs diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index d669e4aa..1ab1e5a8 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -112,6 +112,7 @@ library , HBS2.Net.Proto.Definition , HBS2.Net.Proto.Dialog , HBS2.Net.Proto.Service + , HBS2.Net.Proto.Notify , HBS2.Net.Proto.EncryptionHandshake , HBS2.Net.Proto.Event.PeerExpired , HBS2.Net.Proto.Peer @@ -255,4 +256,47 @@ test-suite test , resourcet +test-suite test-notify + import: shared-properties + default-language: Haskell2010 + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test/notify-unix + main-is: Main.hs + build-depends: + base, hbs2-core + , async + , bytestring + , cache + , containers + , directory + , hashable + , microlens-platform + , mtl + , prettyprinter + , QuickCheck + , quickcheck-instances + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-quickcheck + , tasty-hunit + , tasty-quickcheck + , transformers + , uniplate + , vector + , saltine + , simple-logger + , string-conversions + , filepath + , temporary + , unliftio + , unordered-containers + , resourcet + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Notify.hs b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs new file mode 100644 index 00000000..64d92314 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs @@ -0,0 +1,382 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PolyKinds #-} +module HBS2.Net.Proto.Notify where + +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Net.Proto.Types +import HBS2.Prelude.Plated +import HBS2.Net.Messaging.Unix (UNIX) + +import HBS2.System.Logger.Simple + +import Data.Function +import Codec.Serialise +import Control.Monad +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.Kind +import Data.List qualified as List +import Data.Word +import Control.Concurrent.STM (flushTQueue) +import UnliftIO + + +instance (HasProtocol UNIX (NotifyProto ev0 UNIX)) => HasTimeLimits UNIX (NotifyProto ev0 UNIX) IO where + tryLockForPeriod _ _ = pure True + +-- вряд ли у нас будут причины +-- иметь разные типы для NotifyHandle +type NotifyHandle = Word64 + +type ForNotify ev = ( Serialise (NotifyKey ev) + , Serialise (NotifyData ev) + , Serialise NotifyHandle + , Hashable (NotifyKey ev) + , Eq (NotifyKey ev) + ) + +data family NotifyKey ev :: Type +data family NotifyData ev :: Type + +class (ForNotify ev) => NotifySource ev source where + + startNotify :: forall m0 . MonadIO m0 + => source + -> NotifyKey ev + -> ( forall m . MonadIO m => NotifyHandle -> NotifyData ev -> m () ) + -> m0 NotifyHandle + + stopNotify :: forall m . MonadIO m => source -> NotifyHandle -> m () + +-- NOTE: on-notify-proto +-- можно было бы разнести на несколько (два) разных +-- протокола, но тогда у нас будет +-- 2*makeServer и 2*makeClient +-- так как каждый ev - это и есть отдельный протокол? +-- Или нет? Наверное, да, так как для каждого типа +-- эвента свой какой-то код их генерирует +data NotifyProto ev e = + NotifyWant Word64 (NotifyKey ev) + | NotifyGiven Word64 NotifyHandle + | NotifyAlive NotifyHandle + | Notify NotifyHandle (NotifyEvent ev) + | NotifyBye NotifyHandle + | NotifyError NotifyErr + deriving stock (Generic) + +data NotifyEvent ev = + NotifyEvent + { notifyKey :: NotifyKey ev + , notifyData :: NotifyData ev + } + deriving stock Generic + +data NotifyErr = NotifyErrUnexpected + deriving stock (Generic, Show, Typeable) + +instance Serialise NotifyErr + +instance ForNotify ev => Serialise (NotifyEvent ev) + +instance ForNotify ev => Serialise (NotifyProto ev e) + +data NotifyEnv ev src e = + NotifyEnv + { notifySource :: src + , notifyAlive :: TVar (HashMap NotifyHandle TimeSpec) + , notifyWho :: TVar (HashMap NotifyHandle (Peer e)) + , notifyQ :: TQueue (NotifyHandle, Peer e, NotifyEvent ev) + } + +newNotifyEnvServer :: ( NotifySource ev src + , MonadIO m + ) + => src + -> m (NotifyEnv ev src e) +newNotifyEnvServer src = NotifyEnv src <$> newTVarIO mempty + <*> newTVarIO mempty + <*> newTQueueIO + +makeNotifyServer :: forall ev src e m . ( MonadIO m + , Response e (NotifyProto ev e) m + , NotifySource ev src + , Pretty (Peer e) + ) + => NotifyEnv ev src e + -> NotifyProto ev e + -> m () + +makeNotifyServer (NotifyEnv{..}) what = do + + case what of + NotifyWant rn key -> do + + trace "SERVER: NotifyWant" + + who <- thatPeer (Proxy @(NotifyProto ev e)) + + hndl <- startNotify @ev @src @m notifySource key $ \ha d -> do + atomically $ writeTQueue notifyQ (ha, who, NotifyEvent key d) + + now <- getTimeCoarse + + atomically $ do + modifyTVar notifyAlive (HashMap.insert hndl now) + modifyTVar notifyWho (HashMap.insert hndl who) + + response (NotifyGiven @ev @e rn hndl) + + NotifyBye ha -> do + trace $ "SERVER: NotifyBye" <+> pretty ha + atomically $ modifyTVar notifyAlive (HashMap.insert ha 0) + + NotifyAlive ha -> do + now <- getTimeCoarse + atomically $ modifyTVar notifyAlive (HashMap.insert ha now) + + NotifyError{} -> pure () + + _ -> response (NotifyError @ev @e NotifyErrUnexpected) + + +runNotifyWorkerServer :: forall ev src e m . ( Request e (NotifyProto ev e) m + , ForNotify ev + , NotifySource ev src + , MonadUnliftIO m + ) + => NotifyEnv ev src e + -> m () + +runNotifyWorkerServer env = do + + -- FIXNE: timeout-hardcode + let tmo = 60 + let tnano = round $ realToFrac tmo * 1e9 + + cleanup <- async $ forever do + + trace "SERVER: CLEANUP" + + now <- getTimeCoarse + + losers <- atomically do + current <- readTVar (notifyAlive env) + + pips <- readTVar (notifyWho env) <&> HashMap.toList + + let alive = HashMap.filter (\t -> toNanoSeconds (TimeoutTS (now - t)) < tnano ) current + + let (winners, losers) = List.partition (\(h, _) -> HashMap.member h alive) pips + + writeTVar (notifyAlive env) alive + writeTVar (notifyWho env) (HashMap.fromList winners) + + pure losers + + trace $ "SERVER" <+> "losers" <+> pretty (length losers) + + forM_ losers $ \(h,p) -> do + stopNotify @ev (notifySource env) h + trace $ "SERVER: stopNotify" <+> pretty h + request p (NotifyBye @ev @e h) + + pause @'Seconds tmo + + work <- async $ forever do + -- TODO: several-threads + (ha, pip, ev) <- atomically $ readTQueue (notifyQ env) + request pip (Notify @ev @e ha ev) + + mapM_ link [cleanup, work] + + void $ waitAnyCancel [work, cleanup] + + +data NotifySinkTask ev e = + NotifySinkSubscribe Word64 (NotifyKey ev) (TQueue NotifyHandle) + | NotifySinkAlive NotifyHandle + | NotifySinkBye NotifyHandle + +data NotifySink ev e = + NotifySink + { sinkPipeline :: TQueue (NotifySinkTask ev e) + , sinkNotify :: TVar (HashMap NotifyHandle (TQueue (Maybe (NotifyData ev)))) + , sinkWaiters :: TVar (HashMap Word64 (TQueue NotifyHandle)) + , sinkRnum :: TVar Word64 + } + +newNotifySink :: MonadIO m => m (NotifySink ev e) +newNotifySink = NotifySink <$> newTQueueIO + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO 1 + + +runNotifyWorkerClient :: forall ev e m . ( MonadUnliftIO m + , Request e (NotifyProto ev e) m + , HasOwnPeer e m + , Pretty (Peer e) + ) + => NotifySink ev e + -> m () + +runNotifyWorkerClient sink = do + let waiters = sinkWaiters sink + pip <- ownPeer + forever do + atomically (readTQueue (sinkPipeline sink)) >>= \case + + NotifySinkSubscribe r k w -> do + atomically $ modifyTVar waiters (HashMap.insert r w) + + void $ asyncLinked $ do + -- если ничего не произошло, через минуту удаляем + pause @'Seconds 60 + atomically $ modifyTVar waiters (HashMap.delete r) + + trace $ "CLIENT:" <+> "NotifySinkSubscribe" + request @e pip (NotifyWant @ev @e r k) + + NotifySinkAlive h -> + request @e pip (NotifyAlive @ev @e h) + + NotifySinkBye h -> do + trace $ "CLIENT:" <+> "NotifySinkBye" <+> viaShow h + request @e pip (NotifyBye @ev @e h) + atomically $ modifyTVar (sinkNotify sink) (HashMap.delete h) + +makeNotifyClient :: forall ev e m . ( MonadUnliftIO m + , Request e (NotifyProto ev e) m + , Response e (NotifyProto ev e) m + , HasOwnPeer e m + , Pretty (Peer e) + ) + => NotifySink ev e + -> NotifyProto ev e + -> m () + +makeNotifyClient sink what = do + + pip <- ownPeer + let waiters = sinkWaiters sink + + case what of + Notify ha (NotifyEvent _ kd) -> do + -- debug $ "CLIENT: GOT NOTIFY!" <+> pretty ha + mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha + forM_ mq $ \q -> do + r <- try @_ @SomeException $ atomically $ writeTQueue q (Just kd) + + let unsbscribe _ = do + -- на том конце очередь сдохла? удаляем + request @e pip (NotifyBye @ev @e ha) + atomically (modifyTVar (sinkNotify sink) (HashMap.delete ha)) + pure () + + either unsbscribe (const none) r + + NotifyGiven rn ha -> do + waiter <- atomically $ do + w <- readTVar waiters <&> HashMap.lookup rn + modifyTVar waiters (HashMap.delete rn) + pure w + + forM_ waiter $ \wa -> do + void $ try @_ @SomeException $ atomically $ writeTQueue wa ha + + NotifyBye ha -> do + mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha + forM_ mq $ \q -> do + void $ try @_ @SomeException $ atomically $ writeTQueue q Nothing + + void $ atomically $ modifyTVar (sinkNotify sink) $ HashMap.delete ha + + NotifyError e -> do + err $ "*** makeNotifyClient:" <+> viaShow e + + _ -> pure () + + +nextRNum :: MonadIO m => NotifySink ev e -> m Word64 +nextRNum NotifySink{..} = do + atomically $ stateTVar sinkRnum (\s -> (s, succ s)) + +runNotifySink :: forall ev e m . MonadUnliftIO m + => NotifySink ev e + -> NotifyKey ev + -> ( NotifyData ev -> m () ) + -> m () + +runNotifySink sink k action = do + + my <- nextRNum sink + + answ <- newTQueueIO + + debug "runNotifySink 1" + atomically $ writeTQueue (sinkPipeline sink) (NotifySinkSubscribe my k answ) + + -- ждём первый ответ, потом бы дропнуть или ЗАКРЫТЬ очередь + ha <- atomically $ do + r <- readTQueue answ + flushTQueue answ + pure r + + myQ <- newTQueueIO + atomically $ modifyTVar (sinkNotify sink) (HashMap.insert ha myQ) + + w <- async $ forever do + pause @'Seconds 30 + atomically $ writeTQueue (sinkPipeline sink) (NotifySinkAlive ha) + + -- NOTE: run-notify-sink-cleanup + -- если нас пристрелили --- попрощаться с NotifySink хотя бы + let cleanup = do + trace $ "CLIENT: cleanip and exit" <+> pretty ha + atomically $ writeTQueue (sinkPipeline sink) (NotifySinkBye ha) + atomically $ modifyTVar (sinkNotify sink) (HashMap.delete ha) + cancel w + + bracket_ none cleanup do + fix $ \next -> do + atomically (readTQueue myQ) >>= \case + Just ev -> action ev >> next + Nothing -> pure () + + +data SomeCallback ev = + SomeCallback { callback :: forall m . MonadIO m => NotifyHandle -> NotifyData ev -> m () } + +data SomeNotifySource ev = + SomeNotifySource + { handleCount :: TVar NotifyHandle + , listeners :: TVar (HashMap NotifyHandle (SomeCallback ev)) + } + +newSomeNotifySource :: forall ev m . (MonadIO m, ForNotify ev) + => m (SomeNotifySource ev) + +newSomeNotifySource = SomeNotifySource @ev <$> newTVarIO 1 + <*> newTVarIO mempty + +instance ForNotify ev => NotifySource ev (SomeNotifySource ev) where + + startNotify src key fn = do + ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s) + atomically $ modifyTVar (listeners src) (HashMap.insert ha (SomeCallback @ev fn)) + pure ha + + stopNotify src ha = do + atomically $ modifyTVar (listeners src) (HashMap.delete ha) + +emitNotify :: forall ev m . MonadIO m + => SomeNotifySource ev + -> (NotifyKey ev, NotifyData ev) + -> m () + +emitNotify src (_,d) = do + who <- readTVarIO (listeners src) <&> HashMap.toList + for_ who $ \(h, SomeCallback cb) -> cb h d + diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index 9d581f75..a30eee1e 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -6,6 +6,7 @@ module HBS2.Prelude , void, guard, when, unless , maybe1 , eitherToMaybe + , asyncLinked , ToMPlus(..) , Hashable , lift @@ -94,3 +95,13 @@ instance Monad m => ToMPlus (MaybeT m) (Either x a) where toMPlus (Left{}) = mzero toMPlus (Right x) = MaybeT $ pure (Just x) + +asyncLinked :: MonadUnliftIO m => m a -> m (Async a) +asyncLinked m = do + l <- async m + link l + pure l + + +-- asyncLinked :: forall m . MonadUnliftIO m => + diff --git a/hbs2-core/test/notify-unix/Main.hs b/hbs2-core/test/notify-unix/Main.hs new file mode 100644 index 00000000..0cd2d697 --- /dev/null +++ b/hbs2-core/test/notify-unix/Main.hs @@ -0,0 +1,127 @@ +module Main where + +import HBS2.Prelude.Plated +import HBS2.Clock +import HBS2.Net.Proto +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto.Notify +import HBS2.Actors.Peer +-- import HBS2.OrDie + +import HBS2.System.Logger.Simple + +-- import Codec.Serialise +import Control.Monad.Reader +-- import Control.Monad.Trans.Resource +import Data.ByteString.Lazy (ByteString) +-- import Lens.Micro.Platform +-- import Prettyprinter +import System.FilePath.Posix +-- import System.IO +-- import System.IO.Temp +-- import UnliftIO.Async +-- import UnliftIO qualified as UIO +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import UnliftIO +import Codec.Serialise + +data Tick = Tick Int + deriving stock Generic + +instance Serialise Tick + + +instance HasProtocol UNIX (NotifyProto Tick UNIX) where + type instance ProtocolId (NotifyProto Tick UNIX) = 0xd71049a1bffb70c4 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + + + +newtype instance NotifyKey Tick = + TickNotifyKey () + deriving (Generic,Eq) + deriving newtype Hashable + +newtype instance NotifyData Tick = + TickNotifyData Int + deriving Generic + + + +instance Serialise (NotifyKey Tick) +instance Serialise (NotifyData Tick) + + +runTickTack :: MonadIO m => r -> ReaderT r m a -> m a +runTickTack s m = runReaderT m s + +main :: IO () +main = do + + setLogging @DEBUG (logPrefix "[debug] ") + setLogging @INFO (logPrefix "") + setLogging @ERROR (logPrefix "[err] ") + setLogging @WARN (logPrefix "[warn] ") + setLogging @NOTICE (logPrefix "[notice] ") + setLogging @TRACE (logPrefix "[trace] ") + + liftIO $ hSetBuffering stdout LineBuffering + liftIO $ hSetBuffering stderr LineBuffering + + withSystemTempDirectory "test-unix-socket" $ \tmp -> do + + let soname = tmp "unix.socket" + + server <- newMessagingUnix True 1.0 soname + client1 <- newMessagingUnix False 1.0 soname + + m1 <- async $ runMessagingUnix server + m2 <- async $ runMessagingUnix client1 + + src <- newSomeNotifySource @Tick + + -- запускаем "часы" + emitter <- async $ do + sec <- newTVarIO 0 + forever do + sn <- atomically $ stateTVar sec (\s -> (s, succ s)) + emitNotify src (TickNotifyKey (), TickNotifyData sn) + debug "SERVER: TICK!" + pause @'Seconds 1 + + -- запускаем сервер + p1 <- async $ liftIO $ runTickTack server do + env1 <- newNotifyEnvServer @Tick src + w <- async $ runNotifyWorkerServer env1 + link w + runProto @UNIX + [ makeResponse (makeNotifyServer env1) + ] + + sink <- newNotifySink + + -- запускаем клиента + p2 <- async $ runTickTack client1 $ do + void $ asyncLinked $ runNotifyWorkerClient sink + runProto @UNIX + [ makeResponse (makeNotifyClient @Tick sink) + ] + + s1 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do + debug $ "CLIENT1:" <+> viaShow td + + s2 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do + debug $ "CLIENT2:" <+> viaShow td + + s3 <- async $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do + debug $ "CLIENT3:" <+> viaShow td + + void $ async do + pause @'Seconds 3 + cancelWith s3 (toException (userError "Fuck you!")) + + void $ waitAnyCatchCancel [p1,p2,m1,m2,s1,s2] + diff --git a/hbs2-tests/test/PrototypeGenericService.hs b/hbs2-tests/test/PrototypeGenericService.hs index 15c22c81..ea8e62c0 100644 --- a/hbs2-tests/test/PrototypeGenericService.hs +++ b/hbs2-tests/test/PrototypeGenericService.hs @@ -78,7 +78,7 @@ main = do let soname = tmp "unix.socket" - server <- newMessagingUnixOpts [MUFork] True 1.0 soname + server <- newMessagingUnix True 1.0 soname client1 <- newMessagingUnix False 1.0 soname m1 <- async $ runMessagingUnix server