hbs2/hbs2-core/lib/HBS2/Net/Proto/Notify.hs

447 lines
14 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PolyKinds #-}
module HBS2.Net.Proto.Notify where
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Net.Proto.Types
import HBS2.Prelude.Plated
import HBS2.Net.Messaging.Unix (UNIX)
import HBS2.System.Logger.Simple
import Data.Function
import Codec.Serialise
import Control.Monad
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.Kind
import Data.List qualified as List
import Data.Word
import Control.Concurrent.STM (flushTQueue)
import Data.Maybe
import Data.Either
import UnliftIO
import System.IO (hPrint)
-- | Time-limit policy for the notify protocol over the 'UNIX' transport:
-- 'tryLockForPeriod' ignores both of its arguments and always answers
-- 'True'.
instance (HasProtocol UNIX (NotifyProto ev0 UNIX)) => HasTimeLimits UNIX (NotifyProto ev0 UNIX) IO where
  tryLockForPeriod _ _ = pure True
-- It is unlikely that we will ever have a reason
-- to use different types for NotifyHandle.

-- | Identifier of one subscription: returned by 'startNotify', passed to
-- 'stopNotify', and carried inside the 'NotifyProto' messages.
type NotifyHandle = Word64
-- | Constraints every event family @ev@ must satisfy to be used with this
-- protocol: its key, its payload and the handle must be serialisable, and
-- the key must be usable as a 'HashMap' key ('Hashable' and 'Eq').
type ForNotify ev = ( Serialise (NotifyKey ev)
                    , Serialise (NotifyData ev)
                    , Serialise NotifyHandle
                    , Hashable (NotifyKey ev)
                    , Eq (NotifyKey ev)
                    )
-- | Per-event-family key type: what a subscriber asks for
-- (see 'NotifyWant' and 'startNotify').
data family NotifyKey ev :: Type
-- | Per-event-family payload type: what is delivered to a subscriber's
-- callback (see 'startNotify' and 'NotifyEvent').
data family NotifyData ev :: Type
-- | Something that can hand out subscriptions for the event family @ev@.
class (ForNotify ev) => NotifySource ev source where
  -- | Register a callback for the given key and return the handle that
  -- identifies the new subscription. The callback receives the handle and
  -- the event payload and must be runnable in any 'MonadIO'.
  startNotify :: forall m0 . MonadIO m0
              => source
              -> NotifyKey ev
              -> ( forall m . MonadIO m => NotifyHandle -> NotifyData ev -> m () )
              -> m0 NotifyHandle
  -- | Cancel the subscription identified by the given handle.
  stopNotify :: forall m . MonadIO m => source -> NotifyHandle -> m ()
-- NOTE: on-notify-proto
--   This could have been split into several (two) separate
--   protocols, but then we would end up with
--   2*makeServer and 2*makeClient,
--   since every ev is effectively a separate protocol?
--   Or not? Probably yes, because each event type
--   has its own code that generates the events.

-- | Messages of the notify protocol for event family @ev@.
-- The @e@ parameter is not used by any constructor.
--
-- NOTE(review): 'Serialise' is derived via 'Generic', so the constructor
-- order presumably determines the wire encoding — do not reorder.
data NotifyProto ev e =
    NotifyPing -- ^ Liveness probe; the server replies with 'NotifyPong'.
  | NotifyPong -- ^ Reply to 'NotifyPing'.
  | NotifyWant Word64 (NotifyKey ev) -- ^ Subscription request: request number and key.
  | NotifyGiven Word64 NotifyHandle -- ^ Reply to 'NotifyWant': the same request number and the issued handle.
  | NotifyAlive NotifyHandle -- ^ Keep-alive for a handle; the server refreshes its timestamp.
  | Notify NotifyHandle (NotifyEvent ev) -- ^ An event delivered for the given handle.
  | NotifyBye NotifyHandle -- ^ The subscription with this handle is finished (sent by either side).
  | NotifyError NotifyErr -- ^ Error report.
  deriving stock (Generic)
