Notify protocol

This commit is contained in:
Dmitry Zuikov 2023-10-31 07:33:29 +03:00
parent eab3175d52
commit 41ace9c87b
5 changed files with 565 additions and 1 deletions

View File

@ -112,6 +112,7 @@ library
, HBS2.Net.Proto.Definition
, HBS2.Net.Proto.Dialog
, HBS2.Net.Proto.Service
, HBS2.Net.Proto.Notify
, HBS2.Net.Proto.EncryptionHandshake
, HBS2.Net.Proto.Event.PeerExpired
, HBS2.Net.Proto.Peer
@ -255,4 +256,47 @@ test-suite test
, resourcet
test-suite test-notify
import: shared-properties
default-language: Haskell2010
-- other-extensions:
type: exitcode-stdio-1.0
hs-source-dirs: test/notify-unix
main-is: Main.hs
build-depends:
base, hbs2-core
, async
, bytestring
, cache
, containers
, directory
, hashable
, microlens-platform
, mtl
, prettyprinter
, QuickCheck
, quickcheck-instances
, random
, safe
, serialise
, stm
, streaming
, tasty
, tasty-quickcheck
, tasty-hunit
, tasty-quickcheck
, transformers
, uniplate
, vector
, saltine
, simple-logger
, string-conversions
, filepath
, temporary
, unliftio
, unordered-containers
, resourcet

View File

