mirror of https://github.com/voidlizard/hbs2
merged notify-proto-debug fixes
This commit is contained in:
parent
35905b94bd
commit
002ecf7b3e
10
.fixme/log
10
.fixme/log
|
@ -1,10 +0,0 @@
|
||||||
|
|
||||||
(fixme-set "workflow" "done" "5Rn4qwCmWd")
|
|
||||||
fixme-del "AQYdvyfrPy"
|
|
||||||
(fixme-set "workflow" "done" "32P6FXjedw")
|
|
||||||
(fixme-set "workflow" "done" "CSVYUCZ7Hu")
|
|
||||||
(fixme-set "workflow" "done" "Ai8NMj6ZXQ")
|
|
||||||
fixme-del "2qam3QnVud"
|
|
||||||
fixme-del "J5dnL6WVGg"
|
|
||||||
fixme-del "FNtMbvYPvu"
|
|
||||||
(fixme-set "workflow" "backlog" "6TdZivbctK")
|
|
|
@ -159,13 +159,13 @@ runMessagingUnix env = do
|
||||||
forever do
|
forever do
|
||||||
(so, sa) <- liftIO $ accept sock
|
(so, sa) <- liftIO $ accept sock
|
||||||
|
|
||||||
peerNum <- atomically $ do
|
|
||||||
n <- readTVar (msgUnixAccepts env)
|
|
||||||
modifyTVar (msgUnixAccepts env) succ
|
|
||||||
pure n
|
|
||||||
|
|
||||||
withSession $ flip runContT void do
|
withSession $ flip runContT void do
|
||||||
|
|
||||||
|
peerNum <- atomically $ do
|
||||||
|
n <- readTVar (msgUnixAccepts env)
|
||||||
|
modifyTVar (msgUnixAccepts env) succ
|
||||||
|
pure n
|
||||||
|
|
||||||
seen <- getTimeCoarse >>= newTVarIO
|
seen <- getTimeCoarse >>= newTVarIO
|
||||||
|
|
||||||
let that = if doFork then
|
let that = if doFork then
|
||||||
|
@ -173,13 +173,12 @@ runMessagingUnix env = do
|
||||||
else
|
else
|
||||||
msgUnixSelf env
|
msgUnixSelf env
|
||||||
|
|
||||||
let writer = liftIO $ async $ forever do
|
let writer = liftIO $ async $ pause @'Seconds 0.001 >> forever do
|
||||||
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
|
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
|
||||||
|
|
||||||
maybe1 mq none $ \q -> do
|
maybe1 mq none $ \q -> do
|
||||||
msg <- liftIO . atomically $ readTQueue q
|
msg <- liftIO . atomically $ readTQueue q
|
||||||
|
|
||||||
|
|
||||||
let len = fromIntegral $ LBS.length msg :: Int
|
let len = fromIntegral $ LBS.length msg :: Int
|
||||||
let bs = bytestring32 (fromIntegral len)
|
let bs = bytestring32 (fromIntegral len)
|
||||||
|
|
||||||
|
@ -189,15 +188,15 @@ runMessagingUnix env = do
|
||||||
|
|
||||||
liftIO $ SL.sendAll so msg
|
liftIO $ SL.sendAll so msg
|
||||||
|
|
||||||
void $ ContT $ bracket (createQueues env that) dropQueuesFor
|
|
||||||
|
|
||||||
void $ ContT $ bracket writer cancel
|
|
||||||
|
|
||||||
void $ ContT $ bracket ( pure so ) closeSock
|
void $ ContT $ bracket ( pure so ) closeSock
|
||||||
|
|
||||||
|
void $ ContT $ bracket (createQueues env that) dropQueuesFor
|
||||||
|
|
||||||
void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that )
|
void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that )
|
||||||
( \_ -> debug $ "Client thread finished" <+> pretty that )
|
( \_ -> debug $ "Client thread finished" <+> pretty that )
|
||||||
|
|
||||||
|
void $ ContT $ bracket writer cancel
|
||||||
|
|
||||||
fix \next -> do
|
fix \next -> do
|
||||||
|
|
||||||
let mq = Just (msgUnixRecv env)
|
let mq = Just (msgUnixRecv env)
|
||||||
|
@ -208,11 +207,10 @@ runMessagingUnix env = do
|
||||||
-- debug $ "frameLen" <+> pretty frameLen
|
-- debug $ "frameLen" <+> pretty frameLen
|
||||||
|
|
||||||
if frameLen == 0 then do
|
if frameLen == 0 then do
|
||||||
-- answer to empty message
|
-- answer to watchdog message
|
||||||
liftIO $ sendAll so $ bytestring32 0
|
liftIO $ sendAll so $ bytestring32 0
|
||||||
else do
|
else do
|
||||||
frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict
|
frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict
|
||||||
|
|
||||||
maybe1 mq none $ \q -> do
|
maybe1 mq none $ \q -> do
|
||||||
atomically $ writeTQueue q (From that, frame)
|
atomically $ writeTQueue q (From that, frame)
|
||||||
|
|
||||||
|
@ -234,13 +232,14 @@ runMessagingUnix env = do
|
||||||
let who = PeerUNIX p
|
let who = PeerUNIX p
|
||||||
tseen <- getTimeCoarse >>= newTVarIO
|
tseen <- getTimeCoarse >>= newTVarIO
|
||||||
|
|
||||||
void $ ContT $ bracket (createQueues env who) dropQueuesFor
|
|
||||||
|
|
||||||
let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol
|
let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol
|
||||||
let closeSock = close
|
let closeSock = close
|
||||||
|
|
||||||
sock <- ContT $ bracket openSock closeSock
|
sock <- ContT $ bracket openSock closeSock
|
||||||
|
|
||||||
|
void $ ContT $ bracket (createQueues env who) dropQueuesFor
|
||||||
|
|
||||||
|
|
||||||
let attemptConnect = do
|
let attemptConnect = do
|
||||||
result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env)
|
result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env)
|
||||||
case result of
|
case result of
|
||||||
|
@ -262,11 +261,12 @@ runMessagingUnix env = do
|
||||||
|
|
||||||
getTimeCoarse >>= (atomically . writeTVar tseen)
|
getTimeCoarse >>= (atomically . writeTVar tseen)
|
||||||
|
|
||||||
when (frameLen > 0) do
|
frame <- liftIO $ readFromSocket sock frameLen
|
||||||
frame <- liftIO $ readFromSocket sock frameLen
|
|
||||||
|
-- when (frameLen > 0) do
|
||||||
-- сообщения кому? **МНЕ**
|
-- сообщения кому? **МНЕ**
|
||||||
-- сообщения от кого? от **КОГО-ТО**
|
-- сообщения от кого? от **КОГО-ТО**
|
||||||
atomically $ writeTQueue q (From who, frame)
|
atomically $ writeTQueue q (From who, frame)
|
||||||
|
|
||||||
watchdog <- ContT $ liftIO . withAsync do
|
watchdog <- ContT $ liftIO . withAsync do
|
||||||
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
|
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
|
||||||
|
@ -274,7 +274,7 @@ runMessagingUnix env = do
|
||||||
Nothing -> forever (pause @'Seconds 600)
|
Nothing -> forever (pause @'Seconds 600)
|
||||||
Just n -> forever do
|
Just n -> forever do
|
||||||
|
|
||||||
sendTo env (To who) (From who) (mempty :: ByteString)
|
pause (TimeoutSec (realToFrac n))
|
||||||
|
|
||||||
now <- getTimeCoarse
|
now <- getTimeCoarse
|
||||||
seen <- readTVarIO tseen
|
seen <- readTVarIO tseen
|
||||||
|
@ -287,9 +287,22 @@ runMessagingUnix env = do
|
||||||
trace "watchdog fired!"
|
trace "watchdog fired!"
|
||||||
throwIO ReadTimeoutException
|
throwIO ReadTimeoutException
|
||||||
|
|
||||||
pause (TimeoutSec (max 1 (realToFrac n / 2)))
|
|
||||||
|
|
||||||
writer <- ContT $ liftIO . withAsync do
|
writer <- ContT $ liftIO . withAsync do
|
||||||
|
|
||||||
|
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
|
||||||
|
|
||||||
|
let withWD m = case mwd of
|
||||||
|
Nothing -> m
|
||||||
|
Just n -> do
|
||||||
|
let nwait = max 1 (realToFrac n * 0.7)
|
||||||
|
e <- race (pause (TimeoutSec nwait)) m
|
||||||
|
case e of
|
||||||
|
Right{} -> pure ()
|
||||||
|
Left{} -> do
|
||||||
|
liftIO $ sendAll sock $ bytestring32 0
|
||||||
|
-- liftIO $ SL.sendAll sock ""
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
|
|
||||||
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
|
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
|
||||||
|
@ -298,10 +311,12 @@ runMessagingUnix env = do
|
||||||
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
|
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
|
||||||
|
|
||||||
maybe1 mq none $ \q -> do
|
maybe1 mq none $ \q -> do
|
||||||
msg <- liftIO . atomically $ readTQueue q
|
-- если WD установлен, то просыпаемся, скажем, wd/2 и
|
||||||
let len = fromIntegral $ LBS.length msg :: Int
|
-- шлём пустую строку серверу
|
||||||
liftIO $ sendAll sock $ bytestring32 (fromIntegral len)
|
withWD do
|
||||||
liftIO $ SL.sendAll sock msg
|
msg <- liftIO $ atomically $ readTQueue q
|
||||||
|
let len = fromIntegral $ LBS.length msg :: Int
|
||||||
|
liftIO $ SL.sendAll sock $ ( LBS.fromStrict (bytestring32 (fromIntegral len)) <> msg)
|
||||||
|
|
||||||
r <- waitAnyCatchCancel [reader, writer, watchdog]
|
r <- waitAnyCatchCancel [reader, writer, watchdog]
|
||||||
|
|
||||||
|
@ -362,7 +377,7 @@ instance Messaging MessagingUnix UNIX ByteString where
|
||||||
|
|
||||||
sendTo bus (To who) (From me) msg = liftIO do
|
sendTo bus (To who) (From me) msg = liftIO do
|
||||||
|
|
||||||
createQueues bus who
|
-- createQueues bus who
|
||||||
|
|
||||||
-- FIXME: handle-no-queue-for-rcpt-situation-1
|
-- FIXME: handle-no-queue-for-rcpt-situation-1
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,9 @@ import Data.List qualified as List
|
||||||
import Data.Word
|
import Data.Word
|
||||||
import Control.Concurrent.STM (flushTQueue)
|
import Control.Concurrent.STM (flushTQueue)
|
||||||
import Data.Maybe
|
import Data.Maybe
|
||||||
|
import Data.Either
|
||||||
import UnliftIO
|
import UnliftIO
|
||||||
|
import System.IO (hPrint)
|
||||||
|
|
||||||
|
|
||||||
instance (HasProtocol UNIX (NotifyProto ev0 UNIX)) => HasTimeLimits UNIX (NotifyProto ev0 UNIX) IO where
|
instance (HasProtocol UNIX (NotifyProto ev0 UNIX)) => HasTimeLimits UNIX (NotifyProto ev0 UNIX) IO where
|
||||||
|
@ -59,7 +61,9 @@ class (ForNotify ev) => NotifySource ev source where
|
||||||
-- Или нет? Наверное, да, так как для каждого типа
|
-- Или нет? Наверное, да, так как для каждого типа
|
||||||
-- эвента свой какой-то код их генерирует
|
-- эвента свой какой-то код их генерирует
|
||||||
data NotifyProto ev e =
|
data NotifyProto ev e =
|
||||||
NotifyWant Word64 (NotifyKey ev)
|
NotifyPing
|
||||||
|
| NotifyPong
|
||||||
|
| NotifyWant Word64 (NotifyKey ev)
|
||||||
| NotifyGiven Word64 NotifyHandle
|
| NotifyGiven Word64 NotifyHandle
|
||||||
| NotifyAlive NotifyHandle
|
| NotifyAlive NotifyHandle
|
||||||
| Notify NotifyHandle (NotifyEvent ev)
|
| Notify NotifyHandle (NotifyEvent ev)
|
||||||
|
@ -103,6 +107,7 @@ newNotifyEnvServer src = NotifyEnv src <$> newTVarIO mempty
|
||||||
makeNotifyServer :: forall ev src e m . ( MonadIO m
|
makeNotifyServer :: forall ev src e m . ( MonadIO m
|
||||||
, Response e (NotifyProto ev e) m
|
, Response e (NotifyProto ev e) m
|
||||||
, NotifySource ev src
|
, NotifySource ev src
|
||||||
|
, HasDeferred e (NotifyProto ev e) m
|
||||||
, Pretty (Peer e)
|
, Pretty (Peer e)
|
||||||
)
|
)
|
||||||
=> NotifyEnv ev src e
|
=> NotifyEnv ev src e
|
||||||
|
@ -111,10 +116,12 @@ makeNotifyServer :: forall ev src e m . ( MonadIO m
|
||||||
|
|
||||||
makeNotifyServer (NotifyEnv{..}) what = do
|
makeNotifyServer (NotifyEnv{..}) what = do
|
||||||
|
|
||||||
case what of
|
let proxy = Proxy @(NotifyProto ev e)
|
||||||
NotifyWant rn key -> do
|
|
||||||
|
|
||||||
trace "SERVER: NotifyWant"
|
case what of
|
||||||
|
NotifyWant rn key -> deferred proxy do
|
||||||
|
|
||||||
|
debug "SERVER: NotifyWant"
|
||||||
|
|
||||||
who <- thatPeer (Proxy @(NotifyProto ev e))
|
who <- thatPeer (Proxy @(NotifyProto ev e))
|
||||||
|
|
||||||
|
@ -127,6 +134,7 @@ makeNotifyServer (NotifyEnv{..}) what = do
|
||||||
modifyTVar notifyAlive (HashMap.insert hndl now)
|
modifyTVar notifyAlive (HashMap.insert hndl now)
|
||||||
modifyTVar notifyWho (HashMap.insert hndl who)
|
modifyTVar notifyWho (HashMap.insert hndl who)
|
||||||
|
|
||||||
|
debug $ "SEND GIVEN TO" <+> viaShow hndl <+> pretty who
|
||||||
response (NotifyGiven @ev @e rn hndl)
|
response (NotifyGiven @ev @e rn hndl)
|
||||||
|
|
||||||
NotifyBye ha -> do
|
NotifyBye ha -> do
|
||||||
|
@ -139,12 +147,16 @@ makeNotifyServer (NotifyEnv{..}) what = do
|
||||||
|
|
||||||
NotifyError{} -> pure ()
|
NotifyError{} -> pure ()
|
||||||
|
|
||||||
|
NotifyPing{} -> do
|
||||||
|
response (NotifyPong @ev @e)
|
||||||
|
|
||||||
_ -> response (NotifyError @ev @e NotifyErrUnexpected)
|
_ -> response (NotifyError @ev @e NotifyErrUnexpected)
|
||||||
|
|
||||||
|
|
||||||
runNotifyWorkerServer :: forall ev src e m . ( Request e (NotifyProto ev e) m
|
runNotifyWorkerServer :: forall ev src e m . ( Request e (NotifyProto ev e) m
|
||||||
, ForNotify ev
|
, ForNotify ev
|
||||||
, NotifySource ev src
|
, NotifySource ev src
|
||||||
|
, Pretty (Peer e)
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
)
|
)
|
||||||
=> NotifyEnv ev src e
|
=> NotifyEnv ev src e
|
||||||
|
@ -188,6 +200,7 @@ runNotifyWorkerServer env = do
|
||||||
work <- async $ forever do
|
work <- async $ forever do
|
||||||
-- TODO: several-threads
|
-- TODO: several-threads
|
||||||
(ha, pip, ev) <- atomically $ readTQueue (notifyQ env)
|
(ha, pip, ev) <- atomically $ readTQueue (notifyQ env)
|
||||||
|
debug $ "SENT-NOTIFY-TO" <+> pretty pip
|
||||||
request pip (Notify @ev @e ha ev)
|
request pip (Notify @ev @e ha ev)
|
||||||
|
|
||||||
mapM_ link [cleanup, work]
|
mapM_ link [cleanup, work]
|
||||||
|
@ -199,10 +212,13 @@ data NotifySinkTask ev e =
|
||||||
NotifySinkSubscribe Word64 (NotifyKey ev) (TQueue NotifyHandle)
|
NotifySinkSubscribe Word64 (NotifyKey ev) (TQueue NotifyHandle)
|
||||||
| NotifySinkAlive NotifyHandle
|
| NotifySinkAlive NotifyHandle
|
||||||
| NotifySinkBye NotifyHandle
|
| NotifySinkBye NotifyHandle
|
||||||
|
| NotifySinkPing
|
||||||
|
|
||||||
data NotifySink ev e =
|
data NotifySink ev e =
|
||||||
NotifySink
|
NotifySink
|
||||||
{ sinkPipeline :: TQueue (NotifySinkTask ev e)
|
{ sinkPipeline :: TQueue (NotifySinkTask ev e)
|
||||||
|
, sinkPong :: TQueue ()
|
||||||
|
, sinkPongNum :: TVar Int
|
||||||
, sinkNotify :: TVar (HashMap NotifyHandle (TQueue (Maybe (NotifyData ev))))
|
, sinkNotify :: TVar (HashMap NotifyHandle (TQueue (Maybe (NotifyData ev))))
|
||||||
, sinkWaiters :: TVar (HashMap Word64 (TQueue NotifyHandle))
|
, sinkWaiters :: TVar (HashMap Word64 (TQueue NotifyHandle))
|
||||||
, sinkRnum :: TVar Word64
|
, sinkRnum :: TVar Word64
|
||||||
|
@ -210,6 +226,8 @@ data NotifySink ev e =
|
||||||
|
|
||||||
newNotifySink :: MonadIO m => m (NotifySink ev e)
|
newNotifySink :: MonadIO m => m (NotifySink ev e)
|
||||||
newNotifySink = NotifySink <$> newTQueueIO
|
newNotifySink = NotifySink <$> newTQueueIO
|
||||||
|
<*> newTQueueIO
|
||||||
|
<*> newTVarIO 0
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
<*> newTVarIO 1
|
<*> newTVarIO 1
|
||||||
|
@ -226,23 +244,39 @@ runNotifyWorkerClient :: forall ev e m . ( MonadUnliftIO m
|
||||||
runNotifyWorkerClient sink = do
|
runNotifyWorkerClient sink = do
|
||||||
let waiters = sinkWaiters sink
|
let waiters = sinkWaiters sink
|
||||||
pip <- ownPeer
|
pip <- ownPeer
|
||||||
|
|
||||||
|
atomically $ writeTVar (sinkPongNum sink) 0
|
||||||
|
|
||||||
|
fix \next -> do
|
||||||
|
request @e pip (NotifyPing @ev @e)
|
||||||
|
what <- race (pause @'Seconds 2) do
|
||||||
|
debug "runNotifyWorkerClient.sendPing"
|
||||||
|
atomically $ readTQueue (sinkPong sink)
|
||||||
|
either (const next) (const none) what
|
||||||
|
atomically $ modifyTVar (sinkPongNum sink) succ
|
||||||
|
|
||||||
|
debug "runNotifyWorkerClient.run"
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
atomically (readTQueue (sinkPipeline sink)) >>= \case
|
atomically (readTQueue (sinkPipeline sink)) >>= \case
|
||||||
|
|
||||||
NotifySinkSubscribe r k w -> do
|
NotifySinkSubscribe r k w -> do
|
||||||
atomically $ modifyTVar waiters (HashMap.insert r w)
|
atomically $ modifyTVar waiters (HashMap.insert r w)
|
||||||
|
|
||||||
void $ asyncLinked $ do
|
void $ asyncLinked $ void $ try @_ @SomeException $ do
|
||||||
-- если ничего не произошло, через минуту удаляем
|
-- если ничего не произошло, через минуту удаляем
|
||||||
pause @'Seconds 60
|
pause @'Seconds 60
|
||||||
atomically $ modifyTVar waiters (HashMap.delete r)
|
atomically $ modifyTVar waiters (HashMap.delete r)
|
||||||
|
|
||||||
trace $ "CLIENT:" <+> "NotifySinkSubscribe"
|
trace $ "CLIENT:" <+> "SEND NotifySinkSubscribe" <+> pretty r
|
||||||
request @e pip (NotifyWant @ev @e r k)
|
request @e pip (NotifyWant @ev @e r k)
|
||||||
|
|
||||||
NotifySinkAlive h ->
|
NotifySinkAlive h ->
|
||||||
request @e pip (NotifyAlive @ev @e h)
|
request @e pip (NotifyAlive @ev @e h)
|
||||||
|
|
||||||
|
NotifySinkPing -> do
|
||||||
|
request @e pip (NotifyPing @ev @e)
|
||||||
|
|
||||||
NotifySinkBye h -> do
|
NotifySinkBye h -> do
|
||||||
trace $ "CLIENT:" <+> "NotifySinkBye" <+> viaShow h
|
trace $ "CLIENT:" <+> "NotifySinkBye" <+> viaShow h
|
||||||
request @e pip (NotifyBye @ev @e h)
|
request @e pip (NotifyBye @ev @e h)
|
||||||
|
@ -265,12 +299,17 @@ makeNotifyClient sink what = do
|
||||||
|
|
||||||
case what of
|
case what of
|
||||||
Notify ha (NotifyEvent _ kd) -> do
|
Notify ha (NotifyEvent _ kd) -> do
|
||||||
-- debug $ "CLIENT: GOT NOTIFY!" <+> pretty ha
|
trace $ "CLIENT: GOT NOTIFY!" <+> pretty ha
|
||||||
mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha
|
mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha
|
||||||
|
|
||||||
forM_ mq $ \q -> do
|
forM_ mq $ \q -> do
|
||||||
r <- try @_ @SomeException $ atomically $ writeTQueue q (Just kd)
|
r <- try @_ @SomeException $ atomically $ writeTQueue q (Just kd)
|
||||||
|
|
||||||
|
when (isLeft r) do
|
||||||
|
debug "LEFT writeTQueue"
|
||||||
|
|
||||||
let unsbscribe _ = do
|
let unsbscribe _ = do
|
||||||
|
debug "UNSUBSCRIBE!"
|
||||||
-- на том конце очередь сдохла? удаляем
|
-- на том конце очередь сдохла? удаляем
|
||||||
request @e pip (NotifyBye @ev @e ha)
|
request @e pip (NotifyBye @ev @e ha)
|
||||||
atomically (modifyTVar (sinkNotify sink) (HashMap.delete ha))
|
atomically (modifyTVar (sinkNotify sink) (HashMap.delete ha))
|
||||||
|
@ -279,13 +318,15 @@ makeNotifyClient sink what = do
|
||||||
either unsbscribe (const none) r
|
either unsbscribe (const none) r
|
||||||
|
|
||||||
NotifyGiven rn ha -> do
|
NotifyGiven rn ha -> do
|
||||||
|
trace $ "CLIENT: GOT NOTIFY GIVEN!" <+> pretty ha
|
||||||
waiter <- atomically $ do
|
waiter <- atomically $ do
|
||||||
w <- readTVar waiters <&> HashMap.lookup rn
|
w <- readTVar waiters <&> HashMap.lookup rn
|
||||||
modifyTVar waiters (HashMap.delete rn)
|
modifyTVar waiters (HashMap.delete rn)
|
||||||
pure w
|
pure w
|
||||||
|
|
||||||
forM_ waiter $ \wa -> do
|
forM_ waiter $ \wa -> do
|
||||||
void $ try @_ @SomeException $ atomically $ writeTQueue wa ha
|
r <- try @_ @SomeException $ atomically $ writeTQueue wa ha
|
||||||
|
debug $ "NOTIFY CLIENT SUBSCRIBED" <+> viaShow rn
|
||||||
|
|
||||||
NotifyBye ha -> do
|
NotifyBye ha -> do
|
||||||
mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha
|
mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha
|
||||||
|
@ -294,6 +335,9 @@ makeNotifyClient sink what = do
|
||||||
|
|
||||||
void $ atomically $ modifyTVar (sinkNotify sink) $ HashMap.delete ha
|
void $ atomically $ modifyTVar (sinkNotify sink) $ HashMap.delete ha
|
||||||
|
|
||||||
|
NotifyPong{} -> do
|
||||||
|
void $ atomically $ writeTQueue (sinkPong sink) ()
|
||||||
|
|
||||||
NotifyError e -> do
|
NotifyError e -> do
|
||||||
err $ "*** makeNotifyClient:" <+> viaShow e
|
err $ "*** makeNotifyClient:" <+> viaShow e
|
||||||
|
|
||||||
|
@ -312,25 +356,42 @@ runNotifySink :: forall ev e m . MonadUnliftIO m
|
||||||
|
|
||||||
runNotifySink sink k action = do
|
runNotifySink sink k action = do
|
||||||
|
|
||||||
my <- nextRNum sink
|
fix \next -> do
|
||||||
|
snum <- readTVarIO (sinkPongNum sink)
|
||||||
|
if snum > 0 then
|
||||||
|
none
|
||||||
|
else do
|
||||||
|
pause @'Seconds 0.5
|
||||||
|
next
|
||||||
|
|
||||||
answ <- newTQueueIO
|
ha <- fix \next -> do
|
||||||
|
|
||||||
debug "runNotifySink 1"
|
r <- race (pause @'Seconds 1) do
|
||||||
atomically $ writeTQueue (sinkPipeline sink) (NotifySinkSubscribe my k answ)
|
my <- nextRNum sink
|
||||||
|
|
||||||
-- ждём первый ответ, потом бы дропнуть или ЗАКРЫТЬ очередь
|
answ <- newTQueueIO
|
||||||
ha <- atomically $ do
|
|
||||||
r <- readTQueue answ
|
atomically $ writeTQueue (sinkPipeline sink) (NotifySinkSubscribe my k answ)
|
||||||
flushTQueue answ
|
|
||||||
pure r
|
-- ждём первый ответ, потом бы дропнуть или ЗАКРЫТЬ очередь
|
||||||
|
atomically $ do
|
||||||
|
r <- readTQueue answ
|
||||||
|
flushTQueue answ
|
||||||
|
pure r
|
||||||
|
|
||||||
|
case r of
|
||||||
|
Right x -> pure x
|
||||||
|
Left{} -> do
|
||||||
|
debug "Retry subscribing..."
|
||||||
|
pause @'Seconds 1
|
||||||
|
next
|
||||||
|
|
||||||
myQ <- newTQueueIO
|
myQ <- newTQueueIO
|
||||||
atomically $ modifyTVar (sinkNotify sink) (HashMap.insert ha myQ)
|
atomically $ modifyTVar (sinkNotify sink) (HashMap.insert ha myQ)
|
||||||
|
|
||||||
w <- async $ forever do
|
w <- async $ forever do
|
||||||
pause @'Seconds 30
|
|
||||||
atomically $ writeTQueue (sinkPipeline sink) (NotifySinkAlive ha)
|
atomically $ writeTQueue (sinkPipeline sink) (NotifySinkAlive ha)
|
||||||
|
pause @'Seconds 30
|
||||||
|
|
||||||
-- NOTE: run-notify-sink-cleanup
|
-- NOTE: run-notify-sink-cleanup
|
||||||
-- если нас пристрелили --- попрощаться с NotifySink хотя бы
|
-- если нас пристрелили --- попрощаться с NotifySink хотя бы
|
||||||
|
@ -366,6 +427,7 @@ instance ForNotify ev => NotifySource ev (SomeNotifySource ev) where
|
||||||
|
|
||||||
startNotify src key fn = do
|
startNotify src key fn = do
|
||||||
ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s)
|
ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s)
|
||||||
|
debug $ "Start notify!"
|
||||||
atomically $ modifyTVar (listeners src) (HashMap.insertWith (<>) key [(ha, SomeCallback @ev fn)])
|
atomically $ modifyTVar (listeners src) (HashMap.insertWith (<>) key [(ha, SomeCallback @ev fn)])
|
||||||
pure ha
|
pure ha
|
||||||
|
|
||||||
|
|
|
@ -832,7 +832,9 @@ refChanNotifyProto self adapter msg@(Notify rchan box) = do
|
||||||
-- теперь пересылаем по госсипу
|
-- теперь пересылаем по госсипу
|
||||||
lift $ gossip msg
|
lift $ gossip msg
|
||||||
|
|
||||||
debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0
|
-- FIXME: remove-debug
|
||||||
|
let h1 = hashObject @HbSync (serialise box)
|
||||||
|
debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0 <+> pretty h1
|
||||||
|
|
||||||
-- тут надо заслать во внешнее приложение,
|
-- тут надо заслать во внешнее приложение,
|
||||||
-- равно как и в остальных refchan-протоколах
|
-- равно как и в остальных refchan-протоколах
|
||||||
|
|
|
@ -228,17 +228,18 @@ runSync = do
|
||||||
|
|
||||||
lift $ syncRepo entry
|
lift $ syncRepo entry
|
||||||
|
|
||||||
fix \next -> do
|
|
||||||
|
|
||||||
rr' <- liftIO $ race (pause @'Seconds 1) do
|
fix \next -> do
|
||||||
callService @RpcRefLogGet refLogRPC rk
|
|
||||||
<&> fromRight Nothing
|
|
||||||
|
|
||||||
void $ liftIO $ race (pause @'Seconds 60) (atomically (peekTQueue upd))
|
void $ liftIO $ race (pause @'Seconds 60) (atomically (peekTQueue upd))
|
||||||
pause @'Seconds 5
|
pause @'Seconds 5
|
||||||
liftIO $ atomically $ flushTQueue upd
|
liftIO $ atomically $ flushTQueue upd
|
||||||
|
|
||||||
rr <- either (const $ pause @'Seconds 10 >> warn "rpc call timeout" >> next) pure rr'
|
rr' <- liftIO $ race (pause @'Seconds 1) do
|
||||||
|
callService @RpcRefLogGet refLogRPC rk
|
||||||
|
<&> fromRight Nothing
|
||||||
|
|
||||||
|
rr <- either (const $ pause @'Seconds 1 >> warn "rpc call timeout" >> next) pure rr'
|
||||||
|
|
||||||
debug $ "REFLOG VALUE:" <+> pretty rr
|
debug $ "REFLOG VALUE:" <+> pretty rr
|
||||||
|
|
||||||
|
@ -250,7 +251,7 @@ runSync = do
|
||||||
lift (syncRepo entry) >>= \case
|
lift (syncRepo entry) >>= \case
|
||||||
Left{} -> do
|
Left{} -> do
|
||||||
debug $ "Failed to update:" <+> pretty (repoPath entry)
|
debug $ "Failed to update:" <+> pretty (repoPath entry)
|
||||||
pause @'Seconds 1
|
pause @'Seconds 5
|
||||||
again
|
again
|
||||||
|
|
||||||
Right{} -> do
|
Right{} -> do
|
||||||
|
|
|
@ -32,6 +32,7 @@ import HBS2.Net.Proto.RefLog
|
||||||
import HBS2.Net.Proto.RefChan
|
import HBS2.Net.Proto.RefChan
|
||||||
import HBS2.Net.Proto.Sessions
|
import HBS2.Net.Proto.Sessions
|
||||||
import HBS2.Net.Proto.Service
|
import HBS2.Net.Proto.Service
|
||||||
|
import HBS2.Net.Proto.Notify (NotifyProto)
|
||||||
import HBS2.OrDie
|
import HBS2.OrDie
|
||||||
import HBS2.Storage.Simple
|
import HBS2.Storage.Simple
|
||||||
import HBS2.Data.Detect
|
import HBS2.Data.Detect
|
||||||
|
@ -548,6 +549,8 @@ instance ( Monad m
|
||||||
|
|
||||||
response = lift . response
|
response = lift . response
|
||||||
|
|
||||||
|
instance (MonadUnliftIO m, HasProtocol UNIX (NotifyProto ev e)) => HasDeferred UNIX (NotifyProto ev e) m where
|
||||||
|
deferred _ m = void $ async m
|
||||||
|
|
||||||
respawn :: PeerOpts -> IO ()
|
respawn :: PeerOpts -> IO ()
|
||||||
respawn opts =
|
respawn opts =
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
module Main where
|
module Main where
|
||||||
|
|
||||||
import HBS2.Prelude.Plated
|
import HBS2.Prelude.Plated
|
||||||
import HBS2.Clock
|
import HBS2.Clock hiding (sec)
|
||||||
import HBS2.Net.Proto
|
import HBS2.Net.Proto
|
||||||
import HBS2.Net.Messaging.Unix
|
import HBS2.Net.Messaging.Unix
|
||||||
import HBS2.Net.Proto.Notify
|
import HBS2.Net.Proto.Notify
|
||||||
|
@ -38,12 +38,19 @@ instance HasProtocol UNIX (NotifyProto Tick UNIX) where
|
||||||
decode = either (const Nothing) Just . deserialiseOrFail
|
decode = either (const Nothing) Just . deserialiseOrFail
|
||||||
encode = serialise
|
encode = serialise
|
||||||
|
|
||||||
|
instance (MonadUnliftIO m, HasProtocol UNIX (NotifyProto ev e)) => HasDeferred UNIX (NotifyProto ev e) m where
|
||||||
|
deferred _ m = void $ async m
|
||||||
|
|
||||||
|
data WhatTick = Odd | Even
|
||||||
|
deriving stock (Generic,Eq)
|
||||||
|
|
||||||
|
instance Hashable WhatTick
|
||||||
|
instance Serialise WhatTick
|
||||||
|
|
||||||
newtype instance NotifyKey Tick =
|
newtype instance NotifyKey Tick =
|
||||||
TickNotifyKey ()
|
TickNotifyKey WhatTick
|
||||||
deriving (Generic,Eq)
|
deriving (Generic)
|
||||||
deriving newtype Hashable
|
deriving newtype (Hashable,Eq)
|
||||||
|
|
||||||
newtype instance NotifyData Tick =
|
newtype instance NotifyData Tick =
|
||||||
TickNotifyData Int
|
TickNotifyData Int
|
||||||
|
@ -88,7 +95,10 @@ main = do
|
||||||
sec <- newTVarIO 0
|
sec <- newTVarIO 0
|
||||||
forever do
|
forever do
|
||||||
sn <- atomically $ stateTVar sec (\s -> (s, succ s))
|
sn <- atomically $ stateTVar sec (\s -> (s, succ s))
|
||||||
emitNotify src (TickNotifyKey (), TickNotifyData sn)
|
if even sn then do
|
||||||
|
emitNotify src (TickNotifyKey Even, TickNotifyData sn)
|
||||||
|
else
|
||||||
|
emitNotify src (TickNotifyKey Odd, TickNotifyData sn)
|
||||||
debug "SERVER: TICK!"
|
debug "SERVER: TICK!"
|
||||||
pause @'Seconds 1
|
pause @'Seconds 1
|
||||||
|
|
||||||
|
@ -110,17 +120,17 @@ main = do
|
||||||
[ makeResponse (makeNotifyClient @Tick sink)
|
[ makeResponse (makeNotifyClient @Tick sink)
|
||||||
]
|
]
|
||||||
|
|
||||||
s1 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do
|
s1 <- asyncLinked $ runNotifySink sink (TickNotifyKey Even) $ \(TickNotifyData td) -> do
|
||||||
debug $ "CLIENT1:" <+> viaShow td
|
debug $ "CLIENT1:" <+> viaShow td
|
||||||
|
|
||||||
s2 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do
|
s2 <- asyncLinked $ runNotifySink sink (TickNotifyKey Odd) $ \(TickNotifyData td) -> do
|
||||||
debug $ "CLIENT2:" <+> viaShow td
|
debug $ "CLIENT2:" <+> viaShow td
|
||||||
|
|
||||||
s3 <- async $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do
|
s3 <- async $ runNotifySink sink (TickNotifyKey Odd) $ \(TickNotifyData td) -> do
|
||||||
debug $ "CLIENT3:" <+> viaShow td
|
debug $ "CLIENT3:" <+> viaShow td
|
||||||
|
|
||||||
void $ async do
|
void $ async do
|
||||||
pause @'Seconds 3
|
pause @'Seconds 10
|
||||||
cancelWith s3 (toException (userError "Fuck you!"))
|
cancelWith s3 (toException (userError "Fuck you!"))
|
||||||
|
|
||||||
void $ waitAnyCatchCancel [p1,p2,m1,m2,s1,s2]
|
void $ waitAnyCatchCancel [p1,p2,m1,m2,s1,s2]
|
||||||
|
|
Loading…
Reference in New Issue