-- | One delivered event: the key it was subscribed under together with
-- its payload.
data NotifyEvent ev =
  NotifyEvent
  { notifyKey :: NotifyKey ev -- ^ key of the subscription that produced the event
  , notifyData :: NotifyData ev -- ^ event payload
  }
  deriving stock Generic
-- | Errors reported through 'NotifyError'. 'NotifyErrUnexpected' is what
-- the server answers to a message it has no handler for.
data NotifyErr = NotifyErrUnexpected
  deriving stock (Generic, Show, Typeable)
-- Serialisation instances; all of them have empty bodies and therefore use
-- the class defaults (the types derive 'Generic').
instance Serialise NotifyErr
instance ForNotify ev => Serialise (NotifyEvent ev)
instance ForNotify ev => Serialise (NotifyProto ev e)
-- | Server-side state shared by 'makeNotifyServer' (the message handler)
-- and 'runNotifyWorkerServer' (the background worker).
data NotifyEnv ev src e =
  NotifyEnv
  { notifySource :: src -- ^ the underlying 'NotifySource'
  , notifyAlive :: TVar (HashMap NotifyHandle TimeSpec) -- ^ last time each handle was confirmed alive; 0 after a 'NotifyBye'
  , notifyWho :: TVar (HashMap NotifyHandle (Peer e)) -- ^ the peer that owns each handle
  , notifyQ :: TQueue (NotifyHandle, Peer e, NotifyEvent ev) -- ^ events waiting to be sent to their peer
  }
-- | Create a fresh server environment around the given notification
-- source: both handle tables start empty and so does the outgoing queue.
newNotifyEnvServer :: ( NotifySource ev src
                      , MonadIO m
                      )
                   => src
                   -> m (NotifyEnv ev src e)
newNotifyEnvServer src = do
  aliveTable <- newTVarIO mempty
  ownerTable <- newTVarIO mempty
  outgoing   <- newTQueueIO
  pure NotifyEnv
    { notifySource = src
    , notifyAlive  = aliveTable
    , notifyWho    = ownerTable
    , notifyQ      = outgoing
    }
-- | Server-side handler for a single incoming 'NotifyProto' message.
--
-- * 'NotifyWant': (deferred) subscribes at the source with a callback that
--   enqueues every event into 'notifyQ', records the new handle as alive
--   and owned by the requesting peer, and answers with 'NotifyGiven'.
-- * 'NotifyBye': sets the handle's alive timestamp to 0.
-- * 'NotifyAlive': refreshes the handle's alive timestamp.
-- * 'NotifyPing': answers with 'NotifyPong'.
-- * 'NotifyError': ignored.
-- * anything else: answers with 'NotifyError' 'NotifyErrUnexpected'.
makeNotifyServer :: forall ev src e m proto . ( MonadIO m
                                              , Response e proto m
                                              , NotifySource ev src
                                              , HasDeferred proto e m
                                              , Pretty (Peer e)
                                              , proto ~ NotifyProto ev e
                                              )
                 => NotifyEnv ev src e
                 -> NotifyProto ev e
                 -> m ()
makeNotifyServer NotifyEnv{..} = \case

  NotifyPing{} ->
    response (NotifyPong @ev @e)

  NotifyError{} ->
    pure ()

  NotifyAlive ha -> do
    stamp <- getTimeCoarse
    atomically $ modifyTVar notifyAlive (HashMap.insert ha stamp)

  NotifyBye ha -> do
    trace $ "SERVER: NotifyBye" <+> pretty ha
    -- a zero timestamp marks the handle as dead
    atomically $ modifyTVar notifyAlive (HashMap.insert ha 0)

  NotifyWant rn key -> deferred @proto do
    debug "SERVER: NotifyWant"

    subscriber <- thatPeer (Proxy @(NotifyProto ev e))

    -- the callback only enqueues; the actual sending is done by the
    -- consumer of notifyQ
    issued <- startNotify @ev @src @m notifySource key $ \ha d ->
      atomically $ writeTQueue notifyQ (ha, subscriber, NotifyEvent key d)

    stamp <- getTimeCoarse

    atomically $ do
      modifyTVar notifyAlive (HashMap.insert issued stamp)
      modifyTVar notifyWho (HashMap.insert issued subscriber)

    debug $ "SEND GIVEN TO" <+> viaShow issued <+> pretty subscriber

    response (NotifyGiven @ev @e rn issued)

  _ ->
    response (NotifyError @ev @e NotifyErrUnexpected)
