diff --git a/hbs2-fixer/app/Main.hs b/hbs2-fixer/app/Main.hs index 09b838d9..431d5797 100644 --- a/hbs2-fixer/app/Main.hs +++ b/hbs2-fixer/app/Main.hs @@ -160,71 +160,80 @@ withApp cfgPath action = do setLogging @WARN warnPrefix setLogging @NOTICE noticePrefix - soname <- detectRPC - `orDie` "can't detect RPC" + fix \next -> do - flip runContT pure do + flip runContT pure do - client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) - >>= orThrowUser ("can't connect to" <+> pretty soname) + soname' <- lift detectRPC - mess <- ContT $ withAsync $ runMessagingUnix client + soname <- ContT $ maybe1 soname' (pure ()) - link mess + client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) + >>= orThrowUser ("can't connect to" <+> pretty soname) - peerAPI <- makeServiceCaller @PeerAPI (fromString soname) - refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) - storageAPI <- makeServiceCaller @StorageAPI (fromString soname) - lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) + mess <- ContT $ withAsync $ runMessagingUnix client - let endpoints = [ Endpoint @UNIX peerAPI - , Endpoint @UNIX refLogAPI - , Endpoint @UNIX lwwAPI - , Endpoint @UNIX storageAPI - ] + peerAPI <- makeServiceCaller @PeerAPI (fromString soname) + refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) + storageAPI <- makeServiceCaller @StorageAPI (fromString soname) + lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) - void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + let endpoints = [ Endpoint @UNIX peerAPI + , Endpoint @UNIX refLogAPI + , Endpoint @UNIX lwwAPI + , Endpoint @UNIX storageAPI + ] - let o = [MUWatchdog 20,MUDontRetry] - clientN <- newMessagingUnixOpts o False 1.0 soname + mn <- ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client - notif <- ContT $ withAsync (runMessagingUnix clientN) + let o = [MUWatchdog 20,MUDontRetry] + clientN <- newMessagingUnixOpts o False 1.0 soname - link notif + notif <- ContT $ withAsync (runMessagingUnix clientN) - sink <- newNotifySink - void $ ContT $ withAsync $ flip runReaderT clientN $ do - debug $ red "notify restarted!" - runNotifyWorkerClient sink + sink <- newNotifySink - void $ ContT $ withAsync $ flip runReaderT clientN $ do - runProto @UNIX - [ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink) - ] + void $ ContT $ withAsync $ flip runReaderT clientN $ do + debug $ red "notify restarted!" + runNotifyWorkerClient sink - env <- FixerEnv Nothing - lwwAPI - refLogAPI - sink - peerAPI - (AnyStorage (StorageClient storageAPI)) - <$> newTVarIO mempty - <*> newTVarIO 30 - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO 0 - <*> newTVarIO mempty - <*> newTQueueIO + p1 <- ContT $ withAsync $ flip runReaderT clientN $ do + runProto @UNIX + [ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink) + ] - lift $ runReaderT (runFixerM $ withConfig cfgPath action) env - `finally` do - setLoggingOff @DEBUG - setLoggingOff @INFO - setLoggingOff @ERROR - setLoggingOff @WARN - setLoggingOff @NOTICE + env <- FixerEnv Nothing + lwwAPI + refLogAPI + sink + peerAPI + (AnyStorage (StorageClient storageAPI)) + <$> newTVarIO mempty + <*> newTVarIO 30 + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO 0 + <*> newTVarIO mempty + <*> newTQueueIO + + void $ ContT $ bracket (pure ()) $ \_ -> do + readTVarIO (_listeners env) <&> HM.elems >>= mapM_ cancel + + p3 <- ContT $ withAsync $ runReaderT (runFixerM $ withConfig cfgPath action) env + + void $ waitAnyCatchCancel [mess,mn,notif,p1,p3] + + debug $ red "FUCKING CANCELLED!" + pause @'Seconds 5 + next + + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE where errorPrefix = toStdout . logPrefix "[error] " @@ -238,7 +247,7 @@ data ConfWatch = | ConfUpdate [Syntax C] mainLoop :: FixerM IO () -mainLoop = forever $ do +mainLoop = do debug "hbs2-fixer. do stuff since 2024" conf <- getConf -- debug $ line <> vcat (fmap pretty conf) @@ -249,7 +258,7 @@ mainLoop = forever $ do lift $ updateFromConfig conf - void $ ContT $ withAsync $ do + p1 <- ContT $ withAsync $ do cfg <- asks _configFile `orDie` "config file not specified" flip fix ConfRead $ \next -> \case @@ -279,7 +288,7 @@ mainLoop = forever $ do next ConfRead -- poll reflogs - void $ ContT $ withAsync do + p2 <- ContT $ withAsync do let w = asks _watchers >>= readTVarIO @@ -300,15 +309,22 @@ mainLoop = forever $ do pure () - jobs <- asks _pipeline - void $ ContT $ withAsync $ forever do - liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs) - >>= \case - Left e -> err (viaShow e) - _ -> pure () + p3 <- ContT $ withAsync $ fix \next -> do + r <- liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs) + case r of + Left e -> do + err ("CATCHED" <+> viaShow e) + let ee = fromException @AsyncCancelled e - forever $ pause @'Seconds 60 + unless (isJust ee) do + next + + debug "WE'RE FUCKING CANCELLED!" + + _ -> next + + void $ waitAnyCatchCancel [p1,p2,p3] oneSec :: MonadUnliftIO m => m b -> m (Either () b) oneSec = race (pause @'Seconds 1) diff --git a/hbs2-tests/test/playground/Main.hs b/hbs2-tests/test/playground/Main.hs index 284e3f91..1c1525d7 100644 --- a/hbs2-tests/test/playground/Main.hs +++ b/hbs2-tests/test/playground/Main.hs @@ -10,6 +10,10 @@ import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Codec.Serialise import Lens.Micro.Platform +import Control.Monad.Trans.Cont + +import Control.Monad +import UnliftIO -- желаемое поведение: добавить в новую версию A какое-нибудь поле так, -- что бы предыдущие записи продолжали десериализоваться без этого поля, @@ -65,6 +69,29 @@ test w = case w of A -> "Match A" +runWithAsync :: IO () +runWithAsync = do + + hSetBuffering stdout LineBuffering + + flip runContT pure do + + t1 <- ContT $ withAsync do + forever do + print "PIU" + pause @'Seconds 1 + + q <- ContT $ withAsync do + pause @'Seconds 10 + print "FUCKIG QUIT" + + pysh <- ContT $ withAsync $ forever do + pause @'Seconds 2 + print "PYSHPYSH" + + void $ waitAnyCatchCancel [t1,q,pysh] + + main :: IO () main = do print "1"