From 002ecf7b3ebcc80400af93ea7e1c84ed5b8c8346 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Mon, 6 Nov 2023 08:56:10 +0300 Subject: [PATCH] merged notify-proto-debug fixes --- .fixme/log | 10 --- hbs2-core/lib/HBS2/Net/Messaging/Unix.hs | 65 ++++++++++------ hbs2-core/lib/HBS2/Net/Proto/Notify.hs | 98 +++++++++++++++++++----- hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 4 +- hbs2-git/reposync/ReposyncMain.hs | 13 ++-- hbs2-peer/app/PeerMain.hs | 3 + hbs2-tests/test/notify-unix/Main.hs | 28 ++++--- 7 files changed, 152 insertions(+), 69 deletions(-) diff --git a/.fixme/log b/.fixme/log index cc305eeb..e69de29b 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,10 +0,0 @@ - -(fixme-set "workflow" "done" "5Rn4qwCmWd") -fixme-del "AQYdvyfrPy" -(fixme-set "workflow" "done" "32P6FXjedw") -(fixme-set "workflow" "done" "CSVYUCZ7Hu") -(fixme-set "workflow" "done" "Ai8NMj6ZXQ") -fixme-del "2qam3QnVud" -fixme-del "J5dnL6WVGg" -fixme-del "FNtMbvYPvu" -(fixme-set "workflow" "backlog" "6TdZivbctK") \ No newline at end of file diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index e4efca5e..08d30384 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -159,13 +159,13 @@ runMessagingUnix env = do forever do (so, sa) <- liftIO $ accept sock - peerNum <- atomically $ do - n <- readTVar (msgUnixAccepts env) - modifyTVar (msgUnixAccepts env) succ - pure n - withSession $ flip runContT void do + peerNum <- atomically $ do + n <- readTVar (msgUnixAccepts env) + modifyTVar (msgUnixAccepts env) succ + pure n + seen <- getTimeCoarse >>= newTVarIO let that = if doFork then @@ -173,13 +173,12 @@ runMessagingUnix env = do else msgUnixSelf env - let writer = liftIO $ async $ forever do + let writer = liftIO $ async $ pause @'Seconds 0.001 >> forever do mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that maybe1 mq none $ \q -> do msg <- liftIO . atomically $ readTQueue q - let len = fromIntegral $ LBS.length msg :: Int let bs = bytestring32 (fromIntegral len) @@ -189,15 +188,15 @@ runMessagingUnix env = do liftIO $ SL.sendAll so msg - void $ ContT $ bracket (createQueues env that) dropQueuesFor - - void $ ContT $ bracket writer cancel - void $ ContT $ bracket ( pure so ) closeSock + void $ ContT $ bracket (createQueues env that) dropQueuesFor + void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that ) ( \_ -> debug $ "Client thread finished" <+> pretty that ) + void $ ContT $ bracket writer cancel + fix \next -> do let mq = Just (msgUnixRecv env) @@ -208,11 +207,10 @@ runMessagingUnix env = do -- debug $ "frameLen" <+> pretty frameLen if frameLen == 0 then do - -- answer to empty message + -- answer to watchdog message liftIO $ sendAll so $ bytestring32 0 else do frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict - maybe1 mq none $ \q -> do atomically $ writeTQueue q (From that, frame) @@ -234,13 +232,14 @@ runMessagingUnix env = do let who = PeerUNIX p tseen <- getTimeCoarse >>= newTVarIO - void $ ContT $ bracket (createQueues env who) dropQueuesFor - let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol let closeSock = close sock <- ContT $ bracket openSock closeSock + void $ ContT $ bracket (createQueues env who) dropQueuesFor + + let attemptConnect = do result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env) case result of @@ -262,11 +261,12 @@ runMessagingUnix env = do getTimeCoarse >>= (atomically . writeTVar tseen) - when (frameLen > 0) do - frame <- liftIO $ readFromSocket sock frameLen + frame <- liftIO $ readFromSocket sock frameLen + + -- when (frameLen > 0) do -- сообщения кому? **МНЕ** -- сообщения от кого? от **КОГО-ТО** - atomically $ writeTQueue q (From who, frame) + atomically $ writeTQueue q (From who, frame) watchdog <- ContT $ liftIO . withAsync do let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] @@ -274,7 +274,7 @@ runMessagingUnix env = do Nothing -> forever (pause @'Seconds 600) Just n -> forever do - sendTo env (To who) (From who) (mempty :: ByteString) + pause (TimeoutSec (realToFrac n)) now <- getTimeCoarse seen <- readTVarIO tseen @@ -287,9 +287,22 @@ runMessagingUnix env = do trace "watchdog fired!" throwIO ReadTimeoutException - pause (TimeoutSec (max 1 (realToFrac n / 2))) writer <- ContT $ liftIO . withAsync do + + let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] + + let withWD m = case mwd of + Nothing -> m + Just n -> do + let nwait = max 1 (realToFrac n * 0.7) + e <- race (pause (TimeoutSec nwait)) m + case e of + Right{} -> pure () + Left{} -> do + liftIO $ sendAll sock $ bytestring32 0 + -- liftIO $ SL.sendAll sock "" + forever do -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. @@ -298,10 +311,12 @@ runMessagingUnix env = do mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who maybe1 mq none $ \q -> do - msg <- liftIO . atomically $ readTQueue q - let len = fromIntegral $ LBS.length msg :: Int - liftIO $ sendAll sock $ bytestring32 (fromIntegral len) - liftIO $ SL.sendAll sock msg + -- если WD установлен, то просыпаемся, скажем, wd/2 и + -- шлём пустую строку серверу + withWD do + msg <- liftIO $ atomically $ readTQueue q + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ SL.sendAll sock $ ( LBS.fromStrict (bytestring32 (fromIntegral len)) <> msg) r <- waitAnyCatchCancel [reader, writer, watchdog] @@ -362,7 +377,7 @@ instance Messaging MessagingUnix UNIX ByteString where sendTo bus (To who) (From me) msg = liftIO do - createQueues bus who + -- createQueues bus who -- FIXME: handle-no-queue-for-rcpt-situation-1 diff --git a/hbs2-core/lib/HBS2/Net/Proto/Notify.hs b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs index ba8e3163..062edebd 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Notify.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs @@ -21,7 +21,9 @@ import Data.List qualified as List import Data.Word import Control.Concurrent.STM (flushTQueue) import Data.Maybe +import Data.Either import UnliftIO +import System.IO (hPrint) instance (HasProtocol UNIX (NotifyProto ev0 UNIX)) => HasTimeLimits UNIX (NotifyProto ev0 UNIX) IO where @@ -59,7 +61,9 @@ class (ForNotify ev) => NotifySource ev source where -- Или нет? Наверное, да, так как для каждого типа -- эвента свой какой-то код их генерирует data NotifyProto ev e = - NotifyWant Word64 (NotifyKey ev) + NotifyPing + | NotifyPong + | NotifyWant Word64 (NotifyKey ev) | NotifyGiven Word64 NotifyHandle | NotifyAlive NotifyHandle | Notify NotifyHandle (NotifyEvent ev) @@ -103,6 +107,7 @@ newNotifyEnvServer src = NotifyEnv src <$> newTVarIO mempty makeNotifyServer :: forall ev src e m . ( MonadIO m , Response e (NotifyProto ev e) m , NotifySource ev src + , HasDeferred e (NotifyProto ev e) m , Pretty (Peer e) ) => NotifyEnv ev src e @@ -111,10 +116,12 @@ makeNotifyServer :: forall ev src e m . ( MonadIO m makeNotifyServer (NotifyEnv{..}) what = do - case what of - NotifyWant rn key -> do + let proxy = Proxy @(NotifyProto ev e) - trace "SERVER: NotifyWant" + case what of + NotifyWant rn key -> deferred proxy do + + debug "SERVER: NotifyWant" who <- thatPeer (Proxy @(NotifyProto ev e)) @@ -127,6 +134,7 @@ makeNotifyServer (NotifyEnv{..}) what = do modifyTVar notifyAlive (HashMap.insert hndl now) modifyTVar notifyWho (HashMap.insert hndl who) + debug $ "SEND GIVEN TO" <+> viaShow hndl <+> pretty who response (NotifyGiven @ev @e rn hndl) NotifyBye ha -> do @@ -139,12 +147,16 @@ makeNotifyServer (NotifyEnv{..}) what = do NotifyError{} -> pure () + NotifyPing{} -> do + response (NotifyPong @ev @e) + _ -> response (NotifyError @ev @e NotifyErrUnexpected) runNotifyWorkerServer :: forall ev src e m . ( Request e (NotifyProto ev e) m , ForNotify ev , NotifySource ev src + , Pretty (Peer e) , MonadUnliftIO m ) => NotifyEnv ev src e @@ -188,6 +200,7 @@ runNotifyWorkerServer env = do work <- async $ forever do -- TODO: several-threads (ha, pip, ev) <- atomically $ readTQueue (notifyQ env) + debug $ "SENT-NOTIFY-TO" <+> pretty pip request pip (Notify @ev @e ha ev) mapM_ link [cleanup, work] @@ -199,10 +212,13 @@ data NotifySinkTask ev e = NotifySinkSubscribe Word64 (NotifyKey ev) (TQueue NotifyHandle) | NotifySinkAlive NotifyHandle | NotifySinkBye NotifyHandle + | NotifySinkPing data NotifySink ev e = NotifySink { sinkPipeline :: TQueue (NotifySinkTask ev e) + , sinkPong :: TQueue () + , sinkPongNum :: TVar Int , sinkNotify :: TVar (HashMap NotifyHandle (TQueue (Maybe (NotifyData ev)))) , sinkWaiters :: TVar (HashMap Word64 (TQueue NotifyHandle)) , sinkRnum :: TVar Word64 @@ -210,6 +226,8 @@ data NotifySink ev e = newNotifySink :: MonadIO m => m (NotifySink ev e) newNotifySink = NotifySink <$> newTQueueIO + <*> newTQueueIO + <*> newTVarIO 0 <*> newTVarIO mempty <*> newTVarIO mempty <*> newTVarIO 1 @@ -226,23 +244,39 @@ runNotifyWorkerClient :: forall ev e m . ( MonadUnliftIO m runNotifyWorkerClient sink = do let waiters = sinkWaiters sink pip <- ownPeer + + atomically $ writeTVar (sinkPongNum sink) 0 + + fix \next -> do + request @e pip (NotifyPing @ev @e) + what <- race (pause @'Seconds 2) do + debug "runNotifyWorkerClient.sendPing" + atomically $ readTQueue (sinkPong sink) + either (const next) (const none) what + atomically $ modifyTVar (sinkPongNum sink) succ + + debug "runNotifyWorkerClient.run" + forever do atomically (readTQueue (sinkPipeline sink)) >>= \case NotifySinkSubscribe r k w -> do atomically $ modifyTVar waiters (HashMap.insert r w) - void $ asyncLinked $ do + void $ asyncLinked $ void $ try @_ @SomeException $ do -- если ничего не произошло, через минуту удаляем pause @'Seconds 60 atomically $ modifyTVar waiters (HashMap.delete r) - trace $ "CLIENT:" <+> "NotifySinkSubscribe" + trace $ "CLIENT:" <+> "SEND NotifySinkSubscribe" <+> pretty r request @e pip (NotifyWant @ev @e r k) NotifySinkAlive h -> request @e pip (NotifyAlive @ev @e h) + NotifySinkPing -> do + request @e pip (NotifyPing @ev @e) + NotifySinkBye h -> do trace $ "CLIENT:" <+> "NotifySinkBye" <+> viaShow h request @e pip (NotifyBye @ev @e h) @@ -265,12 +299,17 @@ makeNotifyClient sink what = do case what of Notify ha (NotifyEvent _ kd) -> do - -- debug $ "CLIENT: GOT NOTIFY!" <+> pretty ha + trace $ "CLIENT: GOT NOTIFY!" <+> pretty ha mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha + forM_ mq $ \q -> do r <- try @_ @SomeException $ atomically $ writeTQueue q (Just kd) + when (isLeft r) do + debug "LEFT writeTQueue" + let unsbscribe _ = do + debug "UNSUBSCRIBE!" -- на том конце очередь сдохла? удаляем request @e pip (NotifyBye @ev @e ha) atomically (modifyTVar (sinkNotify sink) (HashMap.delete ha)) @@ -279,13 +318,15 @@ makeNotifyClient sink what = do either unsbscribe (const none) r NotifyGiven rn ha -> do + trace $ "CLIENT: GOT NOTIFY GIVEN!" <+> pretty ha waiter <- atomically $ do w <- readTVar waiters <&> HashMap.lookup rn modifyTVar waiters (HashMap.delete rn) pure w forM_ waiter $ \wa -> do - void $ try @_ @SomeException $ atomically $ writeTQueue wa ha + r <- try @_ @SomeException $ atomically $ writeTQueue wa ha + debug $ "NOTIFY CLIENT SUBSCRIBED" <+> viaShow rn NotifyBye ha -> do mq <- readTVarIO (sinkNotify sink) <&> HashMap.lookup ha @@ -294,6 +335,9 @@ makeNotifyClient sink what = do void $ atomically $ modifyTVar (sinkNotify sink) $ HashMap.delete ha + NotifyPong{} -> do + void $ atomically $ writeTQueue (sinkPong sink) () + NotifyError e -> do err $ "*** makeNotifyClient:" <+> viaShow e @@ -312,25 +356,42 @@ runNotifySink :: forall ev e m . MonadUnliftIO m runNotifySink sink k action = do - my <- nextRNum sink + fix \next -> do + snum <- readTVarIO (sinkPongNum sink) + if snum > 0 then + none + else do + pause @'Seconds 0.5 + next - answ <- newTQueueIO + ha <- fix \next -> do - debug "runNotifySink 1" - atomically $ writeTQueue (sinkPipeline sink) (NotifySinkSubscribe my k answ) + r <- race (pause @'Seconds 1) do + my <- nextRNum sink - -- ждём первый ответ, потом бы дропнуть или ЗАКРЫТЬ очередь - ha <- atomically $ do - r <- readTQueue answ - flushTQueue answ - pure r + answ <- newTQueueIO + + atomically $ writeTQueue (sinkPipeline sink) (NotifySinkSubscribe my k answ) + + -- ждём первый ответ, потом бы дропнуть или ЗАКРЫТЬ очередь + atomically $ do + r <- readTQueue answ + flushTQueue answ + pure r + + case r of + Right x -> pure x + Left{} -> do + debug "Retry subscribing..." + pause @'Seconds 1 + next myQ <- newTQueueIO atomically $ modifyTVar (sinkNotify sink) (HashMap.insert ha myQ) w <- async $ forever do - pause @'Seconds 30 atomically $ writeTQueue (sinkPipeline sink) (NotifySinkAlive ha) + pause @'Seconds 30 -- NOTE: run-notify-sink-cleanup -- если нас пристрелили --- попрощаться с NotifySink хотя бы @@ -366,6 +427,7 @@ instance ForNotify ev => NotifySource ev (SomeNotifySource ev) where startNotify src key fn = do ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s) + debug $ "Start notify!" atomically $ modifyTVar (listeners src) (HashMap.insertWith (<>) key [(ha, SomeCallback @ev fn)]) pure ha diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 9fe0e421..ba795c06 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -832,7 +832,9 @@ refChanNotifyProto self adapter msg@(Notify rchan box) = do -- теперь пересылаем по госсипу lift $ gossip msg - debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0 + -- FIXME: remove-debug + let h1 = hashObject @HbSync (serialise box) + debug $ "^^^ refChanNotifyProto" <+> pretty peer <+> pretty h0 <+> pretty h1 -- тут надо заслать во внешнее приложение, -- равно как и в остальных refchan-протоколах diff --git a/hbs2-git/reposync/ReposyncMain.hs b/hbs2-git/reposync/ReposyncMain.hs index 35dc5069..2f9ebc9a 100644 --- a/hbs2-git/reposync/ReposyncMain.hs +++ b/hbs2-git/reposync/ReposyncMain.hs @@ -228,17 +228,18 @@ runSync = do lift $ syncRepo entry - fix \next -> do - rr' <- liftIO $ race (pause @'Seconds 1) do - callService @RpcRefLogGet refLogRPC rk - <&> fromRight Nothing + fix \next -> do void $ liftIO $ race (pause @'Seconds 60) (atomically (peekTQueue upd)) pause @'Seconds 5 liftIO $ atomically $ flushTQueue upd - rr <- either (const $ pause @'Seconds 10 >> warn "rpc call timeout" >> next) pure rr' + rr' <- liftIO $ race (pause @'Seconds 1) do + callService @RpcRefLogGet refLogRPC rk + <&> fromRight Nothing + + rr <- either (const $ pause @'Seconds 1 >> warn "rpc call timeout" >> next) pure rr' debug $ "REFLOG VALUE:" <+> pretty rr @@ -250,7 +251,7 @@ runSync = do lift (syncRepo entry) >>= \case Left{} -> do debug $ "Failed to update:" <+> pretty (repoPath entry) - pause @'Seconds 1 + pause @'Seconds 5 again Right{} -> do diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 849a95d8..3e642b46 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -32,6 +32,7 @@ import HBS2.Net.Proto.RefLog import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.Sessions import HBS2.Net.Proto.Service +import HBS2.Net.Proto.Notify (NotifyProto) import HBS2.OrDie import HBS2.Storage.Simple import HBS2.Data.Detect @@ -548,6 +549,8 @@ instance ( Monad m response = lift . response +instance (MonadUnliftIO m, HasProtocol UNIX (NotifyProto ev e)) => HasDeferred UNIX (NotifyProto ev e) m where + deferred _ m = void $ async m respawn :: PeerOpts -> IO () respawn opts = diff --git a/hbs2-tests/test/notify-unix/Main.hs b/hbs2-tests/test/notify-unix/Main.hs index 0cd2d697..71e13cc8 100644 --- a/hbs2-tests/test/notify-unix/Main.hs +++ b/hbs2-tests/test/notify-unix/Main.hs @@ -1,7 +1,7 @@ module Main where import HBS2.Prelude.Plated -import HBS2.Clock +import HBS2.Clock hiding (sec) import HBS2.Net.Proto import HBS2.Net.Messaging.Unix import HBS2.Net.Proto.Notify @@ -38,12 +38,19 @@ instance HasProtocol UNIX (NotifyProto Tick UNIX) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise +instance (MonadUnliftIO m, HasProtocol UNIX (NotifyProto ev e)) => HasDeferred UNIX (NotifyProto ev e) m where + deferred _ m = void $ async m +data WhatTick = Odd | Even + deriving stock (Generic,Eq) + +instance Hashable WhatTick +instance Serialise WhatTick newtype instance NotifyKey Tick = - TickNotifyKey () - deriving (Generic,Eq) - deriving newtype Hashable + TickNotifyKey WhatTick + deriving (Generic) + deriving newtype (Hashable,Eq) newtype instance NotifyData Tick = TickNotifyData Int @@ -88,7 +95,10 @@ main = do sec <- newTVarIO 0 forever do sn <- atomically $ stateTVar sec (\s -> (s, succ s)) - emitNotify src (TickNotifyKey (), TickNotifyData sn) + if even sn then do + emitNotify src (TickNotifyKey Even, TickNotifyData sn) + else + emitNotify src (TickNotifyKey Odd, TickNotifyData sn) debug "SERVER: TICK!" pause @'Seconds 1 @@ -110,17 +120,17 @@ main = do [ makeResponse (makeNotifyClient @Tick sink) ] - s1 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do + s1 <- asyncLinked $ runNotifySink sink (TickNotifyKey Even) $ \(TickNotifyData td) -> do debug $ "CLIENT1:" <+> viaShow td - s2 <- asyncLinked $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do + s2 <- asyncLinked $ runNotifySink sink (TickNotifyKey Odd) $ \(TickNotifyData td) -> do debug $ "CLIENT2:" <+> viaShow td - s3 <- async $ runNotifySink sink (TickNotifyKey ()) $ \(TickNotifyData td) -> do + s3 <- async $ runNotifySink sink (TickNotifyKey Odd) $ \(TickNotifyData td) -> do debug $ "CLIENT3:" <+> viaShow td void $ async do - pause @'Seconds 3 + pause @'Seconds 10 cancelWith s3 (toException (userError "Fuck you!")) void $ waitAnyCatchCancel [p1,p2,m1,m2,s1,s2]