-- | Background worker of the notify server. Runs two loops concurrently
-- and returns as soon as either of them finishes:
--
-- * cleanup: once per timeout period, drops every handle whose alive
--   timestamp is at least the timeout old; for each dropped handle with a
--   known owner it calls 'stopNotify' and sends 'NotifyBye' to that peer;
-- * work: takes events from 'notifyQ' and sends them to their peer as
--   'Notify' messages.
--
-- Both loops are started with 'withAsync', so they are cancelled on every
-- exit path, including an asynchronous exception that arrives before the
-- wait below is entered.
runNotifyWorkerServer :: forall ev src e m . ( Request e (NotifyProto ev e) m
                                             , ForNotify ev
                                             , NotifySource ev src
                                             , Pretty (Peer e)
                                             , MonadUnliftIO m
                                             )
                      => NotifyEnv ev src e
                      -> m ()
runNotifyWorkerServer env = do

  -- FIXME: timeout-hardcode
  let tmo = 60
  let tnano = round $ realToFrac tmo * 1e9

  let cleanupLoop = forever do
        trace "SERVER: CLEANUP"

        now <- getTimeCoarse

        -- split the handle tables into survivors and expired entries in
        -- one transaction
        losers <- atomically do
          current <- readTVar (notifyAlive env)
          pips <- readTVar (notifyWho env) <&> HashMap.toList
          let alive = HashMap.filter (\t -> toNanoSeconds (TimeoutTS (now - t)) < tnano ) current
          let (winners, losers) = List.partition (\(h, _) -> HashMap.member h alive) pips
          writeTVar (notifyAlive env) alive
          writeTVar (notifyWho env) (HashMap.fromList winners)
          pure losers

        trace $ "SERVER" <+> "losers" <+> pretty (length losers)

        forM_ losers $ \(h,p) -> do
          stopNotify @ev (notifySource env) h
          trace $ "SERVER: stopNotify" <+> pretty h
          request p (NotifyBye @ev @e h)

        pause @'Seconds tmo

  let workLoop = forever do
        -- TODO: several-threads
        (ha, pip, ev) <- atomically $ readTQueue (notifyQ env)
        debug $ "SENT-NOTIFY-TO" <+> pretty pip
        request pip (Notify @ev @e ha ev)

  withAsync cleanupLoop $ \cleanup ->
    withAsync workLoop $ \work -> do
      -- linking re-throws an exception from either loop in this thread
      mapM_ link [cleanup, work]
      void $ waitAnyCancel [work, cleanup]
-- | Commands queued in 'sinkPipeline' and executed by
-- 'runNotifyWorkerClient'; each one results in a protocol message.
data NotifySinkTask ev e =
    NotifySinkSubscribe Word64 (NotifyKey ev) (TQueue NotifyHandle) -- ^ request number, key, and the queue that will receive the issued handle
  | NotifySinkAlive NotifyHandle -- ^ send 'NotifyAlive' for the handle
  | NotifySinkBye NotifyHandle -- ^ send 'NotifyBye' and forget the handle
  | NotifySinkPing -- ^ send 'NotifyPing'