@ -0,0 +1,382 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PolyKinds #-}
module HBS2.Net.Proto.Notify where
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Net.Proto.Types
import HBS2.Prelude.Plated
import HBS2.Net.Messaging.Unix (UNIX)
import HBS2.System.Logger.Simple
import Data.Function
import Codec.Serialise
import Control.Monad
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.Kind
import Data.List qualified as List
import Data.Word
import Control.Concurrent.STM (flushTQueue)
import UnliftIO
instance (HasProtocol UNIX (NotifyProto ev0 UNIX)) => HasTimeLimits UNIX (NotifyProto ev0 UNIX) IO where
tryLockForPeriod _ _ = pure True
-- вряд ли у нас будут причины
-- иметь разные типы для NotifyHandle
type NotifyHandle = Word64
type ForNotify ev = ( Serialise (NotifyKey ev)
, Serialise (NotifyData ev)
, Serialise NotifyHandle
, Hashable (NotifyKey ev)
, Eq (NotifyKey ev)
)
data family NotifyKey ev :: Type
data family NotifyData ev :: Type
class (ForNotify ev) => NotifySource ev source where
startNotify :: forall m0 . MonadIO m0
=> source
-> NotifyKey ev
-> ( forall m . MonadIO m => NotifyHandle -> NotifyData ev -> m () )
-> m0 NotifyHandle
stopNotify :: forall m . MonadIO m => source -> NotifyHandle -> m ()
-- NOTE: on-notify-proto
-- можно было бы разнести на несколько (два) разных
-- протокола, но тогда у нас будет
-- 2*makeServer и 2*makeClient
-- так как каждый ev - это и есть отдельный протокол?
-- Или нет? Наверное, да, так как для каждого типа
-- эвента свой какой-то код их генерирует
data NotifyProto ev e =
NotifyWant Word64 (NotifyKey ev)
| NotifyGiven Word64 NotifyHandle
| NotifyAlive NotifyHandle
| Notify NotifyHandle (NotifyEvent ev)
| NotifyBye NotifyHandle
| NotifyError NotifyErr
deriving stock (Generic)
data NotifyEvent ev =
NotifyEvent
{ notifyKey :: NotifyKey ev
, notifyData :: NotifyData ev
}
deriving stock Generic
data NotifyErr = NotifyErrUnexpected
deriving stock (Generic, Show, Typeable)
instance Serialise NotifyErr
instance ForNotify ev => Serialise (NotifyEvent ev)
instance ForNotify ev => Serialise (NotifyProto ev e)
data NotifyEnv ev src e =
NotifyEnv
{ notifySource :: src
, notifyAlive :: TVar (HashMap NotifyHandle TimeSpec)
, notifyWho :: TVar (HashMap NotifyHandle (Peer e))
, notifyQ :: TQueue (NotifyHandle, Peer e, NotifyEvent ev)
}
newNotifyEnvServer :: ( NotifySource ev src
, MonadIO m
)
=> src
-> m (NotifyEnv ev src e)
newNotifyEnvServer src = NotifyEnv src <$> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
makeNotifyServer :: forall ev src e m . ( MonadIO m
, Response e (NotifyProto ev e) m
, NotifySource ev src
, Pretty (Peer e)
)
=> NotifyEnv ev src e
-> NotifyProto ev e
-> m ()
makeNotifyServer (NotifyEnv{..}) what = do
case what of
NotifyWant rn key -> do
trace "SERVER: NotifyWant"
who <- thatPeer (Proxy @(NotifyProto ev e))
hndl <- startNotify @ev @src @m notifySource key $ \ha d -> do
atomically $ writeTQueue notifyQ (ha, who, NotifyEvent key d)
now <- getTimeCoarse
atomically $ do
modifyTVar notifyAlive (HashMap.insert hndl now)
modifyTVar notifyWho (HashMap.insert hndl who)
response (NotifyGiven @ev @e rn hndl)
NotifyBye ha -> do
trace $ "SERVER: NotifyBye" <+> pretty ha
atomically $ modifyTVar notifyAlive (HashMap.insert ha 0)
NotifyAlive ha -> do
now <- getTimeCoarse
atomically $ modifyTVar notifyAlive (HashMap.insert ha now)
NotifyError{} -> pure ()
_ -> response (NotifyError @ev @e NotifyErrUnexpected)
runNotifyWorkerServer :: forall ev src e m . ( Request e (NotifyProto ev e) m
, ForNotify ev
, NotifySource ev src
, MonadUnliftIO m
)
=> NotifyEnv ev src e
-> m ()
runNotifyWorkerServer env = do
-- FIXNE: timeout-hardcode
let tmo = 60
let tnano = round $ realToFrac tmo * 1e9
cleanup <- async $ forever do
trace "SERVER: CLEANUP"
now <- getTimeCoarse
losers <- atomically do
current <- readTVar (notifyAlive env)
pips <- readTVar (notifyWho env) <&> HashMap.toList
let alive = HashMap.filter (\t -> toNanoSeconds (TimeoutTS (now - t)) < tnano ) current
let (winners, losers) = List.partition (\(h, _) -> HashMap.member h alive) pips
writeTVar (notifyAlive env) alive
writeTVar (notifyWho env) (HashMap.fromList winners)
pure losers
trace $ "SERVER" <+> "losers" <+> pretty (length losers)
forM_ losers $ \(h,p) -> do
stopNotify @ev (notifySource env) h
trace $ "SERVER: stopNotify" <+> pretty h
request p (NotifyBye @ev @e h)
pause @'Seconds tmo
work <- async $ forever do
-- TODO: several-threads
(ha, pip, ev) <- atomically $ readTQueue (notifyQ env)
request pip (Notify @ev @e ha ev)
mapM_ link [cleanup, work]
void $ waitAnyCancel [work, cleanup]
data NotifySinkTask ev e =
NotifySinkSubscribe Word64 (NotifyKey ev) (TQueue NotifyHandle)
| NotifySinkAlive NotifyHandle
| NotifySinkBye NotifyHandle
data NotifySink ev e =
NotifySink
{ sinkPipeline :: TQueue (NotifySinkTask ev e)
, sinkNotify :: TVar (HashMap NotifyHandle (TQueue (Maybe (NotifyData ev))))
, sinkWaiters :: TVar (HashMap Word64 (TQueue NotifyHandle))
, sinkRnum :: TVar Word64
}
newNotifySink :: MonadIO m => m (NotifySink ev e)
newNotifySink = NotifySink <$> newTQueueIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO 1
runNotifyWorkerClient :: forall ev e m . ( MonadUnliftIO m
, Request e (NotifyProto ev e) m
, HasOwnPeer e m
, Pretty (Peer e)
)
=> NotifySink ev e
-> m ()
runNotifyWorkerClient sink = do
let waiters = sinkWaiters sink
pip <- ownPeer
forever do
atomically (readTQueue (sinkPipeline sink)) >>= \case
NotifySinkSubscribe r k w -> do
atomically $ modifyTVar waiters (HashMap.insert r w)
void $ asyncLinked $ do
-- если ничего не произошло, через минуту удаляем
pause @'Seconds 60
atomically $ modifyTVar waiters (HashMap.delete r)
trace $ "CLIENT:" <+> "NotifySinkSubscribe"
request @e pip (NotifyWant @ev @e r k)
NotifySinkAlive h ->
request @e pip (NotifyAlive @ev @e h)
NotifySinkBye h -> do
trace $ "CLIENT:" <+> "NotifySinkBye" <+> viaShow h
request @e pip (NotifyBye @ev @e h)
atomically $ modifyTVar (sinkNotify sink) (HashMap.delete h)
makeNotifyClient :: forall ev e m . ( MonadUnliftIO m
, Request e (NotifyProto ev e) m
, Response e (NotifyProto ev e) m
, HasOwnPeer e m
, Pretty (Peer e)
)
=> NotifySink ev e
-> NotifyProto ev e
-> m ()
makeNotifyClient sink what = do
pip <- ownPeer
let waiters = sinkWaiters sink
case what of
Notify ha (NotifyEvent _ kd) -> do
-- debug $ "CLIENT: GOT NOTIFY!" <+> pretty ha
mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha
forM_ mq $ \q -> do
r <- try @_ @SomeException $ atomically $ writeTQueue q (Just kd)
let unsbscribe _ = do
-- на том конце очередь сдохла? удаляем
request @e pip (NotifyBye @ev @e ha)
atomically (modifyTVar (sinkNotify sink) (HashMap.delete ha))
pure ()
either unsbscribe (const none) r
NotifyGiven rn ha -> do
waiter <- atomically $ do
w <- readTVar waiters <&> HashMap.lookup rn
modifyTVar waiters (HashMap.delete rn)
pure w
forM_ waiter $ \wa -> do
void $ try @_ @SomeException $ atomically $ writeTQueue wa ha
NotifyBye ha -> do
mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha
forM_ mq $ \q -> do
void $ try @_ @SomeException $ atomically $ writeTQueue q Nothing
void $ atomically $ modifyTVar (sinkNotify sink) $ HashMap.delete ha
NotifyError e -> do
err $ "*** makeNotifyClient:" <+> viaShow e
_ -> pure ()
nextRNum :: MonadIO m => NotifySink ev e -> m Word64
nextRNum NotifySink{..} = do
atomically $ stateTVar sinkRnum (\s -> (s, succ s))
runNotifySink :: forall ev e m . MonadUnliftIO m
=> NotifySink ev e
-> NotifyKey ev
-> ( NotifyData ev -> m () )
-> m ()
runNotifySink sink k action = do
my <- nextRNum sink
answ <- newTQueueIO
debug "runNotifySink 1"
atomically $ writeTQueue (sinkPipeline sink) (NotifySinkSubscribe my k answ)
-- ждём первый ответ, потом бы дропнуть или ЗАКРЫТЬ очередь
ha <- atomically $ do
r <- readTQueue answ
flushTQueue answ
pure r
myQ <- newTQueueIO
atomically $ modifyTVar (sinkNotify sink) (HashMap.insert ha myQ)
w <- async $ forever do
pause @'Seconds 30
atomically $ writeTQueue (sinkPipeline sink) (NotifySinkAlive ha)
-- NOTE: run-notify-sink-cleanup
-- если нас пристрелили --- попрощаться с NotifySink хотя бы
let cleanup = do
trace $ "CLIENT: cleanip and exit" <+> pretty ha
atomically $ writeTQueue (sinkPipeline sink) (NotifySinkBye ha)
atomically $ modifyTVar (sinkNotify sink) (HashMap.delete ha)
cancel w
bracket_ none cleanup do
fix $ \next -> do
atomically (readTQueue myQ) >>= \case
Just ev -> action ev >> next
Nothing -> pure ()
data SomeCallback ev =
SomeCallback { callback :: forall m . MonadIO m => NotifyHandle -> NotifyData ev -> m () }
data SomeNotifySource ev =
SomeNotifySource
{ handleCount :: TVar NotifyHandle
, listeners :: TVar (HashMap NotifyHandle (SomeCallback ev))
}
newSomeNotifySource :: forall ev m . (MonadIO m, ForNotify ev)
=> m (SomeNotifySource ev)
newSomeNotifySource = SomeNotifySource @ev <$> newTVarIO 1
<*> newTVarIO mempty
instance ForNotify ev => NotifySource ev (SomeNotifySource ev) where
startNotify src key fn = do
ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s)
atomically $ modifyTVar (listeners src) (HashMap.insert ha (SomeCallback @ev fn))
pure ha
stopNotify src ha = do
atomically $ modifyTVar (listeners src) (HashMap.delete ha)
emitNotify :: forall ev m . MonadIO m
=> SomeNotifySource ev
-> (NotifyKey ev, NotifyData ev)
-> m ()
emitNotify src (_,d) = do
who <- readTVarIO (listeners src) <&> HashMap.toList
for_ who $ \(h, SomeCallback cb) -> cb h d

