wip, debug

This commit is contained in:
Dmitry Zuikov 2024-04-10 11:24:15 +03:00
parent 73049bcd03
commit c75dd1ae6b
2 changed files with 103 additions and 60 deletions

View File

@ -160,71 +160,80 @@ withApp cfgPath action = do
setLogging @WARN warnPrefix
setLogging @NOTICE noticePrefix
soname <- detectRPC
`orDie` "can't detect RPC"
fix \next -> do
flip runContT pure do
flip runContT pure do
client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
>>= orThrowUser ("can't connect to" <+> pretty soname)
soname' <- lift detectRPC
mess <- ContT $ withAsync $ runMessagingUnix client
soname <- ContT $ maybe1 soname' (pure ())
link mess
client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
>>= orThrowUser ("can't connect to" <+> pretty soname)
peerAPI <- makeServiceCaller @PeerAPI (fromString soname)
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
storageAPI <- makeServiceCaller @StorageAPI (fromString soname)
lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname)
mess <- ContT $ withAsync $ runMessagingUnix client
let endpoints = [ Endpoint @UNIX peerAPI
, Endpoint @UNIX refLogAPI
, Endpoint @UNIX lwwAPI
, Endpoint @UNIX storageAPI
]
peerAPI <- makeServiceCaller @PeerAPI (fromString soname)
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
storageAPI <- makeServiceCaller @StorageAPI (fromString soname)
lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname)
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
let endpoints = [ Endpoint @UNIX peerAPI
, Endpoint @UNIX refLogAPI
, Endpoint @UNIX lwwAPI
, Endpoint @UNIX storageAPI
]
let o = [MUWatchdog 20,MUDontRetry]
clientN <- newMessagingUnixOpts o False 1.0 soname
mn <- ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
notif <- ContT $ withAsync (runMessagingUnix clientN)
let o = [MUWatchdog 20,MUDontRetry]
clientN <- newMessagingUnixOpts o False 1.0 soname
link notif
notif <- ContT $ withAsync (runMessagingUnix clientN)
sink <- newNotifySink
void $ ContT $ withAsync $ flip runReaderT clientN $ do
debug $ red "notify restarted!"
runNotifyWorkerClient sink
sink <- newNotifySink
void $ ContT $ withAsync $ flip runReaderT clientN $ do
runProto @UNIX
[ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink)
]
void $ ContT $ withAsync $ flip runReaderT clientN $ do
debug $ red "notify restarted!"
runNotifyWorkerClient sink
env <- FixerEnv Nothing
lwwAPI
refLogAPI
sink
peerAPI
(AnyStorage (StorageClient storageAPI))
<$> newTVarIO mempty
<*> newTVarIO 30
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTQueueIO
p1 <- ContT $ withAsync $ flip runReaderT clientN $ do
runProto @UNIX
[ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink)
]
lift $ runReaderT (runFixerM $ withConfig cfgPath action) env
`finally` do
setLoggingOff @DEBUG
setLoggingOff @INFO
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
env <- FixerEnv Nothing
lwwAPI
refLogAPI
sink
peerAPI
(AnyStorage (StorageClient storageAPI))
<$> newTVarIO mempty
<*> newTVarIO 30
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTQueueIO
void $ ContT $ bracket (pure ()) $ \_ -> do
readTVarIO (_listeners env) <&> HM.elems >>= mapM_ cancel
p3 <- ContT $ withAsync $ runReaderT (runFixerM $ withConfig cfgPath action) env
void $ waitAnyCatchCancel [mess,mn,notif,p1,p3]
debug $ red "FUCKING CANCELLED!"
pause @'Seconds 5
next
setLoggingOff @DEBUG
setLoggingOff @INFO
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
where
errorPrefix = toStdout . logPrefix "[error] "
@ -238,7 +247,7 @@ data ConfWatch =
| ConfUpdate [Syntax C]
mainLoop :: FixerM IO ()
mainLoop = forever $ do
mainLoop = do
debug "hbs2-fixer. do stuff since 2024"
conf <- getConf
-- debug $ line <> vcat (fmap pretty conf)
@ -249,7 +258,7 @@ mainLoop = forever $ do
lift $ updateFromConfig conf
void $ ContT $ withAsync $ do
p1 <- ContT $ withAsync $ do
cfg <- asks _configFile `orDie` "config file not specified"
flip fix ConfRead $ \next -> \case
@ -279,7 +288,7 @@ mainLoop = forever $ do
next ConfRead
-- poll reflogs
void $ ContT $ withAsync do
p2 <- ContT $ withAsync do
let w = asks _watchers
>>= readTVarIO
@ -300,15 +309,22 @@ mainLoop = forever $ do
pure ()
jobs <- asks _pipeline
void $ ContT $ withAsync $ forever do
liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs)
>>= \case
Left e -> err (viaShow e)
_ -> pure ()
p3 <- ContT $ withAsync $ fix \next -> do
r <- liftIO $ E.try @SomeException (join $ atomically $ readTQueue jobs)
case r of
Left e -> do
err ("CATCHED" <+> viaShow e)
let ee = fromException @AsyncCancelled e
forever $ pause @'Seconds 60
unless (isJust ee) do
next
debug "WE'RE FUCKING CANCELLED!"
_ -> next
void $ waitAnyCatchCancel [p1,p2,p3]
oneSec :: MonadUnliftIO m => m b -> m (Either () b)
oneSec = race (pause @'Seconds 1)

View File

@ -10,6 +10,10 @@ import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Codec.Serialise
import Lens.Micro.Platform
import Control.Monad.Trans.Cont
import Control.Monad
import UnliftIO
-- желаемое поведение: добавить в новую версию A какое-нибудь поле так,
-- что бы предыдущие записи продолжали десериализоваться без этого поля,
@ -65,6 +69,29 @@ test w = case w of
A -> "Match A"
runWithAsync :: IO ()
runWithAsync = do
hSetBuffering stdout LineBuffering
flip runContT pure do
t1 <- ContT $ withAsync do
forever do
print "PIU"
pause @'Seconds 1
q <- ContT $ withAsync do
pause @'Seconds 10
print "FUCKIG QUIT"
pysh <- ContT $ withAsync $ forever do
pause @'Seconds 2
print "PYSHPYSH"
void $ waitAnyCatchCancel [t1,q,pysh]
main :: IO ()
main = do
print "1"