-- | Client-side state shared by 'runNotifyWorkerClient', 'makeNotifyClient'
-- and 'runNotifySink'.
data NotifySink ev e =
  NotifySink
  { sinkPipeline :: TQueue (NotifySinkTask ev e) -- ^ commands for the client worker
  , sinkPong :: TQueue () -- ^ receives a unit for every 'NotifyPong'
  , sinkPongNum :: TVar Int -- ^ reset to 0 when the worker starts, incremented once the first pong arrived
  , sinkNotify :: TVar (HashMap NotifyHandle (TQueue (Maybe (NotifyData ev)))) -- ^ per-handle delivery queues; 'Nothing' tells the reader to stop
  , sinkWaiters :: TVar (HashMap Word64 (TQueue NotifyHandle)) -- ^ pending subscribe requests by request number
  , sinkRnum :: TVar Word64 -- ^ next request number
  }
-- | Create an empty client sink: empty queues and tables, a pong counter
-- of 0 and a request counter starting at 1.
newNotifySink :: MonadIO m => m (NotifySink ev e)
newNotifySink = do
  pipeline  <- newTQueueIO
  pongs     <- newTQueueIO
  pongCount <- newTVarIO 0
  delivery  <- newTVarIO mempty
  pending   <- newTVarIO mempty
  counter   <- newTVarIO 1
  pure NotifySink
    { sinkPipeline = pipeline
    , sinkPong     = pongs
    , sinkPongNum  = pongCount
    , sinkNotify   = delivery
    , sinkWaiters  = pending
    , sinkRnum     = counter
    }
-- | Client worker: performs the initial ping/pong handshake and then
-- executes the commands queued in 'sinkPipeline' forever.
--
-- The handshake resets 'sinkPongNum' to 0, then sends 'NotifyPing'
-- repeatedly (waiting up to two seconds for each answer) until a pong shows
-- up in 'sinkPong', and finally increments 'sinkPongNum'.
runNotifyWorkerClient :: forall ev e m . ( MonadUnliftIO m
                                         , Request e (NotifyProto ev e) m
                                         , HasOwnPeer e m
                                         , Pretty (Peer e)
                                         )
                      => NotifySink ev e
                      -> m ()
runNotifyWorkerClient sink = do

  let pending = sinkWaiters sink

  self <- ownPeer

  atomically $ writeTVar (sinkPongNum sink) 0

  fix \again -> do
    request @e self (NotifyPing @ev @e)
    outcome <- race (pause @'Seconds 2) do
      debug "runNotifyWorkerClient.sendPing"
      atomically $ readTQueue (sinkPong sink)
    case outcome of
      Left{}  -> again   -- no pong within the time limit: ping again
      Right{} -> none

  atomically $ modifyTVar (sinkPongNum sink) succ

  debug "runNotifyWorkerClient.run"

  forever do
    task <- atomically $ readTQueue (sinkPipeline sink)
    case task of

      NotifySinkPing ->
        request @e self (NotifyPing @ev @e)

      NotifySinkAlive h ->
        request @e self (NotifyAlive @ev @e h)

      NotifySinkBye h -> do
        trace $ "CLIENT:" <+> "NotifySinkBye" <+> viaShow h
        request @e self (NotifyBye @ev @e h)
        atomically $ modifyTVar (sinkNotify sink) (HashMap.delete h)

      NotifySinkSubscribe r k w -> do
        atomically $ modifyTVar pending (HashMap.insert r w)

        -- asyncLinked is defined outside this view
        void $ asyncLinked $ void $ try @_ @SomeException $ do
          -- if nothing has happened, drop the waiter after a minute
          pause @'Seconds 60
          atomically $ modifyTVar pending (HashMap.delete r)

        trace $ "CLIENT:" <+> "SEND NotifySinkSubscribe" <+> pretty r
        request @e self (NotifyWant @ev @e r k)
-- | Client-side handler for a single incoming 'NotifyProto' message.
--
-- * 'Notify': pushes the payload into the delivery queue registered for
--   the handle, if there is one; when that write fails, sends 'NotifyBye'
--   and forgets the handle.
-- * 'NotifyGiven': removes the waiter for the request number and hands the
--   issued handle to it.
-- * 'NotifyBye': pushes 'Nothing' into the handle's delivery queue (if
--   any) and forgets the handle.
-- * 'NotifyPong': pushes a unit into 'sinkPong'.
-- * 'NotifyError': logs the error.
-- * anything else: ignored.
makeNotifyClient :: forall ev e m . ( MonadUnliftIO m
                                    , Request e (NotifyProto ev e) m
                                    , Response e (NotifyProto ev e) m
                                    , HasOwnPeer e m
                                    , Pretty (Peer e)
                                    )
                 => NotifySink ev e
                 -> NotifyProto ev e
                 -> m ()