View File

@ -6,6 +6,7 @@ module HBS2.Prelude
, void, guard, when, unless
, maybe1
, eitherToMaybe
, asyncLinked
, ToMPlus(..)
, Hashable
, lift
@ -94,3 +95,13 @@ instance Monad m => ToMPlus (MaybeT m) (Either x a) where
toMPlus (Left{}) = mzero
toMPlus (Right x) = MaybeT $ pure (Just x)
asyncLinked :: MonadUnliftIO m => m a -> m (Async a)
asyncLinked m = do
l <- async m
link l
pure l
-- asyncLinked :: forall m . MonadUnliftIO m =>

View File

@ -0,0 +1,127 @@
module Main where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Net.Proto
import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto.Notify
import HBS2.Actors.Peer
-- import HBS2.OrDie
import HBS2.System.Logger.Simple
-- import Codec.Serialise
import Control.Monad.Reader
-- import Control.Monad.Trans.Resource
import Data.ByteString.Lazy (ByteString)
-- import Lens.Micro.Platform
-- import Prettyprinter
import System.FilePath.Posix
-- import System.IO
-- import System.IO.Temp
-- import UnliftIO.Async
-- import UnliftIO qualified as UIO
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import UnliftIO
import Codec.Serialise
data Tick = Tick Int
deriving stock Generic
instance Serialise Tick
instance HasProtocol UNIX (NotifyProto Tick UNIX) where
type instance ProtocolId (NotifyProto Tick UNIX) = 0xd71049a1bffb70c4
type instance Encoded UNIX = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
newtype instance NotifyKey Tick =
TickNotifyKey ()
deriving (Generic,Eq)
deriving newtype Hashable
newtype instance NotifyData Tick =
TickNotifyData Int
deriving Generic
instance Serialise (NotifyKey Tick)
instance Serialise (NotifyData Tick)
runTickTack :: MonadIO m => r -> ReaderT r m a -> m a
runTickTack s m = runReaderT m s
main :: IO ()
main = do
setLogging @DEBUG (logPrefix "[debug] ")
setLogging @INFO (logPrefix "")
setLogging @ERROR (logPrefix "[err] ")
setLogging @WARN (logPrefix "[warn] ")
setLogging @NOTICE (logPrefix "[notice] ")
setLogging @TRACE (logPrefix "[trace] ")
liftIO $ hSetBuffering stdout LineBuffering
liftIO $ hSetBuffering stderr LineBuffering
withSystemTempDirectory "test-unix-socket" $ \tmp -> do
let soname = tmp </> "unix.socket"
server <- newMessagingUnix True 1.0 soname
client1 <- newMessagingUnix False 1.0 soname
m1 <- async $ runMessagingUnix server
m2 <- async $ runMessagingUnix client1
src <- newSomeNotifySource @Tick
-- запускаем "часы"
emitter <- async $ do
sec <- newTVarIO 0
forever do
sn <- atomically $ stateTVar sec (\s -> (s, succ s))
emitNotify src (TickNotifyKey (), TickNotifyData sn)
debug "SERVER: TICK!"
pause @'Seconds 1
-- запускаем сервер
p1 <- async $ liftIO $ runTickTack server do
env1 <- newNotifyEnvServer @Tick src
w <- async $ runNotifyWorkerServer env1
link w
runProto @UNIX
[ makeResponse (makeNotifyServer env1)
]
sink <- newNotifySink
-- запускаем клиента
p2 <- async $ runTickTack client1 $ do
void $ asyncLinked $ runNotifyWorkerClient sink
runProto @UNIX
[ makeResponse (makeNotifyClient @Tick sink)
]
s1 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do
debug $ "CLIENT1:" <+> viaShow td
s2 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do
debug $ "CLIENT2:" <+> viaShow td
s3 <- async $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do
debug $ "CLIENT3:" <+> viaShow td
void $ async do
pause @'Seconds 3
cancelWith s3 (toException (userError "Fuck you!"))
void $ waitAnyCatchCancel [p1,p2,m1,m2,s1,s2]

View File

@ -78,7 +78,7 @@ main = do
let soname = tmp </> "unix.socket"
server <- newMessagingUnixOpts [MUFork] True 1.0 soname
server <- newMessagingUnix True 1.0 soname
client1 <- newMessagingUnix False 1.0 soname
m1 <- async $ runMessagingUnix server