diff --git a/docs/devlog.md b/docs/devlog.md index 87e98a23..f7c1b4df 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1311,3 +1311,4 @@ PR: bus-crypt Шифрование протокола общения нод. Обмен асимметричными публичными ключами выполняется на стадии хэндшейка в ping/pong. Для шифрования данных создаётся симметричный ключ по diffie-hellman. + diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index b337620d..ca534cc9 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -75,6 +75,7 @@ library , HBS2.Actors.Peer.Types , HBS2.Base58 , HBS2.Clock + , HBS2.Concurrent.Supervisor , HBS2.Crypto , HBS2.Data.Detect , HBS2.Data.Types diff --git a/hbs2-core/lib/Dialog/Helpers/Streaming.hs b/hbs2-core/lib/Dialog/Helpers/Streaming.hs index a38be2b6..7ab544a3 100644 --- a/hbs2-core/lib/Dialog/Helpers/Streaming.hs +++ b/hbs2-core/lib/Dialog/Helpers/Streaming.hs @@ -10,7 +10,6 @@ import Streaming as S import Streaming.Internal import Streaming.Prelude (cons) import Streaming.Prelude qualified as S -import UnliftIO.Async import UnliftIO.STM import Prelude hiding (cons) diff --git a/hbs2-core/lib/HBS2/Actors.hs b/hbs2-core/lib/HBS2/Actors.hs index 0c303c7a..028a58b4 100644 --- a/hbs2-core/lib/HBS2/Actors.hs +++ b/hbs2-core/lib/HBS2/Actors.hs @@ -14,7 +14,6 @@ import Control.Concurrent.STM.TBMQueue qualified as TBMQ import Control.Concurrent.STM.TBMQueue (TBMQueue) import Control.Concurrent.STM.TVar qualified as TVar import Control.Monad -import Control.Concurrent.Async import Data.Function import Data.Functor import Data.Kind diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index bf82a7e7..867f5acc 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -21,13 +21,14 @@ import HBS2.Net.PeerLocator import HBS2.Net.PeerLocator.Static import HBS2.Net.Proto import HBS2.Net.Proto.Sessions +import HBS2.Prelude import HBS2.Prelude.Plated import HBS2.Storage import HBS2.System.Logger.Simple +import HBS2.Concurrent.Supervisor import Control.Applicative import Control.Monad.Trans.Maybe -import Control.Concurrent.Async import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) import Data.Cache (Cache) @@ -439,7 +440,7 @@ newPeerEnv s bus p = do _envEncryptionKeys <- liftIO (newTVarIO mempty) pure PeerEnv {..} -runPeerM :: forall e m . ( MonadIO m +runPeerM :: forall e m . ( MonadUnliftIO m , HasPeer e , Ord (Peer e) , Pretty (Peer e) @@ -449,12 +450,12 @@ runPeerM :: forall e m . ( MonadIO m -> PeerM e m () -> m () -runPeerM env f = do +runPeerM env f = withAsyncSupervisor "runPeerM" \sup -> do let de = view envDeferred env - as <- liftIO $ replicateM 8 $ async $ runPipeline de + as <- liftIO $ replicateM 8 $ asyncStick' sup "runPipeline" $ runPipeline de - sw <- liftIO $ async $ forever $ withPeerM env $ do + sw <- liftIO $ asyncStick' sup "sweeps" $ forever $ withPeerM env $ do pause defSweepTimeout se <- asks (view envSessions) liftIO $ Cache.purgeExpired se @@ -462,7 +463,7 @@ runPeerM env f = do void $ runReaderT (fromPeerM f) env void $ liftIO $ stopPipeline de - liftIO $ mapM_ cancel (as <> [sw]) + -- liftIO $ mapM_ cancel (as <> [sw]) withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a withPeerM env action = runReaderT (fromPeerM action) env diff --git a/hbs2-core/lib/HBS2/Concurrent/Supervisor.hs b/hbs2-core/lib/HBS2/Concurrent/Supervisor.hs new file mode 100644 index 00000000..acbfa0be --- /dev/null +++ b/hbs2-core/lib/HBS2/Concurrent/Supervisor.hs @@ -0,0 +1,78 @@ +module HBS2.Concurrent.Supervisor +( module HBS2.Concurrent.Supervisor +, module X +) where + +import Control.Arrow hiding ((<+>)) +import Control.Concurrent.Async qualified as Async +import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Class +import Control.Monad.Trans.Maybe +import Data.Proxy +import Data.Text (Text) +import Prettyprinter +import System.IO (Handle) +import UnliftIO (MonadUnliftIO(..)) +import UnliftIO.Async +import UnliftIO.Async as X hiding (async) +import UnliftIO.Concurrent +import UnliftIO.Exception + +import HBS2.System.Logger.Simple + + +data Sup = Sup + { supAsync :: Async () + } + +data SupFinished = SupFinished Text + deriving (Show) +instance Exception SupFinished + +withAsyncSupervisor :: (MonadUnliftIO io) => Text -> (Sup -> io a) -> io a +withAsyncSupervisor name k = + bracket + (Sup <$> async (forever (threadDelay (10^9)))) + (flip throwTo (SupFinished name) . asyncThreadId . supAsync) + (\sup -> (k sup) + `withException` \(e :: SomeException) -> do + debug $ "Finished sup " <> pretty name <> " " <> viaShow e + ) + +asyncStick :: MonadUnliftIO m => Sup -> m a -> m (Async a) +asyncStick sup ioa = do + a <- async ioa + liftIO $ Async.link2Only (const True) (supAsync sup) a + pure a + +asyncStick' :: MonadUnliftIO m => Sup -> Text -> m a -> m (Async a) +asyncStick' sup name ioa = do + a <- async $ + ioa + `withException` \(e :: SomeException) -> + debug $ "finished async" <+> pretty name <+> ":" <+> viaShow e + liftIO $ Async.link2Only (const True) (supAsync sup) a + pure a + + +selectException_ :: forall e m . (Exception e, Monad m) + => Proxy e -> SomeException -> MaybeT m () +selectException_ _ = fromException >>> \case + Nothing -> MaybeT (pure Nothing) + Just (e :: e) -> pure () + +selectException :: forall e m . (Exception e, Monad m) + => SomeException -> (e -> m ()) -> MaybeT m () +selectException e f = case (fromException e) of + Nothing -> MaybeT (pure Nothing) + Just e' -> lift (f e') + +withExceptionIO :: Exception e => IO a -> (e -> IO b) -> IO a +withExceptionIO io what = io `catch` \e -> do + _ <- what e + throwIO e + +withSomeExceptionIO :: IO a -> (SomeException -> IO b) -> IO a +withSomeExceptionIO = withExceptionIO + diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 4cb28d6b..fbc3da7f 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -14,6 +14,7 @@ import HBS2.Net.IP.Addr import HBS2.Net.Messaging import HBS2.Net.Proto.Types import HBS2.Prelude.Plated +import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple @@ -40,7 +41,6 @@ import Streaming.Prelude qualified as S import System.Random hiding (next) import Control.Monad.Trans.Resource -import UnliftIO.Async import UnliftIO.STM import UnliftIO.Exception qualified as U @@ -245,7 +245,7 @@ spawnConnection tp env so sa = liftIO do when ( used <= 2 ) do atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) - when (used == 1) do + when (used == 1) $ withAsyncSupervisor "in spawnConnection" \sup -> do q <- getWriteQueue connId updatePeer connId newP @@ -254,7 +254,7 @@ spawnConnection tp env so sa = liftIO do <+> pretty newP <+> parens ("used:" <+> pretty used) - rd <- async $ fix \next -> do + rd <- asyncStick sup $ fix \next -> do spx <- readFromSocket so 4 <&> LBS.toStrict ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг @@ -276,7 +276,7 @@ spawnConnection tp env so sa = liftIO do next - wr <- async $ fix \next -> do + wr <- asyncStick sup $ fix \next -> do (rcpt, bs) <- atomically $ readTQueue q pq <- makeReqId rcpt @@ -364,14 +364,14 @@ connectPeerTCP env peer = liftIO do -- FIXME: link-all-asyncs runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () -runMessagingTCP env = liftIO do +runMessagingTCP env = liftIO $ withAsyncSupervisor "in runMessagingTCP" \sup -> do own <- toPeerAddr $ view tcpOwnPeer env let (L4Address _ (IPAddrPort (i,p))) = own let defs = view tcpDefer env - mon <- async $ forever do + mon <- asyncStick sup $ forever do pause @'Seconds 30 now <- getTimeCoarse @@ -384,7 +384,7 @@ runMessagingTCP env = liftIO do [] -> Nothing xs -> Just xs - con <- async $ forever do + con <- asyncStick sup $ forever do let ev = view tcpDeferEv env @@ -408,7 +408,7 @@ runMessagingTCP env = liftIO do co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip - maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do + maybe1 co' (void $ asyncStick sup (connectPeerTCP env pip)) $ \co -> do q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co maybe1 q' none $ \q -> do atomically do @@ -418,7 +418,7 @@ runMessagingTCP env = liftIO do pure () - stat <- async $ forever do + stat <- asyncStick sup $ forever do pause @'Seconds 120 ps <- readTVarIO $ view tcpConnPeer env let peers = HashMap.toList ps @@ -429,8 +429,6 @@ runMessagingTCP env = liftIO do <+> pretty c <+> parens ("used:" <+> pretty used) - mapM_ link [mon,con,stat] - liftIO ( listen (Host (show i)) (show p) $ \(sock, sa) -> do withFdSocket sock setCloseOnExecIfNeeded diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index e47789fb..5d8b4b12 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -7,13 +7,13 @@ import HBS2.Net.IP.Addr import HBS2.Net.Messaging import HBS2.Net.Proto import HBS2.Prelude.Plated +import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple import Data.Function import Control.Exception import Control.Monad.Trans.Maybe -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue qualified as Q0 import Control.Monad @@ -108,11 +108,11 @@ newMessagingUDP reuse saddr = udpWorker :: MessagingUDP -> TVar Socket -> IO () -udpWorker env tso = do +udpWorker env tso = withAsyncSupervisor "in udpWorker" \sup -> do so <- readTVarIO tso - rcvLoop <- async $ forever $ do + rcvLoop <- asyncStick sup $ forever $ do -- so <- readTVarIO tso -- pause ( 10 :: Timeout 'Seconds ) (msg, from) <- recvFrom so defMaxDatagram @@ -120,7 +120,7 @@ udpWorker env tso = do -- FIXME: ASAP-check-addr-type liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg) - sndLoop <- async $ forever $ do + sndLoop <- asyncStick sup $ forever $ do pause ( 10 :: Timeout 'Seconds ) -- (To whom, msg) <- atomically $ Q0.readTQueue (inbox env) -- print "YAY!" @@ -135,15 +135,16 @@ udpWorker env tso = do -- FIXME: stopping runMessagingUDP :: MonadIO m => MessagingUDP -> m () -runMessagingUDP udpMess = liftIO $ do +runMessagingUDP udpMess = liftIO $ withAsyncSupervisor "in runMessagingUDP" \sup -> do let addr = listenAddr udpMess so <- readTVarIO (sock udpMess) unless (mcast udpMess) $ do bind so addr - w <- async $ udpWorker udpMess (sock udpMess) - waitCatch w >>= either throwIO (const $ pure ()) + w <- asyncStick sup $ udpWorker udpMess (sock udpMess) + wait w + -- waitCatch w >>= either throwIO (const $ pure ()) instance Messaging MessagingUDP L4Proto ByteString where diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index b1b948d8..79fbd9fe 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -4,6 +4,7 @@ import HBS2.Prelude.Plated import HBS2.Net.Proto.Types import HBS2.Net.Messaging import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple @@ -106,7 +107,8 @@ runMessagingUnix env = do where - runServer = forever $ handleAny cleanupAndRetry $ runResourceT do + runServer = forever $ handleAny cleanupAndRetry $ runResourceT $ + withAsyncSupervisor "runServer" \sup -> do t0 <- getTimeCoarse atomically $ writeTVar (msgUnixLast env) t0 @@ -118,7 +120,7 @@ runMessagingUnix env = do liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) liftIO $ listen sock 1 - watchdog <- async $ do + watchdog <- asyncStick sup $ do let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] @@ -139,14 +141,14 @@ runMessagingUnix env = do when ( acc > 0 && diff >= toNanoSeconds (TimeoutSec $ realToFrac wd) ) do throwIO ReadTimeoutException - run <- async $ forever $ runResourceT do + run <- asyncStick sup $ forever $ runResourceT do (so, sa) <- liftIO $ accept sock atomically $ modifyTVar (msgUnixAccepts env) succ void $ allocate (pure so) close - writer <- async $ forever do + writer <- asyncStick sup $ forever do msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) let len = fromIntegral $ LBS.length msg :: Int liftIO $ sendAll so $ bytestring32 (fromIntegral len) @@ -172,7 +174,8 @@ runMessagingUnix env = do Right{} -> pure () - runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT do + runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT $ + withAsyncSupervisor "runClient" \sup -> do sock <- liftIO $ socket AF_UNIX Stream defaultProtocol @@ -191,7 +194,7 @@ runMessagingUnix env = do attemptConnect - reader <- async $ forever do + reader <- asyncStick sup $ forever do -- Read response from server frameLen <- liftIO $ recv sock 4 <&> word32 <&> fromIntegral frame <- liftIO $ recv sock frameLen diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index 1273cd07..e2024f48 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -24,16 +24,20 @@ import GHC.Generics as X (Generic) import Data.ByteString (ByteString) import Data.String (IsString(..)) import Safe +import Control.Concurrent.Async as X (ExceptionInLinkedThread) import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad (void,guard,when,unless) import Control.Monad.Trans.Class (lift) -import Data.Function +import Control.Monad.IO.Unlift as X import Data.Char qualified as Char -import Data.Text qualified as Text +import Data.Function import Data.Hashable -import Prettyprinter +import Data.Text qualified as Text import Data.Word +import Prettyprinter +import UnliftIO as X (MonadUnliftIO(..)) +import UnliftIO.Async as X none :: forall m . Monad m => m () none = pure () @@ -62,3 +66,4 @@ class ToByteString a where class FromByteString a where fromByteString :: ByteString -> Maybe a + diff --git a/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs b/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs index 53537a2b..bb01edc1 100644 --- a/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs +++ b/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs @@ -2,6 +2,7 @@ module Main where import HBS2.Prelude.Plated import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2Git.App import HBS2Git.State @@ -35,7 +36,6 @@ import System.FilePath.Posix import System.IO.Temp import System.Timeout (timeout) import Text.InterpolatedString.Perl6 (qc) -import UnliftIO.Async import Streaming.ByteString qualified as SB import Streaming.Zip qualified as SZip @@ -107,6 +107,7 @@ retryFor num waity sleep action = timeout (ceiling $ waity * 1000000) $ go num dumbHttpServe :: MonadUnliftIO m => Port -> m () dumbHttpServe pnum = do + withAsyncSupervisor "dumbHttpServe" \sup -> do locks <- liftIO $ newMVar (HashMap.empty @HashRef @(MVar ())) @@ -121,7 +122,7 @@ dumbHttpServe pnum = do -- с логом, тогда в следующий раз будет обратно -- распакован - updater <- async $ forever do + updater <- asyncStick sup $ forever do pause @'Seconds 300 pure () diff --git a/hbs2-git/lib/HBS2/Git/Local/CLI.hs b/hbs2-git/lib/HBS2/Git/Local/CLI.hs index b8bd5910..1d7d15b1 100644 --- a/hbs2-git/lib/HBS2/Git/Local/CLI.hs +++ b/hbs2-git/lib/HBS2/Git/Local/CLI.hs @@ -11,7 +11,6 @@ import HBS2.Git.Types import HBS2.System.Logger.Simple -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Writer import Data.HashSet (HashSet) diff --git a/hbs2-git/lib/HBS2Git/App.hs b/hbs2-git/lib/HBS2Git/App.hs index 7c230ebe..8e59ad38 100644 --- a/hbs2-git/lib/HBS2Git/App.hs +++ b/hbs2-git/lib/HBS2Git/App.hs @@ -50,7 +50,6 @@ import Data.Text qualified as Text import Data.IORef import System.IO.Unsafe (unsafePerformIO) import Data.Cache qualified as Cache -import Control.Concurrent.Async import System.Environment import Prettyprinter.Render.Terminal diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 1f4acf23..83b740f6 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -5,6 +5,7 @@ module BlockDownload where import HBS2.Actors.Peer import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Defaults @@ -25,7 +26,6 @@ import PeerTypes import PeerInfo import Brains -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TSem import Control.Monad.Reader @@ -418,6 +418,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO ) => DownloadEnv e -> m () blockDownloadLoop env0 = do + withAsyncSupervisor "blockDownloadLoop" \sup -> do e <- ask @@ -429,7 +430,7 @@ blockDownloadLoop env0 = do let withAllStuff = withPeerM e . withDownload env0 - void $ liftIO $ async $ forever $ withPeerM e do + void $ liftIO $ asyncStick sup $ forever $ withPeerM e do pause @'Seconds 30 pee <- knownPeers @e pl @@ -440,7 +441,7 @@ blockDownloadLoop env0 = do liftIO $ atomically $ writeTVar (view peerBurstMax pinfo) Nothing - void $ liftIO $ async $ forever $ withPeerM e do + void $ liftIO $ asyncStick sup $ forever $ withPeerM e do pause @'Seconds 1.5 pee <- knownPeers @e pl @@ -451,7 +452,7 @@ blockDownloadLoop env0 = do updatePeerInfo False p pinfo - void $ liftIO $ async $ forever $ withAllStuff do + void $ liftIO $ asyncStick sup $ forever $ withAllStuff do pause @'Seconds 5 -- FIXME: put to defaults -- we need to show download stats @@ -507,7 +508,7 @@ blockDownloadLoop env0 = do liftIO $ atomically $ do modifyTVar busyPeers (HashSet.insert p) - void $ liftIO $ async $ withAllStuff do + void $ liftIO $ asyncStick sup $ withAllStuff do -- trace $ "start downloading shit" <+> pretty p <+> pretty h @@ -562,7 +563,7 @@ blockDownloadLoop env0 = do proposed <- asks (view blockProposed) - void $ liftIO $ async $ forever do + void $ liftIO $ asyncStick sup $ forever do pause @'Seconds 20 -- debug "block download loop. does not do anything" liftIO $ Cache.purgeExpired proposed @@ -578,11 +579,12 @@ postponedLoop :: forall e m . ( MyPeer e ) => DownloadEnv e -> m () postponedLoop env0 = do + withAsyncSupervisor "postponedLoop" \sup -> do e <- ask pause @'Seconds 2.57 - void $ liftIO $ async $ withPeerM e $ withDownload env0 do + void $ liftIO $ asyncStick sup $ withPeerM e $ withDownload env0 do q <- asks (view blockDelayTo) fix \next -> do w <- liftIO $ atomically $ readTQueue q diff --git a/hbs2-peer/app/BlockHttpDownload.hs b/hbs2-peer/app/BlockHttpDownload.hs index 8a2e91a6..5d6388cb 100644 --- a/hbs2-peer/app/BlockHttpDownload.hs +++ b/hbs2-peer/app/BlockHttpDownload.hs @@ -5,6 +5,7 @@ module BlockHttpDownload where import HBS2.Actors.Peer import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Defaults @@ -30,7 +31,6 @@ import PeerInfo import BlockDownload import Brains -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index c4f60307..86bcf332 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -3,8 +3,10 @@ {-# Language TemplateHaskell #-} module Brains where +import HBS2.Prelude import HBS2.Prelude.Plated import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Data.Types.Refs import HBS2.Net.Proto.RefChan(ForRefChans) import HBS2.Net.Proto @@ -38,7 +40,6 @@ import System.Directory import System.FilePath import System.Random (randomRIO) import Text.InterpolatedString.Perl6 (qc) -import UnliftIO (MonadUnliftIO(..),async,race) data PeerBrainsDb @@ -809,13 +810,14 @@ runBasicBrains :: forall e m . ( e ~ L4Proto -> m () runBasicBrains cfg brains = do + withAsyncSupervisor "runBasicBrains" \sup -> do let pip = view brainsPipeline brains let expire = view brainsExpire brains let commit = view brainsCommit brains -- FIXME: async-error-handling - void $ liftIO $ async $ forever do + void $ liftIO $ asyncStick sup $ forever do ewaiters <- race (pause @'Seconds 5) $ do atomically $ do @@ -831,7 +833,7 @@ runBasicBrains cfg brains = do transactional brains (sequence_ (w:ws)) sequence_ waiters - void $ liftIO $ async $ forever do + void $ liftIO $ asyncStick sup $ forever do pause @'Seconds 60 updateOP brains (cleanupHashes brains) @@ -843,7 +845,7 @@ runBasicBrains cfg brains = do | ListVal @C (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn ] ) - void $ async $ do + void $ asyncStick sup $ do -- pause @'Seconds 5 forM_ polls $ \(t,mi,x) -> do trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi diff --git a/hbs2-peer/app/DownloadQ.hs b/hbs2-peer/app/DownloadQ.hs index e7e73dde..4b19218e 100644 --- a/hbs2-peer/app/DownloadQ.hs +++ b/hbs2-peer/app/DownloadQ.hs @@ -3,6 +3,7 @@ module DownloadQ where import HBS2.Prelude import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Hash import HBS2.Events import HBS2.Data.Types.Refs @@ -27,7 +28,6 @@ import Data.Functor import Data.Function import Control.Exception import Control.Monad -import Control.Concurrent.Async import System.IO @@ -46,9 +46,11 @@ downloadQueue :: forall e m . ( MyPeer e , HasPeerLocator e (BlockDownloadM e m) , HasPeerLocator e m , EventListener e (DownloadReq e) m + , MonadUnliftIO m ) => PeerConfig -> DownloadEnv e -> m () downloadQueue conf denv = do + withAsyncSupervisor "in downloadQueue" \sup -> do sto <- getStorage hq <- liftIO newTQueueIO @@ -62,7 +64,7 @@ downloadQueue conf denv = do liftIO $ atomically $ writeTQueue hq h maybe1 qfile' noLogFile $ \fn -> do - void $ liftIO $ async $ forever $ do + void $ liftIO $ asyncStick sup $ forever $ do pause @'Seconds 10 fromq <- liftIO $ atomically $ flushTQueue hq unless (null fromq) do diff --git a/hbs2-peer/app/EncryptionKeys.hs b/hbs2-peer/app/EncryptionKeys.hs index dd524d5d..622fdf37 100644 --- a/hbs2-peer/app/EncryptionKeys.hs +++ b/hbs2-peer/app/EncryptionKeys.hs @@ -23,7 +23,6 @@ import PeerConfig import PeerTypes import Codec.Serialise -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Reader diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 58598c3a..5ce81777 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -4,6 +4,7 @@ module PeerInfo where import HBS2.Actors.Peer import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Data.Types import HBS2.Events import HBS2.Net.Auth.Credentials @@ -22,7 +23,6 @@ import PeerConfig import PeerTypes import Brains -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Reader @@ -80,17 +80,18 @@ pexLoop :: forall e brains m . ( HasPeerLocator e m , HasNonces (PeerExchange e) m , Request e (PeerExchange e) m , Sessions e (PeerExchange e) m - , MonadIO m + , MonadUnliftIO m , e ~ L4Proto ) => brains -> Maybe MessagingTCP -> m () pexLoop brains tcpEnv = do + withAsyncSupervisor "pexLoop" \sup -> do pause @'Seconds 5 pl <- getPeerLocator @e - tcpPexInfo <- liftIO $ async $ forever do + tcpPexInfo <- liftIO $ asyncStick sup $ forever do -- FIXME: fix-hardcode pause @'Seconds 20 @@ -150,6 +151,7 @@ peerPingLoop :: forall e m . ( HasPeerLocator e m ) => PeerConfig -> PeerEnv e -> m () peerPingLoop cfg penv = do + withAsyncSupervisor "peerPingLoop" \sup -> do e <- ask @@ -171,7 +173,7 @@ peerPingLoop cfg penv = do -- TODO: peer info loop - infoLoop <- liftIO $ async $ forever $ withPeerM e $ do + infoLoop <- liftIO $ asyncStick sup $ forever $ withPeerM e $ do pause @'Seconds 10 pee <- knownPeers @e pl @@ -208,7 +210,7 @@ peerPingLoop cfg penv = do pure () - watch <- liftIO $ async $ forever $ withPeerM e $ do + watch <- liftIO $ asyncStick sup $ forever $ withPeerM e $ do pause @'Seconds 120 pips <- getKnownPeers @e now <- getTimeCoarse diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index df01d834..aa2bdbbd 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -10,6 +10,7 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Defaults import HBS2.Events import HBS2.Hash @@ -60,9 +61,9 @@ import PeerMain.PeerDialog import PeerMeta import CLI.RefChan import RefChan +import SignalHandlers import Codec.Serialise as Serialise --- import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception as Exception import Control.Monad.Reader @@ -100,7 +101,6 @@ import Text.InterpolatedString.Perl6 (qc) import UnliftIO.Exception qualified as U -- import UnliftIO.STM -import UnliftIO.Async as U import Control.Monad.Trans.Resource import Streaming.Prelude qualified as S @@ -430,9 +430,8 @@ instance ( Monad m response = lift . response -respawn :: PeerOpts -> IO () -respawn opts = case view peerRespawn opts of - Just True -> do +respawn :: IO () +respawn = do let secs = 5 notice $ "RESPAWNING in" <+> viaShow secs <> "s" pause @'Seconds secs @@ -441,18 +440,34 @@ respawn opts = case view peerRespawn opts of print (self, args) executeFile self False args Nothing - _ -> exitFailure - runPeer :: forall e s . ( e ~ L4Proto , FromStringMaybe (PeerAddr e) , s ~ Encryption e , HasStorage (PeerM e IO) ) => PeerOpts -> IO () -runPeer opts = U.handle (\e -> myException e - >> performGC - >> respawn opts - ) $ runResourceT do +runPeer opts = do + installSignalHandlers + + let h = case view peerRespawn opts of + Just True -> + Exception.handle (\e -> do + myException e + performGC + respawn + ) + _ -> id + + h (runPeer' opts) + + +runPeer' :: forall e s . ( e ~ L4Proto + , FromStringMaybe (PeerAddr e) + , s ~ Encryption e + , HasStorage (PeerM e IO) + ) => PeerOpts -> IO () + +runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do metrics <- liftIO newStore @@ -531,7 +546,7 @@ runPeer opts = U.handle (\e -> myException e let blk = liftIO . hasBlock s - w <- replicateM defStorageThreads $ async $ liftIO $ simpleStorageWorker s + w <- replicateM defStorageThreads $ asyncStick sup $ liftIO $ simpleStorageWorker s localMulticast <- liftIO $ (headMay <$> parseAddrUDP (fromString defLocalMulticast) <&> fmap (fromSockAddr @'UDP . addrAddress) ) @@ -543,21 +558,21 @@ runPeer opts = U.handle (\e -> myException e mess <- newMessagingUDP False listenSa `orDie` "unable listen on the given addr" - udp <- async $ runMessagingUDP mess + udp <- asyncStick sup $ runMessagingUDP mess udp1 <- newMessagingUDP False rpcSa `orDie` "Can't start RPC listener" - mrpc <- async $ runMessagingUDP udp1 + mrpc <- asyncStick sup $ runMessagingUDP udp1 mcast <- newMessagingUDPMulticast defLocalMulticast `orDie` "Can't start RPC listener" - messMcast <- async $ runMessagingUDP mcast + messMcast <- asyncStick sup $ runMessagingUDP mcast brains <- newBasicBrains @e conf - brainsThread <- async $ runBasicBrains conf brains + brainsThread <- asyncStick sup $ runBasicBrains conf brains denv <- newDownloadEnv brains @@ -569,7 +584,7 @@ runPeer opts = U.handle (\e -> myException e tcp <- maybe1 addr' (pure Nothing) $ \addr -> do tcpEnv <- newMessagingTCP addr <&> set tcpOnClientStarted (onClientTCPConnected brains) -- FIXME: handle-tcp-thread-somehow - void $ async $ runMessagingTCP tcpEnv + void $ asyncStick sup $ runMessagingTCP tcpEnv pure $ Just tcpEnv (proxy, penv) <- mdo @@ -605,13 +620,13 @@ runPeer opts = U.handle (\e -> myException e penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) pure (proxy, penv) - proxyThread <- async $ runProxyMessaging proxy + proxyThread <- asyncStick sup $ runProxyMessaging proxy let peerMeta = mkPeerMeta conf penv nbcache <- liftIO $ Cache.newCache (Just $ toTimeSpec ( 600 :: Timeout 'Seconds)) - void $ async $ forever do + void $ asyncStick sup $ forever do pause @'Seconds 600 liftIO $ Cache.purgeExpired nbcache @@ -645,7 +660,7 @@ runPeer opts = U.handle (\e -> myException e -- debug $ "onNoBlock" <+> pretty p <+> pretty h withPeerM penv $ withDownload denv (addDownload mzero h) - loop <- liftIO $ async do + loop <- liftIO $ asyncStick sup do runPeerM penv $ do adapter <- mkAdapter @@ -835,16 +850,19 @@ runPeer opts = U.handle (\e -> myException e doAddPeer p - void $ liftIO $ async $ withPeerM env do + void $ asyncStick sup $ withPeerM env do pause @'Seconds 1 debug "sending first peer announce" request localMulticast (PeerAnnounce @e pnonce) - let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do + let peerThread t mx = W.tell . L.singleton =<< (liftIO . asyncStick sup) do withPeerM env mx - `U.withException` \e -> case (fromException e) of - Just (e' :: AsyncCancelled) -> pure () - Nothing -> err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) + `U.withException` \e -> runMaybeT $ + selectException @AsyncCancelled e (\e' -> pure ()) + <|> selectException @ExceptionInLinkedThread e (\e' -> pure ()) + <|> lift do + err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) + debug $ "peerThread Finished:" <+> t workers <- W.execWriterT do @@ -1042,7 +1060,7 @@ runPeer opts = U.handle (\e -> myException e let peersAction _ = do who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do + void $ asyncStick sup $ withPeerM penv $ do forKnownPeers @e $ \p pde -> do pa <- toPeerAddr p let k = view peerSignKey pde @@ -1051,7 +1069,7 @@ runPeer opts = U.handle (\e -> myException e let pexInfoAction :: RPC L4Proto -> ResponseM L4Proto (RpcM (ResourceT IO)) () pexInfoAction _ = do who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do + void $ asyncStick sup $ withPeerM penv $ do -- FIXME: filter-pexinfo-entries ps <- getAllPex2Peers request who (RPCPexInfoAnswer @e ps) @@ -1079,20 +1097,20 @@ runPeer opts = U.handle (\e -> myException e -- let reflogFetchAction puk = do trace "reflogFetchAction" - void $ liftIO $ async $ withPeerM penv $ do + void $ asyncStick sup $ withPeerM penv $ do broadCastMessage (RefLogRequest @e puk) let reflogGetAction puk = do trace $ "reflogGetAction" <+> pretty (AsBase58 puk) who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do + void $ asyncStick sup $ withPeerM penv $ do sto <- getStorage h <- liftIO $ getRef sto (RefLogKey @(Encryption e) puk) request who (RPCRefLogGetAnswer @e h) let refChanHeadSendAction h = do trace $ "refChanHeadSendAction" <+> pretty h - void $ liftIO $ async $ withPeerM penv $ do + void $ liftIO $ asyncStick sup $ withPeerM penv $ do me <- ownPeer @e sto <- getStorage @@ -1114,19 +1132,19 @@ runPeer opts = U.handle (\e -> myException e let refChanHeadGetAction puk = do trace $ "refChanHeadGetAction" <+> pretty (AsBase58 puk) who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do + void $ asyncStick sup $ withPeerM penv $ do sto <- getStorage h <- liftIO $ getRef sto (RefChanHeadKey @(Encryption e) puk) request who (RPCRefChanHeadGetAnsw @e h) let refChanHeadFetchAction puk = do trace "reChanFetchAction" - void $ liftIO $ async $ withPeerM penv $ do + void $ asyncStick sup $ withPeerM penv $ do broadCastMessage (RefChanGetHead @e puk) let refChanProposeAction (puk, lbs) = do trace "reChanProposeAction" - void $ liftIO $ async $ withPeerM penv $ do + void $ liftIO $ asyncStick sup $ withPeerM penv $ do me <- ownPeer @e runMaybeT do box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just @@ -1142,7 +1160,7 @@ runPeer opts = U.handle (\e -> myException e let refChanNotifyAction (puk, lbs) = do trace "refChanNotifyAction" - void $ liftIO $ async $ withPeerM penv $ do + void $ liftIO $ asyncStick sup $ withPeerM penv $ do me <- ownPeer @e runMaybeT do box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just @@ -1151,7 +1169,7 @@ runPeer opts = U.handle (\e -> myException e let refChanGetAction puk = do trace $ "refChanGetAction" <+> pretty (AsBase58 puk) who <- thatPeer (Proxy @(RPC e)) - void $ liftIO $ async $ withPeerM penv $ do + void $ asyncStick sup $ withPeerM penv $ do sto <- getStorage h <- liftIO $ getRef sto (RefChanLogKey @(Encryption e) puk) trace $ "refChanGetAction ANSWER IS" <+> pretty h @@ -1159,7 +1177,7 @@ runPeer opts = U.handle (\e -> myException e let refChanFetchAction puk = do trace $ "refChanFetchAction" <+> pretty (AsBase58 puk) - void $ liftIO $ async $ withPeerM penv $ do + void $ liftIO $ asyncStick sup $ withPeerM penv $ do gossip (RefChanRequest @e puk) let arpc = RpcAdapter @@ -1198,7 +1216,7 @@ runPeer opts = U.handle (\e -> myException e dialReqProtoAdapterRouter <- pure dialogRoutes pure DialReqProtoAdapter {..} - rpc <- async $ runRPC udp1 do + rpc <- asyncStick sup $ runRPC udp1 do runProto @e [ makeResponse (rpcHandler arpc) , makeResponse (dialReqProto dialReqProtoAdapter) @@ -1206,7 +1224,7 @@ runPeer opts = U.handle (\e -> myException e menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) - ann <- liftIO $ async $ runPeerM menv $ do + ann <- liftIO $ asyncStick sup $ runPeerM menv $ do self <- ownPeer @e @@ -1224,9 +1242,10 @@ runPeer opts = U.handle (\e -> myException e , makeResponse peerAnnounceProto ] - void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread] - - liftIO $ simpleStorageStop s + lift $ + (void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread]) + `finally` + (liftIO $ simpleStorageStop s) @@ -1240,4 +1259,3 @@ emitToPeer :: ( MonadIO m emitToPeer env k e = liftIO $ withPeerM env (emit k e) - diff --git a/hbs2-peer/app/PeerMeta.hs b/hbs2-peer/app/PeerMeta.hs index b596afeb..532b2a26 100644 --- a/hbs2-peer/app/PeerMeta.hs +++ b/hbs2-peer/app/PeerMeta.hs @@ -17,7 +17,6 @@ import HBS2.System.Logger.Simple import PeerTypes -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index e0251e31..85b5cab5 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -32,7 +32,6 @@ import PeerConfig import Prelude hiding (log) import Data.Foldable (for_) -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Writer qualified as W diff --git a/hbs2-peer/app/ProxyMessaging.hs b/hbs2-peer/app/ProxyMessaging.hs index e2cf8d52..f1f4afb2 100644 --- a/hbs2-peer/app/ProxyMessaging.hs +++ b/hbs2-peer/app/ProxyMessaging.hs @@ -9,6 +9,7 @@ module ProxyMessaging import HBS2.Prelude.Plated import HBS2.Net.Messaging import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Crypto import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Definition () @@ -27,7 +28,6 @@ import Crypto.Saltine.Core.Box qualified as Encrypt import Codec.Serialise import Control.Applicative import Control.Arrow hiding ((<+>)) -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue import Control.Monad.Trans.Maybe @@ -85,23 +85,25 @@ runProxyMessaging :: forall m . MonadIO m -> m () runProxyMessaging env = liftIO do + withAsyncSupervisor "runProxyMessaging" \sup -> do let udp = view proxyUDP env let answ = view proxyAnswers env let udpPeer = getOwnPeer udp - u <- async $ forever do + u <- asyncStick sup $ forever do msgs <- receive udp (To udpPeer) atomically $ do forM_ msgs $ writeTQueue answ - t <- async $ maybe1 (view proxyTCP env) none $ \tcp -> do + t <- asyncStick sup $ maybe1 (view proxyTCP env) none $ \tcp -> do forever do msgs <- receive tcp (To $ view tcpOwnPeer tcp) atomically $ do forM_ msgs $ writeTQueue answ - liftIO $ mapM_ waitCatch [u,t] + -- liftIO $ void $ waitAnyCatch [u,t] ??? + liftIO $ void $ waitAny [u,t] instance Messaging ProxyMessaging L4Proto LBS.ByteString where diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index 12f806f9..c2c961b8 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -6,6 +6,7 @@ module RPC where import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Hash import HBS2.Net.Auth.Credentials import HBS2.Net.IP.Addr @@ -35,7 +36,6 @@ import Lens.Micro.Platform import Network.Socket import System.Exit import System.IO -import UnliftIO.Async as U import Control.Concurrent.MVar data PeerRpcKey @@ -268,7 +268,7 @@ runRpcCommand opt = \case withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () -withRPC o cmd = rpcClientMain o $ runResourceT do +withRPC o cmd = rpcClientMain o $ runResourceT $ withAsyncSupervisor "withRPC" \sup -> do liftIO $ hSetBuffering stdout LineBuffering @@ -285,7 +285,7 @@ withRPC o cmd = rpcClientMain o $ runResourceT do udp1 <- newMessagingUDP False Nothing `orDie` "Can't start RPC" - mrpc <- async $ runMessagingUDP udp1 + mrpc <- asyncStick sup $ runMessagingUDP udp1 pingQ <- liftIO newTQueueIO @@ -332,9 +332,9 @@ withRPC o cmd = rpcClientMain o $ runResourceT do , rpcOnRefChanNotify = dontHandle } - prpc <- async $ runRPC udp1 do + prpc <- asyncStick sup $ runRPC udp1 do env <- ask - proto <- liftIO $ async $ continueWithRPC env $ do + proto <- liftIO $ asyncStick sup $ continueWithRPC env $ do runProto @L4Proto [ makeResponse (rpcHandler adapter) ] diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index f996fb49..a13442ce 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -17,6 +17,7 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Events @@ -252,6 +253,7 @@ refChanWorkerInitValidators :: forall e m . ( MonadIO m refChanWorkerInitValidators env = do + withAsyncSupervisor "refChanWorkerInitValidators" \sup -> do debug "refChanWorkerInitValidators" let (PeerConfig syn) = view refChanWorkerConf env @@ -271,7 +273,7 @@ refChanWorkerInitValidators env = do unless here do q <- newTQueueIO - val <- async $ validatorThread rc sa q + val <- asyncStick sup $ validatorThread sup rc sa q let rcv = RefChanValidator q val atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv) @@ -281,22 +283,22 @@ refChanWorkerInitValidators env = do mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) -- FIXME: make-thread-respawning - validatorThread chan sa q = liftIO do + validatorThread sup chan sa q = liftIO do client <- newMessagingUnix False 1.0 sa - msg <- async $ runMessagingUnix client + msg <- asyncStick sup $ runMessagingUnix client -- FIXME: hardcoded-timeout waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) runValidateProtoM client do - poke <- async $ forever do + poke <- asyncStick sup $ forever do pause @'Seconds 10 mv <- newEmptyMVar nonce <- newNonce @(RefChanValidate UNIX) atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv) - z <- async $ runProto + z <- asyncStick sup $ runProto [ makeResponse (refChanValidateProto waiters) ] @@ -347,28 +349,29 @@ refChanWorker :: forall e s m . ( MonadIO m -> m () refChanWorker env brains = do + withAsyncSupervisor "refChanWorker" \sup -> do penv <- ask mergeQ <- newTQueueIO -- FIXME: resume-on-exception - hw <- async (refChanHeadMon penv) + hw <- asyncStick sup (refChanHeadMon penv) -- FIXME: insist-more-during-download -- что-то частая ситуация, когда блоки -- с трудом докачиваются. надо бы -- разобраться. возможно переделать -- механизм скачивания блоков - downloads <- async monitorHeadDownloads + downloads <- asyncStick sup monitorHeadDownloads - polls <- async refChanPoll + polls <- asyncStick sup refChanPoll - wtrans <- async refChanWriter + wtrans <- asyncStick sup refChanWriter - cleanup1 <- async cleanupRounds + cleanup1 <- asyncStick sup cleanupRounds - merge <- async (logMergeProcess env mergeQ) + merge <- asyncStick sup (logMergeProcess env mergeQ) sto <- getStorage diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index 97624708..affbc2ec 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -4,6 +4,7 @@ module RefLog where import HBS2.Prelude.Plated import HBS2.Clock +import HBS2.Concurrent.Supervisor import HBS2.Actors.Peer import HBS2.Events import HBS2.Data.Types.Refs @@ -30,6 +31,7 @@ import Data.Maybe import Data.Foldable(for_) import Data.Text qualified as Text import Control.Concurrent.STM +import Control.Exception qualified as Exception import Control.Monad import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS @@ -37,7 +39,6 @@ import Data.HashMap.Strict qualified as HashMap import Codec.Serialise import Data.HashSet qualified as HashSet import Data.HashSet (HashSet) -import Control.Concurrent.Async import Control.Monad.Trans.Maybe import Lens.Micro.Platform @@ -101,7 +102,7 @@ data RefLogWorkerAdapter e = , reflogFetch :: PubKey 'Sign (Encryption e) -> IO () } -reflogWorker :: forall e s m . ( MonadIO m, MyPeer e +reflogWorker :: forall e s m . ( MonadUnliftIO m, MyPeer e , EventListener e (RefLogUpdateEv e) m , EventListener e (RefLogRequestAnswer e) m -- , Request e (RefLogRequest e) (Peerm @@ -119,6 +120,7 @@ reflogWorker :: forall e s m . ( MonadIO m, MyPeer e -> m () reflogWorker conf adapter = do + withAsyncSupervisor "reflog worker" \supw -> do sto <- getStorage @@ -165,9 +167,9 @@ reflogWorker conf adapter = do here <- liftIO $ readTVarIO reflogMon <&> HashSet.member h unless here do liftIO $ atomically $ modifyTVar' reflogMon (HashSet.insert h) - void $ liftIO $ async $ do - timeout <- async (reflogTimeout reflog h) - work <- async $ do + void $ liftIO $ asyncStick supw $ do + timeout <- asyncStick supw (reflogTimeout reflog h) + work <- asyncStick supw $ do trace $ "reflog worker. GOT REFLOG ANSWER" <+> pretty (AsBase58 reflog) <+> pretty h reflogDownload adapter h fix \next -> do @@ -216,18 +218,64 @@ reflogWorker conf adapter = do let pollIntervals = HashMap.fromListWith (<>) [ (i, [r]) | (r,i) <- HashMap.toList polls ] & HashMap.toList + withAsyncSupervisor "reflog updater" \sup -> do + pollers <- + forM pollIntervals \(i,refs) -> liftIO do + asyncStick' sup "poller" $ do + pause @'Seconds 10 + forever $ do + for_ refs $ \r -> do + trace $ "POLL REFERENCE" <+> pretty (AsBase58 r) <+> pretty i <> "m" + reflogFetch adapter r + pause (fromIntegral i :: Timeout 'Minutes) - pollers' <- liftIO $ async $ do - pause @'Seconds 10 - forM pollIntervals $ \(i,refs) -> liftIO do - async $ forever $ do - for_ refs $ \r -> do - trace $ "POLL REFERENCE" <+> pretty (AsBase58 r) <+> pretty i <> "m" - reflogFetch adapter r + updaters <- replicateM 4 $ liftIO $ asyncStick' sup "updater" $ + (`Exception.finally` (err "reflog updater ended. HOW?!")) $ + (`withSomeExceptionIO` (\e -> err $ "REFLOG UPDATER:" <> viaShow e)) $ + forever $ do + pause @'Seconds 1 + reflogUpdater pQ sto + `withSomeExceptionIO` (\e -> err $ "reflog updater:" <> viaShow e) + -- `Exception.finally` (debug "reflog updater fin") + -- debug "reflog updater normally performed" - pause (fromIntegral i :: Timeout 'Minutes) + void $ liftIO $ waitAnyCatchCancel $ updaters <> pollers - w1 <- liftIO $ async $ forever $ replicateConcurrently_ 4 do + where + + missedEntries sto h = do + missed <- liftIO $ newTVarIO mempty + walkMerkle h (getBlock sto) $ \hr -> do + case hr of + Left ha -> do + atomically $ modifyTVar missed (ha:) + Right (hs :: [HashRef]) -> do + w <- mapM ( hasBlock sto . fromHashRef ) hs <&> fmap isJust + let mi = [ hx | (False,hx) <- zip w hs ] + for_ mi $ \hx -> liftIO $ atomically $ modifyTVar missed (fromHashRef hx:) + + liftIO $ readTVarIO missed + +readHashesFromBlock :: AnyStorage -> Maybe (Hash HbSync) -> IO [HashRef] +readHashesFromBlock _ Nothing = pure mempty +readHashesFromBlock sto (Just h) = do + treeQ <- liftIO newTQueueIO + walkMerkle h (getBlock sto) $ \hr -> do + case hr of + Left{} -> pure () + Right (hrr :: [HashRef]) -> atomically $ writeTQueue treeQ hrr + re <- liftIO $ atomically $ flushTQueue treeQ + pure $ mconcat re + +reflogUpdater :: forall e s . + ( Serialise (RefLogUpdate e) + , s ~ Encryption e + , IsRefPubKey s + , Pretty (AsBase58 (PubKey 'Sign s)) + ) + => TQueue (PubKey 'Sign s, [RefLogUpdate e]) -> AnyStorage -> IO () + +reflogUpdater pQ sto = do -- TODO: reflog-process-period-to-config -- pause @'Seconds 10 @@ -269,33 +317,3 @@ reflogWorker conf adapter = do trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty newRoot -- trace "I'm a reflog update worker" - - pollers <- liftIO $ wait pollers' - void $ liftIO $ waitAnyCatchCancel $ w1 : pollers - - where - - readHashesFromBlock _ Nothing = pure mempty - readHashesFromBlock sto (Just h) = do - treeQ <- liftIO newTQueueIO - walkMerkle h (getBlock sto) $ \hr -> do - case hr of - Left{} -> pure () - Right (hrr :: [HashRef]) -> atomically $ writeTQueue treeQ hrr - re <- liftIO $ atomically $ flushTQueue treeQ - pure $ mconcat re - - missedEntries sto h = do - missed <- liftIO $ newTVarIO mempty - walkMerkle h (getBlock sto) $ \hr -> do - case hr of - Left ha -> do - atomically $ modifyTVar missed (ha:) - Right (hs :: [HashRef]) -> do - w <- mapM ( hasBlock sto . fromHashRef ) hs <&> fmap isJust - let mi = [ hx | (False,hx) <- zip w hs ] - for_ mi $ \hx -> liftIO $ atomically $ modifyTVar missed (fromHashRef hx:) - - liftIO $ readTVarIO missed - - diff --git a/hbs2-peer/app/SignalHandlers.hs b/hbs2-peer/app/SignalHandlers.hs new file mode 100644 index 00000000..449c2f24 --- /dev/null +++ b/hbs2-peer/app/SignalHandlers.hs @@ -0,0 +1,25 @@ +module SignalHandlers where + +import Control.Exception (Exception, toException) +import Control.Monad +import System.Mem.Weak (deRefWeak) +import System.Posix.Signals +import UnliftIO.Concurrent + +newtype SignalException = SignalException Signal + deriving (Show) +instance Exception SignalException + +installSignalHandlers :: IO () +installSignalHandlers = do + main_thread_id <- myThreadId + weak_tid <- mkWeakThreadId main_thread_id + forM_ [ sigHUP, sigTERM, sigUSR1, sigUSR2, sigXCPU, sigXFSZ ] $ \sig -> + installHandler sig (Catch $ send_exception weak_tid sig) Nothing + where + send_exception weak_tid sig = do + m <- deRefWeak weak_tid + case m of + Nothing -> return () + Just tid -> throwTo tid (toException $ SignalException sig) + diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 0f951bdf..d1ab47d0 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -148,6 +148,7 @@ executable hbs2-peer , Brains , ProxyMessaging , CLI.RefChan + , SignalHandlers -- other-extensions: build-depends: base diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index 0931ed13..768fbb26 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -80,6 +80,8 @@ library , unordered-containers , temporary , filepattern + , unliftio + , unliftio-core hs-source-dirs: lib diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index d701456c..1bf491bd 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -9,13 +9,14 @@ module HBS2.Storage.Simple import HBS2.Clock import HBS2.Hash +import HBS2.Prelude import HBS2.Prelude.Plated import HBS2.Storage import HBS2.Base58 +import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple -import Control.Concurrent.Async import Control.Exception import Control.Monad import Control.Monad.Except @@ -164,14 +165,15 @@ simpleStorageStop ss = do simpleStorageWorker :: IsSimpleStorageKey h => SimpleStorage h -> IO () simpleStorageWorker ss = do + withAsyncSupervisor "in simpleStorageWorker" \sup -> do - ops <- async $ fix \next -> do + ops <- asyncStick sup $ fix \next -> do s <- atomically $ do TBMQ.readTBMQueue ( ss ^. storageOpQ ) case s of Nothing -> pure () Just a -> a >> next - killer <- async $ forever $ do + killer <- asyncStick sup $ forever $ do pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting simpleAddTask ss $ do @@ -184,7 +186,7 @@ simpleStorageWorker ss = do writeTVar ( ss ^. storageMMaped ) survived - killerLRU <- async $ forever $ do + killerLRU <- asyncStick sup $ forever $ do pause ( 10 :: Timeout 'Seconds ) -- FIXME: setting atomically $ writeTVar ( ss ^. storageMMapedLRU ) mempty diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index bbeee924..05d09022 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -21,7 +21,7 @@ import Data.ByteString.Char8 qualified as BS import System.FilePath import Data.Maybe import Control.Concurrent.STM -import Control.Concurrent.Async +-- import Control.Concurrent.Async import Control.Monad import Streaming.Prelude qualified as S diff --git a/hbs2-storage-simple/test/TestSimpleStorage.hs b/hbs2-storage-simple/test/TestSimpleStorage.hs index fadda6e4..abde354a 100644 --- a/hbs2-storage-simple/test/TestSimpleStorage.hs +++ b/hbs2-storage-simple/test/TestSimpleStorage.hs @@ -11,7 +11,6 @@ import Control.Monad.Except import Control.Monad import Data.Traversable import Data.Foldable -import Control.Concurrent.Async import Control.Concurrent import Data.ByteString.Lazy qualified as LBS import Data.Maybe diff --git a/hbs2-tests/test/TestTCP.hs b/hbs2-tests/test/TestTCP.hs index 08f81479..a4b8d7c4 100644 --- a/hbs2-tests/test/TestTCP.hs +++ b/hbs2-tests/test/TestTCP.hs @@ -16,7 +16,6 @@ import Control.Monad.Writer hiding (listen) import Test.Tasty.HUnit import Data.ByteString.Lazy (ByteString) -import Control.Concurrent.Async import Lens.Micro.Platform import Codec.Serialise diff --git a/hbs2-tests/test/TestTCPNet.hs b/hbs2-tests/test/TestTCPNet.hs index e57fcd37..b93292b1 100644 --- a/hbs2-tests/test/TestTCPNet.hs +++ b/hbs2-tests/test/TestTCPNet.hs @@ -16,7 +16,6 @@ import Control.Monad.Writer hiding (listen) import Test.Tasty.HUnit import Data.ByteString.Lazy (ByteString) -import Control.Concurrent.Async import Lens.Micro.Platform import Codec.Serialise import System.Environment diff --git a/hbs2/Main.hs b/hbs2/Main.hs index afd5d721..8aa08796 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -2,6 +2,7 @@ module Main where import HBS2.Base58 import HBS2.Data.Detect +import HBS2.Concurrent.Supervisor import HBS2.Data.Types import HBS2.Defaults import HBS2.Merkle @@ -18,7 +19,6 @@ import HBS2.OrDie import HBS2.System.Logger.Simple hiding (info) -import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Trans.Maybe @@ -356,7 +356,7 @@ runRefLogGet s ss = do exitSuccess withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO () -withStore opts f = do +withStore opts f = withAsyncSupervisor "in withStore" \sup -> do setLogging @DEBUG debugPrefix setLogging @INFO defLog @@ -371,7 +371,7 @@ withStore opts f = do let pref = uniLastDef xdg opts :: StoragePrefix s <- simpleStorageInit (Just pref) - w <- replicateM 4 $ async $ simpleStorageWorker s + w <- replicateM 4 $ asyncStick sup $ simpleStorageWorker s f s diff --git a/hbs2/hbs2.cabal b/hbs2/hbs2.cabal index ec761066..9a6d0965 100644 --- a/hbs2/hbs2.cabal +++ b/hbs2/hbs2.cabal @@ -91,6 +91,8 @@ executable hbs2 , uuid , terminal-progress-bar , stm + , unliftio + , unliftio-core hs-source-dirs: . default-language: Haskell2010