makeNotifyClient sink what = do

  self <- ownPeer

  let pending  = sinkWaiters sink
  let delivery = sinkNotify sink

  case what of

    Notify ha (NotifyEvent _ kd) -> do
      trace $ "CLIENT: GOT NOTIFY!" <+> pretty ha
      target <- HashMap.lookup ha <$> readTVarIO delivery
      forM_ target $ \queue ->
        try @_ @SomeException (atomically $ writeTQueue queue (Just kd)) >>= \case
          Right{} -> none
          Left{} -> do
            debug "LEFT writeTQueue"
            debug "UNSUBSCRIBE!"
            -- the queue on the other end is dead? drop the subscription
            request @e self (NotifyBye @ev @e ha)
            atomically $ modifyTVar delivery (HashMap.delete ha)

    NotifyGiven rn ha -> do
      trace $ "CLIENT: GOT NOTIFY GIVEN!" <+> pretty ha
      -- look the waiter up and remove it in one transaction
      waiter <- atomically $ stateTVar pending $ \table ->
        (HashMap.lookup rn table, HashMap.delete rn table)
      forM_ waiter $ \answer -> do
        void $ try @_ @SomeException $ atomically $ writeTQueue answer ha
        debug $ "NOTIFY CLIENT SUBSCRIBED" <+> viaShow rn

    NotifyBye ha -> do
      target <- HashMap.lookup ha <$> readTVarIO delivery
      forM_ target $ \queue ->
        void $ try @_ @SomeException $ atomically $ writeTQueue queue Nothing
      atomically $ modifyTVar delivery (HashMap.delete ha)

    NotifyPong{} ->
      atomically $ writeTQueue (sinkPong sink) ()

    NotifyError e ->
      err $ "*** makeNotifyClient:" <+> viaShow e

    _ -> pure ()
-- | Return the current request number of the sink and advance the counter
-- by one, atomically.
nextRNum :: MonadIO m => NotifySink ev e -> m Word64
nextRNum sink =
  atomically $ stateTVar (sinkRnum sink) $ \current -> (current, succ current)
-- | Subscribe to the given key through the sink and run the action on
-- every payload that arrives, until a 'Nothing' is read from the delivery
-- queue or the caller is interrupted.
--
-- Steps:
--
-- 1. wait (polling 'sinkPongNum' every half second) until the client
--    worker has completed its handshake;
-- 2. queue a subscribe command and wait up to a second for the handle,
--    retrying with a fresh request number after a one-second pause;
-- 3. register a delivery queue for the handle, start a keep-alive loop
--    that queues 'NotifySinkAlive' every 30 seconds, and consume the queue.
--
-- The registration is the acquire step of 'bracket_' and the keep-alive
-- runs under 'withAsync', so on every exit path the keep-alive is
-- cancelled first and only then 'NotifySinkBye' is queued and the delivery
-- queue is unregistered.
--
-- NOTE(review): events that arrive after the handle was issued but before
-- the delivery queue is registered find no queue in 'sinkNotify' and are
-- presumably dropped by the client handler — confirm.
runNotifySink :: forall ev e m . MonadUnliftIO m
              => NotifySink ev e
              -> NotifyKey ev
              -> ( NotifyData ev -> m () )
              -> m ()
runNotifySink sink k action = do

  fix \next -> do
    snum <- readTVarIO (sinkPongNum sink)
    unless (snum > 0) do
      pause @'Seconds 0.5
      next

  ha <- fix \next -> do
    r <- race (pause @'Seconds 1) do
      my <- nextRNum sink
      answ <- newTQueueIO
      atomically $ writeTQueue (sinkPipeline sink) (NotifySinkSubscribe my k answ)
      -- wait for the first answer; ideally the queue would then be
      -- dropped or CLOSED
      atomically $ do
        first <- readTQueue answ
        void $ flushTQueue answ
        pure first

    case r of
      Right x -> pure x
      Left{} -> do
        debug "Retry subscribing..."
        pause @'Seconds 1
        next

  myQ <- newTQueueIO

  let register =
        atomically $ modifyTVar (sinkNotify sink) (HashMap.insert ha myQ)

  let keepAlive = forever do
        atomically $ writeTQueue (sinkPipeline sink) (NotifySinkAlive ha)
        pause @'Seconds 30

  -- NOTE: run-notify-sink-cleanup
  --  if we get killed, at least say goodbye to the NotifySink
  let cleanup = do
        trace $ "CLIENT: cleanip and exit" <+> pretty ha
        atomically $ writeTQueue (sinkPipeline sink) (NotifySinkBye ha)
        atomically $ modifyTVar (sinkNotify sink) (HashMap.delete ha)

  let consume = fix \next ->
        atomically (readTQueue myQ) >>= \case
          Just ev -> action ev >> next
          Nothing -> pure ()

  bracket_ register cleanup $
    withAsync keepAlive (const consume)
-- | Wrapper that stores a subscriber callback which must be runnable in
-- any 'MonadIO' (a rank-2 field), so it can be kept inside a container.
data SomeCallback ev =
  SomeCallback { callback :: forall m . MonadIO m => NotifyHandle -> NotifyData ev -> m () }
-- | A simple in-memory 'NotifySource': a handle counter plus a table of
-- callbacks grouped by key.
data SomeNotifySource ev =
  SomeNotifySource
  { handleCount :: TVar NotifyHandle -- ^ next handle to give out
  , listeners :: TVar (HashMap (NotifyKey ev) [(NotifyHandle, SomeCallback ev)]) -- ^ registered callbacks per key, each tagged with its handle
  }
-- | Create an in-memory notification source with no listeners; the first
-- handle it will give out is 1.
newSomeNotifySource :: forall ev m . (MonadIO m, ForNotify ev)
                    => m (SomeNotifySource ev)
newSomeNotifySource = do
  counter <- newTVarIO 1
  table   <- newTVarIO mempty
  pure $ SomeNotifySource @ev counter table
-- | In-memory implementation of 'NotifySource'.
instance ForNotify ev => NotifySource ev (SomeNotifySource ev) where

  -- Take the next handle from the counter and add the callback, tagged
  -- with that handle, in front of the listeners already registered for
  -- the key.
  startNotify source key onEvent = do
    fresh <- atomically $ stateTVar (handleCount source) (\n -> (n, succ n))
    debug $ "Start notify!"
    let entry = [(fresh, SomeCallback @ev onEvent)]
    atomically $ modifyTVar (listeners source) (HashMap.insertWith (<>) key entry)
    pure fresh

  -- Remove every callback tagged with the handle, then drop the keys that
  -- are left without listeners.
  stopNotify source ha =
    atomically $ modifyTVar (listeners source) $
      HashMap.filter (not . null) . HashMap.map (filter ((/= ha) . fst))
-- | Deliver a payload to every callback currently registered for the key;
-- each callback gets its own handle and the payload. Does nothing when the
-- key has no listeners.
emitNotify :: forall ev m . (ForNotify ev, MonadIO m)
           => SomeNotifySource ev
           -> (NotifyKey ev, NotifyData ev)
           -> m ()
emitNotify src (k,d) = do
  table <- readTVarIO (listeners src)
  let subscribers = fromMaybe mempty (HashMap.lookup k table)
  forM_ subscribers $ \(h, SomeCallback cb) -> cb h d