From b7079c291540c5f9e14dd3377ed092fc2d695df6 Mon Sep 17 00:00:00 2001 From: Sergey Ivanov Date: Wed, 16 Aug 2023 16:50:16 +0400 Subject: [PATCH] Revert "PR CG2C18TK8v "account asyncs, respawn node on errors"" This reverts commit 8904704edcdbb7a74ec7ec9c8c5cd3632fdedf6d. --- docs/devlog.md | 1 - hbs2-core/hbs2-core.cabal | 1 - hbs2-core/lib/Dialog/Helpers/Streaming.hs | 1 + hbs2-core/lib/HBS2/Actors.hs | 1 + hbs2-core/lib/HBS2/Actors/Peer.hs | 13 +-- hbs2-core/lib/HBS2/Concurrent/Supervisor.hs | 78 ------------- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 20 ++-- hbs2-core/lib/HBS2/Net/Messaging/UDP.hs | 15 ++- hbs2-core/lib/HBS2/Net/Messaging/Unix.hs | 15 +-- hbs2-core/lib/HBS2/Prelude.hs | 11 +- hbs2-git/git-hbs2-http/GitHttpDumbMain.hs | 5 +- hbs2-git/lib/HBS2/Git/Local/CLI.hs | 1 + hbs2-git/lib/HBS2Git/App.hs | 1 + hbs2-peer/app/BlockDownload.hs | 16 ++- hbs2-peer/app/BlockHttpDownload.hs | 2 +- hbs2-peer/app/Brains.hs | 10 +- hbs2-peer/app/DownloadQ.hs | 6 +- hbs2-peer/app/EncryptionKeys.hs | 1 + hbs2-peer/app/PeerInfo.hs | 12 +- hbs2-peer/app/PeerMain.hs | 102 +++++++---------- hbs2-peer/app/PeerMeta.hs | 1 + hbs2-peer/app/PeerTypes.hs | 1 + hbs2-peer/app/ProxyMessaging.hs | 10 +- hbs2-peer/app/RPC.hs | 10 +- hbs2-peer/app/RefChan.hs | 25 ++--- hbs2-peer/app/RefLog.hs | 106 ++++++++---------- hbs2-peer/app/SignalHandlers.hs | 25 ----- hbs2-peer/hbs2-peer.cabal | 1 - hbs2-storage-simple/hbs2-storage-simple.cabal | 2 - .../lib/HBS2/Storage/Simple.hs | 10 +- .../lib/HBS2/Storage/Simple/Extra.hs | 2 +- hbs2-storage-simple/test/TestSimpleStorage.hs | 1 + hbs2-tests/test/TestTCP.hs | 1 + hbs2-tests/test/TestTCPNet.hs | 1 + hbs2/Main.hs | 6 +- hbs2/hbs2.cabal | 2 - 36 files changed, 178 insertions(+), 338 deletions(-) delete mode 100644 hbs2-core/lib/HBS2/Concurrent/Supervisor.hs delete mode 100644 hbs2-peer/app/SignalHandlers.hs diff --git a/docs/devlog.md b/docs/devlog.md index 8416977c..c90d3f94 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1319,4 +1319,3 @@ PR: bus-crypt Шифрование протокола общения нод. Обмен асимметричными публичными ключами выполняется на стадии хэндшейка в ping/pong. Для шифрования данных создаётся симметричный ключ по diffie-hellman. - diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index ca534cc9..b337620d 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -75,7 +75,6 @@ library , HBS2.Actors.Peer.Types , HBS2.Base58 , HBS2.Clock - , HBS2.Concurrent.Supervisor , HBS2.Crypto , HBS2.Data.Detect , HBS2.Data.Types diff --git a/hbs2-core/lib/Dialog/Helpers/Streaming.hs b/hbs2-core/lib/Dialog/Helpers/Streaming.hs index 7ab544a3..a38be2b6 100644 --- a/hbs2-core/lib/Dialog/Helpers/Streaming.hs +++ b/hbs2-core/lib/Dialog/Helpers/Streaming.hs @@ -10,6 +10,7 @@ import Streaming as S import Streaming.Internal import Streaming.Prelude (cons) import Streaming.Prelude qualified as S +import UnliftIO.Async import UnliftIO.STM import Prelude hiding (cons) diff --git a/hbs2-core/lib/HBS2/Actors.hs b/hbs2-core/lib/HBS2/Actors.hs index 028a58b4..0c303c7a 100644 --- a/hbs2-core/lib/HBS2/Actors.hs +++ b/hbs2-core/lib/HBS2/Actors.hs @@ -14,6 +14,7 @@ import Control.Concurrent.STM.TBMQueue qualified as TBMQ import Control.Concurrent.STM.TBMQueue (TBMQueue) import Control.Concurrent.STM.TVar qualified as TVar import Control.Monad +import Control.Concurrent.Async import Data.Function import Data.Functor import Data.Kind diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 867f5acc..bf82a7e7 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -21,14 +21,13 @@ import HBS2.Net.PeerLocator import HBS2.Net.PeerLocator.Static import HBS2.Net.Proto import HBS2.Net.Proto.Sessions -import HBS2.Prelude import HBS2.Prelude.Plated import HBS2.Storage import HBS2.System.Logger.Simple -import HBS2.Concurrent.Supervisor import Control.Applicative import Control.Monad.Trans.Maybe +import Control.Concurrent.Async import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) import Data.Cache (Cache) @@ -440,7 +439,7 @@ newPeerEnv s bus p = do _envEncryptionKeys <- liftIO (newTVarIO mempty) pure PeerEnv {..} -runPeerM :: forall e m . ( MonadUnliftIO m +runPeerM :: forall e m . ( MonadIO m , HasPeer e , Ord (Peer e) , Pretty (Peer e) @@ -450,12 +449,12 @@ runPeerM :: forall e m . ( MonadUnliftIO m -> PeerM e m () -> m () -runPeerM env f = withAsyncSupervisor "runPeerM" \sup -> do +runPeerM env f = do let de = view envDeferred env - as <- liftIO $ replicateM 8 $ asyncStick' sup "runPipeline" $ runPipeline de + as <- liftIO $ replicateM 8 $ async $ runPipeline de - sw <- liftIO $ asyncStick' sup "sweeps" $ forever $ withPeerM env $ do + sw <- liftIO $ async $ forever $ withPeerM env $ do pause defSweepTimeout se <- asks (view envSessions) liftIO $ Cache.purgeExpired se @@ -463,7 +462,7 @@ runPeerM env f = withAsyncSupervisor "runPeerM" \sup -> do void $ runReaderT (fromPeerM f) env void $ liftIO $ stopPipeline de - -- liftIO $ mapM_ cancel (as <> [sw]) + liftIO $ mapM_ cancel (as <> [sw]) withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a withPeerM env action = runReaderT (fromPeerM action) env diff --git a/hbs2-core/lib/HBS2/Concurrent/Supervisor.hs b/hbs2-core/lib/HBS2/Concurrent/Supervisor.hs deleted file mode 100644 index acbfa0be..00000000 --- a/hbs2-core/lib/HBS2/Concurrent/Supervisor.hs +++ /dev/null @@ -1,78 +0,0 @@ -module HBS2.Concurrent.Supervisor -( module HBS2.Concurrent.Supervisor -, module X -) where - -import Control.Arrow hiding ((<+>)) -import Control.Concurrent.Async qualified as Async -import Control.Monad -import Control.Monad.IO.Class -import Control.Monad.Trans.Class -import Control.Monad.Trans.Maybe -import Data.Proxy -import Data.Text (Text) -import Prettyprinter -import System.IO (Handle) -import UnliftIO (MonadUnliftIO(..)) -import UnliftIO.Async -import UnliftIO.Async as X hiding (async) -import UnliftIO.Concurrent -import UnliftIO.Exception - -import HBS2.System.Logger.Simple - - -data Sup = Sup - { supAsync :: Async () - } - -data SupFinished = SupFinished Text - deriving (Show) -instance Exception SupFinished - -withAsyncSupervisor :: (MonadUnliftIO io) => Text -> (Sup -> io a) -> io a -withAsyncSupervisor name k = - bracket - (Sup <$> async (forever (threadDelay (10^9)))) - (flip throwTo (SupFinished name) . asyncThreadId . supAsync) - (\sup -> (k sup) - `withException` \(e :: SomeException) -> do - debug $ "Finished sup " <> pretty name <> " " <> viaShow e - ) - -asyncStick :: MonadUnliftIO m => Sup -> m a -> m (Async a) -asyncStick sup ioa = do - a <- async ioa - liftIO $ Async.link2Only (const True) (supAsync sup) a - pure a - -asyncStick' :: MonadUnliftIO m => Sup -> Text -> m a -> m (Async a) -asyncStick' sup name ioa = do - a <- async $ - ioa - `withException` \(e :: SomeException) -> - debug $ "finished async" <+> pretty name <+> ":" <+> viaShow e - liftIO $ Async.link2Only (const True) (supAsync sup) a - pure a - - -selectException_ :: forall e m . (Exception e, Monad m) - => Proxy e -> SomeException -> MaybeT m () -selectException_ _ = fromException >>> \case - Nothing -> MaybeT (pure Nothing) - Just (e :: e) -> pure () - -selectException :: forall e m . (Exception e, Monad m) - => SomeException -> (e -> m ()) -> MaybeT m () -selectException e f = case (fromException e) of - Nothing -> MaybeT (pure Nothing) - Just e' -> lift (f e') - -withExceptionIO :: Exception e => IO a -> (e -> IO b) -> IO a -withExceptionIO io what = io `catch` \e -> do - _ <- what e - throwIO e - -withSomeExceptionIO :: IO a -> (SomeException -> IO b) -> IO a -withSomeExceptionIO = withExceptionIO - diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index fbc3da7f..4cb28d6b 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -14,7 +14,6 @@ import HBS2.Net.IP.Addr import HBS2.Net.Messaging import HBS2.Net.Proto.Types import HBS2.Prelude.Plated -import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple @@ -41,6 +40,7 @@ import Streaming.Prelude qualified as S import System.Random hiding (next) import Control.Monad.Trans.Resource +import UnliftIO.Async import UnliftIO.STM import UnliftIO.Exception qualified as U @@ -245,7 +245,7 @@ spawnConnection tp env so sa = liftIO do when ( used <= 2 ) do atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) - when (used == 1) $ withAsyncSupervisor "in spawnConnection" \sup -> do + when (used == 1) do q <- getWriteQueue connId updatePeer connId newP @@ -254,7 +254,7 @@ spawnConnection tp env so sa = liftIO do <+> pretty newP <+> parens ("used:" <+> pretty used) - rd <- asyncStick sup $ fix \next -> do + rd <- async $ fix \next -> do spx <- readFromSocket so 4 <&> LBS.toStrict ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг @@ -276,7 +276,7 @@ spawnConnection tp env so sa = liftIO do next - wr <- asyncStick sup $ fix \next -> do + wr <- async $ fix \next -> do (rcpt, bs) <- atomically $ readTQueue q pq <- makeReqId rcpt @@ -364,14 +364,14 @@ connectPeerTCP env peer = liftIO do -- FIXME: link-all-asyncs runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () -runMessagingTCP env = liftIO $ withAsyncSupervisor "in runMessagingTCP" \sup -> do +runMessagingTCP env = liftIO do own <- toPeerAddr $ view tcpOwnPeer env let (L4Address _ (IPAddrPort (i,p))) = own let defs = view tcpDefer env - mon <- asyncStick sup $ forever do + mon <- async $ forever do pause @'Seconds 30 now <- getTimeCoarse @@ -384,7 +384,7 @@ runMessagingTCP env = liftIO $ withAsyncSupervisor "in runMessagingTCP" \sup -> [] -> Nothing xs -> Just xs - con <- asyncStick sup $ forever do + con <- async $ forever do let ev = view tcpDeferEv env @@ -408,7 +408,7 @@ runMessagingTCP env = liftIO $ withAsyncSupervisor "in runMessagingTCP" \sup -> co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip - maybe1 co' (void $ asyncStick sup (connectPeerTCP env pip)) $ \co -> do + maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co maybe1 q' none $ \q -> do atomically do @@ -418,7 +418,7 @@ runMessagingTCP env = liftIO $ withAsyncSupervisor "in runMessagingTCP" \sup -> pure () - stat <- asyncStick sup $ forever do + stat <- async $ forever do pause @'Seconds 120 ps <- readTVarIO $ view tcpConnPeer env let peers = HashMap.toList ps @@ -429,6 +429,8 @@ runMessagingTCP env = liftIO $ withAsyncSupervisor "in runMessagingTCP" \sup -> <+> pretty c <+> parens ("used:" <+> pretty used) + mapM_ link [mon,con,stat] + liftIO ( listen (Host (show i)) (show p) $ \(sock, sa) -> do withFdSocket sock setCloseOnExecIfNeeded diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index 5d8b4b12..e47789fb 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -7,13 +7,13 @@ import HBS2.Net.IP.Addr import HBS2.Net.Messaging import HBS2.Net.Proto import HBS2.Prelude.Plated -import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple import Data.Function import Control.Exception import Control.Monad.Trans.Maybe +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue qualified as Q0 import Control.Monad @@ -108,11 +108,11 @@ newMessagingUDP reuse saddr = udpWorker :: MessagingUDP -> TVar Socket -> IO () -udpWorker env tso = withAsyncSupervisor "in udpWorker" \sup -> do +udpWorker env tso = do so <- readTVarIO tso - rcvLoop <- asyncStick sup $ forever $ do + rcvLoop <- async $ forever $ do -- so <- readTVarIO tso -- pause ( 10 :: Timeout 'Seconds ) (msg, from) <- recvFrom so defMaxDatagram @@ -120,7 +120,7 @@ udpWorker env tso = withAsyncSupervisor "in udpWorker" \sup -> do -- FIXME: ASAP-check-addr-type liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg) - sndLoop <- asyncStick sup $ forever $ do + sndLoop <- async $ forever $ do pause ( 10 :: Timeout 'Seconds ) -- (To whom, msg) <- atomically $ Q0.readTQueue (inbox env) -- print "YAY!" @@ -135,16 +135,15 @@ udpWorker env tso = withAsyncSupervisor "in udpWorker" \sup -> do -- FIXME: stopping runMessagingUDP :: MonadIO m => MessagingUDP -> m () -runMessagingUDP udpMess = liftIO $ withAsyncSupervisor "in runMessagingUDP" \sup -> do +runMessagingUDP udpMess = liftIO $ do let addr = listenAddr udpMess so <- readTVarIO (sock udpMess) unless (mcast udpMess) $ do bind so addr - w <- asyncStick sup $ udpWorker udpMess (sock udpMess) - wait w - -- waitCatch w >>= either throwIO (const $ pure ()) + w <- async $ udpWorker udpMess (sock udpMess) + waitCatch w >>= either throwIO (const $ pure ()) instance Messaging MessagingUDP L4Proto ByteString where diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index 79fbd9fe..b1b948d8 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -4,7 +4,6 @@ import HBS2.Prelude.Plated import HBS2.Net.Proto.Types import HBS2.Net.Messaging import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple @@ -107,8 +106,7 @@ runMessagingUnix env = do where - runServer = forever $ handleAny cleanupAndRetry $ runResourceT $ - withAsyncSupervisor "runServer" \sup -> do + runServer = forever $ handleAny cleanupAndRetry $ runResourceT do t0 <- getTimeCoarse atomically $ writeTVar (msgUnixLast env) t0 @@ -120,7 +118,7 @@ runMessagingUnix env = do liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) liftIO $ listen sock 1 - watchdog <- asyncStick sup $ do + watchdog <- async $ do let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] @@ -141,14 +139,14 @@ runMessagingUnix env = do when ( acc > 0 && diff >= toNanoSeconds (TimeoutSec $ realToFrac wd) ) do throwIO ReadTimeoutException - run <- asyncStick sup $ forever $ runResourceT do + run <- async $ forever $ runResourceT do (so, sa) <- liftIO $ accept sock atomically $ modifyTVar (msgUnixAccepts env) succ void $ allocate (pure so) close - writer <- asyncStick sup $ forever do + writer <- async $ forever do msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) let len = fromIntegral $ LBS.length msg :: Int liftIO $ sendAll so $ bytestring32 (fromIntegral len) @@ -174,8 +172,7 @@ runMessagingUnix env = do Right{} -> pure () - runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT $ - withAsyncSupervisor "runClient" \sup -> do + runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT do sock <- liftIO $ socket AF_UNIX Stream defaultProtocol @@ -194,7 +191,7 @@ runMessagingUnix env = do attemptConnect - reader <- asyncStick sup $ forever do + reader <- async $ forever do -- Read response from server frameLen <- liftIO $ recv sock 4 <&> word32 <&> fromIntegral frame <- liftIO $ recv sock frameLen diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index e2024f48..1273cd07 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -24,20 +24,16 @@ import GHC.Generics as X (Generic) import Data.ByteString (ByteString) import Data.String (IsString(..)) import Safe -import Control.Concurrent.Async as X (ExceptionInLinkedThread) import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad (void,guard,when,unless) import Control.Monad.Trans.Class (lift) -import Control.Monad.IO.Unlift as X -import Data.Char qualified as Char import Data.Function -import Data.Hashable +import Data.Char qualified as Char import Data.Text qualified as Text -import Data.Word +import Data.Hashable import Prettyprinter -import UnliftIO as X (MonadUnliftIO(..)) -import UnliftIO.Async as X +import Data.Word none :: forall m . Monad m => m () none = pure () @@ -66,4 +62,3 @@ class ToByteString a where class FromByteString a where fromByteString :: ByteString -> Maybe a - diff --git a/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs b/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs index bb01edc1..53537a2b 100644 --- a/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs +++ b/hbs2-git/git-hbs2-http/GitHttpDumbMain.hs @@ -2,7 +2,6 @@ module Main where import HBS2.Prelude.Plated import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2Git.App import HBS2Git.State @@ -36,6 +35,7 @@ import System.FilePath.Posix import System.IO.Temp import System.Timeout (timeout) import Text.InterpolatedString.Perl6 (qc) +import UnliftIO.Async import Streaming.ByteString qualified as SB import Streaming.Zip qualified as SZip @@ -107,7 +107,6 @@ retryFor num waity sleep action = timeout (ceiling $ waity * 1000000) $ go num dumbHttpServe :: MonadUnliftIO m => Port -> m () dumbHttpServe pnum = do - withAsyncSupervisor "dumbHttpServe" \sup -> do locks <- liftIO $ newMVar (HashMap.empty @HashRef @(MVar ())) @@ -122,7 +121,7 @@ dumbHttpServe pnum = do -- с логом, тогда в следующий раз будет обратно -- распакован - updater <- asyncStick sup $ forever do + updater <- async $ forever do pause @'Seconds 300 pure () diff --git a/hbs2-git/lib/HBS2/Git/Local/CLI.hs b/hbs2-git/lib/HBS2/Git/Local/CLI.hs index a9ebe18f..0e4abb00 100644 --- a/hbs2-git/lib/HBS2/Git/Local/CLI.hs +++ b/hbs2-git/lib/HBS2/Git/Local/CLI.hs @@ -11,6 +11,7 @@ import HBS2.Git.Types import HBS2.System.Logger.Simple +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Writer import Data.HashSet (HashSet) diff --git a/hbs2-git/lib/HBS2Git/App.hs b/hbs2-git/lib/HBS2Git/App.hs index 656a0a1e..223ff20b 100644 --- a/hbs2-git/lib/HBS2Git/App.hs +++ b/hbs2-git/lib/HBS2Git/App.hs @@ -49,6 +49,7 @@ import Data.Text qualified as Text import Data.IORef import System.IO.Unsafe (unsafePerformIO) import Data.Cache qualified as Cache +import Control.Concurrent.Async import System.Environment import Prettyprinter.Render.Terminal diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 83b740f6..1f4acf23 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -5,7 +5,6 @@ module BlockDownload where import HBS2.Actors.Peer import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Defaults @@ -26,6 +25,7 @@ import PeerTypes import PeerInfo import Brains +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TSem import Control.Monad.Reader @@ -418,7 +418,6 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO ) => DownloadEnv e -> m () blockDownloadLoop env0 = do - withAsyncSupervisor "blockDownloadLoop" \sup -> do e <- ask @@ -430,7 +429,7 @@ blockDownloadLoop env0 = do let withAllStuff = withPeerM e . withDownload env0 - void $ liftIO $ asyncStick sup $ forever $ withPeerM e do + void $ liftIO $ async $ forever $ withPeerM e do pause @'Seconds 30 pee <- knownPeers @e pl @@ -441,7 +440,7 @@ blockDownloadLoop env0 = do liftIO $ atomically $ writeTVar (view peerBurstMax pinfo) Nothing - void $ liftIO $ asyncStick sup $ forever $ withPeerM e do + void $ liftIO $ async $ forever $ withPeerM e do pause @'Seconds 1.5 pee <- knownPeers @e pl @@ -452,7 +451,7 @@ blockDownloadLoop env0 = do updatePeerInfo False p pinfo - void $ liftIO $ asyncStick sup $ forever $ withAllStuff do + void $ liftIO $ async $ forever $ withAllStuff do pause @'Seconds 5 -- FIXME: put to defaults -- we need to show download stats @@ -508,7 +507,7 @@ blockDownloadLoop env0 = do liftIO $ atomically $ do modifyTVar busyPeers (HashSet.insert p) - void $ liftIO $ asyncStick sup $ withAllStuff do + void $ liftIO $ async $ withAllStuff do -- trace $ "start downloading shit" <+> pretty p <+> pretty h @@ -563,7 +562,7 @@ blockDownloadLoop env0 = do proposed <- asks (view blockProposed) - void $ liftIO $ asyncStick sup $ forever do + void $ liftIO $ async $ forever do pause @'Seconds 20 -- debug "block download loop. does not do anything" liftIO $ Cache.purgeExpired proposed @@ -579,12 +578,11 @@ postponedLoop :: forall e m . ( MyPeer e ) => DownloadEnv e -> m () postponedLoop env0 = do - withAsyncSupervisor "postponedLoop" \sup -> do e <- ask pause @'Seconds 2.57 - void $ liftIO $ asyncStick sup $ withPeerM e $ withDownload env0 do + void $ liftIO $ async $ withPeerM e $ withDownload env0 do q <- asks (view blockDelayTo) fix \next -> do w <- liftIO $ atomically $ readTQueue q diff --git a/hbs2-peer/app/BlockHttpDownload.hs b/hbs2-peer/app/BlockHttpDownload.hs index 5d6388cb..8a2e91a6 100644 --- a/hbs2-peer/app/BlockHttpDownload.hs +++ b/hbs2-peer/app/BlockHttpDownload.hs @@ -5,7 +5,6 @@ module BlockHttpDownload where import HBS2.Actors.Peer import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Defaults @@ -31,6 +30,7 @@ import PeerInfo import BlockDownload import Brains +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 86bcf332..c4f60307 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -3,10 +3,8 @@ {-# Language TemplateHaskell #-} module Brains where -import HBS2.Prelude import HBS2.Prelude.Plated import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Data.Types.Refs import HBS2.Net.Proto.RefChan(ForRefChans) import HBS2.Net.Proto @@ -40,6 +38,7 @@ import System.Directory import System.FilePath import System.Random (randomRIO) import Text.InterpolatedString.Perl6 (qc) +import UnliftIO (MonadUnliftIO(..),async,race) data PeerBrainsDb @@ -810,14 +809,13 @@ runBasicBrains :: forall e m . ( e ~ L4Proto -> m () runBasicBrains cfg brains = do - withAsyncSupervisor "runBasicBrains" \sup -> do let pip = view brainsPipeline brains let expire = view brainsExpire brains let commit = view brainsCommit brains -- FIXME: async-error-handling - void $ liftIO $ asyncStick sup $ forever do + void $ liftIO $ async $ forever do ewaiters <- race (pause @'Seconds 5) $ do atomically $ do @@ -833,7 +831,7 @@ runBasicBrains cfg brains = do transactional brains (sequence_ (w:ws)) sequence_ waiters - void $ liftIO $ asyncStick sup $ forever do + void $ liftIO $ async $ forever do pause @'Seconds 60 updateOP brains (cleanupHashes brains) @@ -845,7 +843,7 @@ runBasicBrains cfg brains = do | ListVal @C (Key "poll" [SymbolVal tp, LitIntVal n, LitStrVal ref]) <- syn ] ) - void $ asyncStick sup $ do + void $ async $ do -- pause @'Seconds 5 forM_ polls $ \(t,mi,x) -> do trace $ "BRAINS: poll" <+> pretty t <+> pretty (AsBase58 x) <+> pretty mi diff --git a/hbs2-peer/app/DownloadQ.hs b/hbs2-peer/app/DownloadQ.hs index 4b19218e..e7e73dde 100644 --- a/hbs2-peer/app/DownloadQ.hs +++ b/hbs2-peer/app/DownloadQ.hs @@ -3,7 +3,6 @@ module DownloadQ where import HBS2.Prelude import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Hash import HBS2.Events import HBS2.Data.Types.Refs @@ -28,6 +27,7 @@ import Data.Functor import Data.Function import Control.Exception import Control.Monad +import Control.Concurrent.Async import System.IO @@ -46,11 +46,9 @@ downloadQueue :: forall e m . ( MyPeer e , HasPeerLocator e (BlockDownloadM e m) , HasPeerLocator e m , EventListener e (DownloadReq e) m - , MonadUnliftIO m ) => PeerConfig -> DownloadEnv e -> m () downloadQueue conf denv = do - withAsyncSupervisor "in downloadQueue" \sup -> do sto <- getStorage hq <- liftIO newTQueueIO @@ -64,7 +62,7 @@ downloadQueue conf denv = do liftIO $ atomically $ writeTQueue hq h maybe1 qfile' noLogFile $ \fn -> do - void $ liftIO $ asyncStick sup $ forever $ do + void $ liftIO $ async $ forever $ do pause @'Seconds 10 fromq <- liftIO $ atomically $ flushTQueue hq unless (null fromq) do diff --git a/hbs2-peer/app/EncryptionKeys.hs b/hbs2-peer/app/EncryptionKeys.hs index 622fdf37..dd524d5d 100644 --- a/hbs2-peer/app/EncryptionKeys.hs +++ b/hbs2-peer/app/EncryptionKeys.hs @@ -23,6 +23,7 @@ import PeerConfig import PeerTypes import Codec.Serialise +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Reader diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 5ce81777..58598c3a 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -4,7 +4,6 @@ module PeerInfo where import HBS2.Actors.Peer import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Data.Types import HBS2.Events import HBS2.Net.Auth.Credentials @@ -23,6 +22,7 @@ import PeerConfig import PeerTypes import Brains +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Reader @@ -80,18 +80,17 @@ pexLoop :: forall e brains m . ( HasPeerLocator e m , HasNonces (PeerExchange e) m , Request e (PeerExchange e) m , Sessions e (PeerExchange e) m - , MonadUnliftIO m + , MonadIO m , e ~ L4Proto ) => brains -> Maybe MessagingTCP -> m () pexLoop brains tcpEnv = do - withAsyncSupervisor "pexLoop" \sup -> do pause @'Seconds 5 pl <- getPeerLocator @e - tcpPexInfo <- liftIO $ asyncStick sup $ forever do + tcpPexInfo <- liftIO $ async $ forever do -- FIXME: fix-hardcode pause @'Seconds 20 @@ -151,7 +150,6 @@ peerPingLoop :: forall e m . ( HasPeerLocator e m ) => PeerConfig -> PeerEnv e -> m () peerPingLoop cfg penv = do - withAsyncSupervisor "peerPingLoop" \sup -> do e <- ask @@ -173,7 +171,7 @@ peerPingLoop cfg penv = do -- TODO: peer info loop - infoLoop <- liftIO $ asyncStick sup $ forever $ withPeerM e $ do + infoLoop <- liftIO $ async $ forever $ withPeerM e $ do pause @'Seconds 10 pee <- knownPeers @e pl @@ -210,7 +208,7 @@ peerPingLoop cfg penv = do pure () - watch <- liftIO $ asyncStick sup $ forever $ withPeerM e $ do + watch <- liftIO $ async $ forever $ withPeerM e $ do pause @'Seconds 120 pips <- getKnownPeers @e now <- getTimeCoarse diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index aa2bdbbd..df01d834 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -10,7 +10,6 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Defaults import HBS2.Events import HBS2.Hash @@ -61,9 +60,9 @@ import PeerMain.PeerDialog import PeerMeta import CLI.RefChan import RefChan -import SignalHandlers import Codec.Serialise as Serialise +-- import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception as Exception import Control.Monad.Reader @@ -101,6 +100,7 @@ import Text.InterpolatedString.Perl6 (qc) import UnliftIO.Exception qualified as U -- import UnliftIO.STM +import UnliftIO.Async as U import Control.Monad.Trans.Resource import Streaming.Prelude qualified as S @@ -430,8 +430,9 @@ instance ( Monad m response = lift . response -respawn :: IO () -respawn = do +respawn :: PeerOpts -> IO () +respawn opts = case view peerRespawn opts of + Just True -> do let secs = 5 notice $ "RESPAWNING in" <+> viaShow secs <> "s" pause @'Seconds secs @@ -440,34 +441,18 @@ respawn = do print (self, args) executeFile self False args Nothing + _ -> exitFailure + runPeer :: forall e s . ( e ~ L4Proto , FromStringMaybe (PeerAddr e) , s ~ Encryption e , HasStorage (PeerM e IO) ) => PeerOpts -> IO () -runPeer opts = do - installSignalHandlers - - let h = case view peerRespawn opts of - Just True -> - Exception.handle (\e -> do - myException e - performGC - respawn - ) - _ -> id - - h (runPeer' opts) - - -runPeer' :: forall e s . ( e ~ L4Proto - , FromStringMaybe (PeerAddr e) - , s ~ Encryption e - , HasStorage (PeerM e IO) - ) => PeerOpts -> IO () - -runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do +runPeer opts = U.handle (\e -> myException e + >> performGC + >> respawn opts + ) $ runResourceT do metrics <- liftIO newStore @@ -546,7 +531,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do let blk = liftIO . hasBlock s - w <- replicateM defStorageThreads $ asyncStick sup $ liftIO $ simpleStorageWorker s + w <- replicateM defStorageThreads $ async $ liftIO $ simpleStorageWorker s localMulticast <- liftIO $ (headMay <$> parseAddrUDP (fromString defLocalMulticast) <&> fmap (fromSockAddr @'UDP . addrAddress) ) @@ -558,21 +543,21 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do mess <- newMessagingUDP False listenSa `orDie` "unable listen on the given addr" - udp <- asyncStick sup $ runMessagingUDP mess + udp <- async $ runMessagingUDP mess udp1 <- newMessagingUDP False rpcSa `orDie` "Can't start RPC listener" - mrpc <- asyncStick sup $ runMessagingUDP udp1 + mrpc <- async $ runMessagingUDP udp1 mcast <- newMessagingUDPMulticast defLocalMulticast `orDie` "Can't start RPC listener" - messMcast <- asyncStick sup $ runMessagingUDP mcast + messMcast <- async $ runMessagingUDP mcast brains <- newBasicBrains @e conf - brainsThread <- asyncStick sup $ runBasicBrains conf brains + brainsThread <- async $ runBasicBrains conf brains denv <- newDownloadEnv brains @@ -584,7 +569,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do tcp <- maybe1 addr' (pure Nothing) $ \addr -> do tcpEnv <- newMessagingTCP addr <&> set tcpOnClientStarted (onClientTCPConnected brains) -- FIXME: handle-tcp-thread-somehow - void $ asyncStick sup $ runMessagingTCP tcpEnv + void $ async $ runMessagingTCP tcpEnv pure $ Just tcpEnv (proxy, penv) <- mdo @@ -620,13 +605,13 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) pure (proxy, penv) - proxyThread <- asyncStick sup $ runProxyMessaging proxy + proxyThread <- async $ runProxyMessaging proxy let peerMeta = mkPeerMeta conf penv nbcache <- liftIO $ Cache.newCache (Just $ toTimeSpec ( 600 :: Timeout 'Seconds)) - void $ asyncStick sup $ forever do + void $ async $ forever do pause @'Seconds 600 liftIO $ Cache.purgeExpired nbcache @@ -660,7 +645,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do -- debug $ "onNoBlock" <+> pretty p <+> pretty h withPeerM penv $ withDownload denv (addDownload mzero h) - loop <- liftIO $ asyncStick sup do + loop <- liftIO $ async do runPeerM penv $ do adapter <- mkAdapter @@ -850,19 +835,16 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do doAddPeer p - void $ asyncStick sup $ withPeerM env do + void $ liftIO $ async $ withPeerM env do pause @'Seconds 1 debug "sending first peer announce" request localMulticast (PeerAnnounce @e pnonce) - let peerThread t mx = W.tell . L.singleton =<< (liftIO . asyncStick sup) do + let peerThread t mx = W.tell . L.singleton =<< (liftIO . async) do withPeerM env mx - `U.withException` \e -> runMaybeT $ - selectException @AsyncCancelled e (\e' -> pure ()) - <|> selectException @ExceptionInLinkedThread e (\e' -> pure ()) - <|> lift do - err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) - + `U.withException` \e -> case (fromException e) of + Just (e' :: AsyncCancelled) -> pure () + Nothing -> err ("peerThread" <+> viaShow t <+> "Failed with" <+> viaShow e) debug $ "peerThread Finished:" <+> t workers <- W.execWriterT do @@ -1060,7 +1042,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do let peersAction _ = do who <- thatPeer (Proxy @(RPC e)) - void $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do forKnownPeers @e $ \p pde -> do pa <- toPeerAddr p let k = view peerSignKey pde @@ -1069,7 +1051,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do let pexInfoAction :: RPC L4Proto -> ResponseM L4Proto (RpcM (ResourceT IO)) () pexInfoAction _ = do who <- thatPeer (Proxy @(RPC e)) - void $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do -- FIXME: filter-pexinfo-entries ps <- getAllPex2Peers request who (RPCPexInfoAnswer @e ps) @@ -1097,20 +1079,20 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do -- let reflogFetchAction puk = do trace "reflogFetchAction" - void $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do broadCastMessage (RefLogRequest @e puk) let reflogGetAction puk = do trace $ "reflogGetAction" <+> pretty (AsBase58 puk) who <- thatPeer (Proxy @(RPC e)) - void $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do sto <- getStorage h <- liftIO $ getRef sto (RefLogKey @(Encryption e) puk) request who (RPCRefLogGetAnswer @e h) let refChanHeadSendAction h = do trace $ "refChanHeadSendAction" <+> pretty h - void $ liftIO $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do me <- ownPeer @e sto <- getStorage @@ -1132,19 +1114,19 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do let refChanHeadGetAction puk = do trace $ "refChanHeadGetAction" <+> pretty (AsBase58 puk) who <- thatPeer (Proxy @(RPC e)) - void $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do sto <- getStorage h <- liftIO $ getRef sto (RefChanHeadKey @(Encryption e) puk) request who (RPCRefChanHeadGetAnsw @e h) let refChanHeadFetchAction puk = do trace "reChanFetchAction" - void $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do broadCastMessage (RefChanGetHead @e puk) let refChanProposeAction (puk, lbs) = do trace "reChanProposeAction" - void $ liftIO $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do me <- ownPeer @e runMaybeT do box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just @@ -1160,7 +1142,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do let refChanNotifyAction (puk, lbs) = do trace "refChanNotifyAction" - void $ liftIO $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do me <- ownPeer @e runMaybeT do box <- MaybeT $ pure $ deserialiseOrFail lbs & either (const Nothing) Just @@ -1169,7 +1151,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do let refChanGetAction puk = do trace $ "refChanGetAction" <+> pretty (AsBase58 puk) who <- thatPeer (Proxy @(RPC e)) - void $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do sto <- getStorage h <- liftIO $ getRef sto (RefChanLogKey @(Encryption e) puk) trace $ "refChanGetAction ANSWER IS" <+> pretty h @@ -1177,7 +1159,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do let refChanFetchAction puk = do trace $ "refChanFetchAction" <+> pretty (AsBase58 puk) - void $ liftIO $ asyncStick sup $ withPeerM penv $ do + void $ liftIO $ async $ withPeerM penv $ do gossip (RefChanRequest @e puk) let arpc = RpcAdapter @@ -1216,7 +1198,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do dialReqProtoAdapterRouter <- pure dialogRoutes pure DialReqProtoAdapter {..} - rpc <- asyncStick sup $ runRPC udp1 do + rpc <- async $ runRPC udp1 do runProto @e [ makeResponse (rpcHandler arpc) , makeResponse (dialReqProto dialReqProtoAdapter) @@ -1224,7 +1206,7 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) - ann <- liftIO $ asyncStick sup $ runPeerM menv $ do + ann <- liftIO $ async $ runPeerM menv $ do self <- ownPeer @e @@ -1242,10 +1224,9 @@ runPeer' opts = runResourceT $ withAsyncSupervisor "in runPeer" \sup -> do , makeResponse peerAnnounceProto ] - lift $ - (void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread]) - `finally` - (liftIO $ simpleStorageStop s) + void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread] + + liftIO $ simpleStorageStop s @@ -1259,3 +1240,4 @@ emitToPeer :: ( MonadIO m emitToPeer env k e = liftIO $ withPeerM env (emit k e) + diff --git a/hbs2-peer/app/PeerMeta.hs b/hbs2-peer/app/PeerMeta.hs index 532b2a26..b596afeb 100644 --- a/hbs2-peer/app/PeerMeta.hs +++ b/hbs2-peer/app/PeerMeta.hs @@ -17,6 +17,7 @@ import HBS2.System.Logger.Simple import PeerTypes +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 85b5cab5..e0251e31 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -32,6 +32,7 @@ import PeerConfig import Prelude hiding (log) import Data.Foldable (for_) +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Writer qualified as W diff --git a/hbs2-peer/app/ProxyMessaging.hs b/hbs2-peer/app/ProxyMessaging.hs index f1f4afb2..e2cf8d52 100644 --- a/hbs2-peer/app/ProxyMessaging.hs +++ b/hbs2-peer/app/ProxyMessaging.hs @@ -9,7 +9,6 @@ module ProxyMessaging import HBS2.Prelude.Plated import HBS2.Net.Messaging import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Crypto import HBS2.Net.Auth.Credentials import HBS2.Net.Proto.Definition () @@ -28,6 +27,7 @@ import Crypto.Saltine.Core.Box qualified as Encrypt import Codec.Serialise import Control.Applicative import Control.Arrow hiding ((<+>)) +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue import Control.Monad.Trans.Maybe @@ -85,25 +85,23 @@ runProxyMessaging :: forall m . MonadIO m -> m () runProxyMessaging env = liftIO do - withAsyncSupervisor "runProxyMessaging" \sup -> do let udp = view proxyUDP env let answ = view proxyAnswers env let udpPeer = getOwnPeer udp - u <- asyncStick sup $ forever do + u <- async $ forever do msgs <- receive udp (To udpPeer) atomically $ do forM_ msgs $ writeTQueue answ - t <- asyncStick sup $ maybe1 (view proxyTCP env) none $ \tcp -> do + t <- async $ maybe1 (view proxyTCP env) none $ \tcp -> do forever do msgs <- receive tcp (To $ view tcpOwnPeer tcp) atomically $ do forM_ msgs $ writeTQueue answ - -- liftIO $ void $ waitAnyCatch [u,t] ??? - liftIO $ void $ waitAny [u,t] + liftIO $ mapM_ waitCatch [u,t] instance Messaging ProxyMessaging L4Proto LBS.ByteString where diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index c2c961b8..12f806f9 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -6,7 +6,6 @@ module RPC where import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Hash import HBS2.Net.Auth.Credentials import HBS2.Net.IP.Addr @@ -36,6 +35,7 @@ import Lens.Micro.Platform import Network.Socket import System.Exit import System.IO +import UnliftIO.Async as U import Control.Concurrent.MVar data PeerRpcKey @@ -268,7 +268,7 @@ runRpcCommand opt = \case withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () -withRPC o cmd = rpcClientMain o $ runResourceT $ withAsyncSupervisor "withRPC" \sup -> do +withRPC o cmd = rpcClientMain o $ runResourceT do liftIO $ hSetBuffering stdout LineBuffering @@ -285,7 +285,7 @@ withRPC o cmd = rpcClientMain o $ runResourceT $ withAsyncSupervisor "withRPC" \ udp1 <- newMessagingUDP False Nothing `orDie` "Can't start RPC" - mrpc <- asyncStick sup $ runMessagingUDP udp1 + mrpc <- async $ runMessagingUDP udp1 pingQ <- liftIO newTQueueIO @@ -332,9 +332,9 @@ withRPC o cmd = rpcClientMain o $ runResourceT $ withAsyncSupervisor "withRPC" \ , rpcOnRefChanNotify = dontHandle } - prpc <- asyncStick sup $ runRPC udp1 do + prpc <- async $ runRPC udp1 do env <- ask - proto <- liftIO $ asyncStick sup $ continueWithRPC env $ do + proto <- liftIO $ async $ continueWithRPC env $ do runProto @L4Proto [ makeResponse (rpcHandler adapter) ] diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index a13442ce..f996fb49 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -17,7 +17,6 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Events @@ -253,7 +252,6 @@ refChanWorkerInitValidators :: forall e m . ( MonadIO m refChanWorkerInitValidators env = do - withAsyncSupervisor "refChanWorkerInitValidators" \sup -> do debug "refChanWorkerInitValidators" let (PeerConfig syn) = view refChanWorkerConf env @@ -273,7 +271,7 @@ refChanWorkerInitValidators env = do unless here do q <- newTQueueIO - val <- asyncStick sup $ validatorThread sup rc sa q + val <- async $ validatorThread rc sa q let rcv = RefChanValidator q val atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv) @@ -283,22 +281,22 @@ refChanWorkerInitValidators env = do mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) -- FIXME: make-thread-respawning - validatorThread sup chan sa q = liftIO do + validatorThread chan sa q = liftIO do client <- newMessagingUnix False 1.0 sa - msg <- asyncStick sup $ runMessagingUnix client + msg <- async $ runMessagingUnix client -- FIXME: hardcoded-timeout waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) runValidateProtoM client do - poke <- asyncStick sup $ forever do + poke <- async $ forever do pause @'Seconds 10 mv <- newEmptyMVar nonce <- newNonce @(RefChanValidate UNIX) atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv) - z <- asyncStick sup $ runProto + z <- async $ runProto [ makeResponse (refChanValidateProto waiters) ] @@ -349,29 +347,28 @@ refChanWorker :: forall e s m . ( MonadIO m -> m () refChanWorker env brains = do - withAsyncSupervisor "refChanWorker" \sup -> do penv <- ask mergeQ <- newTQueueIO -- FIXME: resume-on-exception - hw <- asyncStick sup (refChanHeadMon penv) + hw <- async (refChanHeadMon penv) -- FIXME: insist-more-during-download -- что-то частая ситуация, когда блоки -- с трудом докачиваются. надо бы -- разобраться. возможно переделать -- механизм скачивания блоков - downloads <- asyncStick sup monitorHeadDownloads + downloads <- async monitorHeadDownloads - polls <- asyncStick sup refChanPoll + polls <- async refChanPoll - wtrans <- asyncStick sup refChanWriter + wtrans <- async refChanWriter - cleanup1 <- asyncStick sup cleanupRounds + cleanup1 <- async cleanupRounds - merge <- asyncStick sup (logMergeProcess env mergeQ) + merge <- async (logMergeProcess env mergeQ) sto <- getStorage diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index affbc2ec..97624708 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -4,7 +4,6 @@ module RefLog where import HBS2.Prelude.Plated import HBS2.Clock -import HBS2.Concurrent.Supervisor import HBS2.Actors.Peer import HBS2.Events import HBS2.Data.Types.Refs @@ -31,7 +30,6 @@ import Data.Maybe import Data.Foldable(for_) import Data.Text qualified as Text import Control.Concurrent.STM -import Control.Exception qualified as Exception import Control.Monad import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS @@ -39,6 +37,7 @@ import Data.HashMap.Strict qualified as HashMap import Codec.Serialise import Data.HashSet qualified as HashSet import Data.HashSet (HashSet) +import Control.Concurrent.Async import Control.Monad.Trans.Maybe import Lens.Micro.Platform @@ -102,7 +101,7 @@ data RefLogWorkerAdapter e = , reflogFetch :: PubKey 'Sign (Encryption e) -> IO () } -reflogWorker :: forall e s m . ( MonadUnliftIO m, MyPeer e +reflogWorker :: forall e s m . ( MonadIO m, MyPeer e , EventListener e (RefLogUpdateEv e) m , EventListener e (RefLogRequestAnswer e) m -- , Request e (RefLogRequest e) (Peerm @@ -120,7 +119,6 @@ reflogWorker :: forall e s m . ( MonadUnliftIO m, MyPeer e -> m () reflogWorker conf adapter = do - withAsyncSupervisor "reflog worker" \supw -> do sto <- getStorage @@ -167,9 +165,9 @@ reflogWorker conf adapter = do here <- liftIO $ readTVarIO reflogMon <&> HashSet.member h unless here do liftIO $ atomically $ modifyTVar' reflogMon (HashSet.insert h) - void $ liftIO $ asyncStick supw $ do - timeout <- asyncStick supw (reflogTimeout reflog h) - work <- asyncStick supw $ do + void $ liftIO $ async $ do + timeout <- async (reflogTimeout reflog h) + work <- async $ do trace $ "reflog worker. GOT REFLOG ANSWER" <+> pretty (AsBase58 reflog) <+> pretty h reflogDownload adapter h fix \next -> do @@ -218,64 +216,18 @@ reflogWorker conf adapter = do let pollIntervals = HashMap.fromListWith (<>) [ (i, [r]) | (r,i) <- HashMap.toList polls ] & HashMap.toList - withAsyncSupervisor "reflog updater" \sup -> do - pollers <- - forM pollIntervals \(i,refs) -> liftIO do - asyncStick' sup "poller" $ do - pause @'Seconds 10 - forever $ do - for_ refs $ \r -> do - trace $ "POLL REFERENCE" <+> pretty (AsBase58 r) <+> pretty i <> "m" - reflogFetch adapter r - pause (fromIntegral i :: Timeout 'Minutes) - updaters <- replicateM 4 $ liftIO $ asyncStick' sup "updater" $ - (`Exception.finally` (err "reflog updater ended. HOW?!")) $ - (`withSomeExceptionIO` (\e -> err $ "REFLOG UPDATER:" <> viaShow e)) $ - forever $ do - pause @'Seconds 1 - reflogUpdater pQ sto - `withSomeExceptionIO` (\e -> err $ "reflog updater:" <> viaShow e) - -- `Exception.finally` (debug "reflog updater fin") - -- debug "reflog updater normally performed" + pollers' <- liftIO $ async $ do + pause @'Seconds 10 + forM pollIntervals $ \(i,refs) -> liftIO do + async $ forever $ do + for_ refs $ \r -> do + trace $ "POLL REFERENCE" <+> pretty (AsBase58 r) <+> pretty i <> "m" + reflogFetch adapter r - void $ liftIO $ waitAnyCatchCancel $ updaters <> pollers + pause (fromIntegral i :: Timeout 'Minutes) - where - - missedEntries sto h = do - missed <- liftIO $ newTVarIO mempty - walkMerkle h (getBlock sto) $ \hr -> do - case hr of - Left ha -> do - atomically $ modifyTVar missed (ha:) - Right (hs :: [HashRef]) -> do - w <- mapM ( hasBlock sto . fromHashRef ) hs <&> fmap isJust - let mi = [ hx | (False,hx) <- zip w hs ] - for_ mi $ \hx -> liftIO $ atomically $ modifyTVar missed (fromHashRef hx:) - - liftIO $ readTVarIO missed - -readHashesFromBlock :: AnyStorage -> Maybe (Hash HbSync) -> IO [HashRef] -readHashesFromBlock _ Nothing = pure mempty -readHashesFromBlock sto (Just h) = do - treeQ <- liftIO newTQueueIO - walkMerkle h (getBlock sto) $ \hr -> do - case hr of - Left{} -> pure () - Right (hrr :: [HashRef]) -> atomically $ writeTQueue treeQ hrr - re <- liftIO $ atomically $ flushTQueue treeQ - pure $ mconcat re - -reflogUpdater :: forall e s . - ( Serialise (RefLogUpdate e) - , s ~ Encryption e - , IsRefPubKey s - , Pretty (AsBase58 (PubKey 'Sign s)) - ) - => TQueue (PubKey 'Sign s, [RefLogUpdate e]) -> AnyStorage -> IO () - -reflogUpdater pQ sto = do + w1 <- liftIO $ async $ forever $ replicateConcurrently_ 4 do -- TODO: reflog-process-period-to-config -- pause @'Seconds 10 @@ -317,3 +269,33 @@ reflogUpdater pQ sto = do trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty newRoot -- trace "I'm a reflog update worker" + + pollers <- liftIO $ wait pollers' + void $ liftIO $ waitAnyCatchCancel $ w1 : pollers + + where + + readHashesFromBlock _ Nothing = pure mempty + readHashesFromBlock sto (Just h) = do + treeQ <- liftIO newTQueueIO + walkMerkle h (getBlock sto) $ \hr -> do + case hr of + Left{} -> pure () + Right (hrr :: [HashRef]) -> atomically $ writeTQueue treeQ hrr + re <- liftIO $ atomically $ flushTQueue treeQ + pure $ mconcat re + + missedEntries sto h = do + missed <- liftIO $ newTVarIO mempty + walkMerkle h (getBlock sto) $ \hr -> do + case hr of + Left ha -> do + atomically $ modifyTVar missed (ha:) + Right (hs :: [HashRef]) -> do + w <- mapM ( hasBlock sto . fromHashRef ) hs <&> fmap isJust + let mi = [ hx | (False,hx) <- zip w hs ] + for_ mi $ \hx -> liftIO $ atomically $ modifyTVar missed (fromHashRef hx:) + + liftIO $ readTVarIO missed + + diff --git a/hbs2-peer/app/SignalHandlers.hs b/hbs2-peer/app/SignalHandlers.hs deleted file mode 100644 index 449c2f24..00000000 --- a/hbs2-peer/app/SignalHandlers.hs +++ /dev/null @@ -1,25 +0,0 @@ -module SignalHandlers where - -import Control.Exception (Exception, toException) -import Control.Monad -import System.Mem.Weak (deRefWeak) -import System.Posix.Signals -import UnliftIO.Concurrent - -newtype SignalException = SignalException Signal - deriving (Show) -instance Exception SignalException - -installSignalHandlers :: IO () -installSignalHandlers = do - main_thread_id <- myThreadId - weak_tid <- mkWeakThreadId main_thread_id - forM_ [ sigHUP, sigTERM, sigUSR1, sigUSR2, sigXCPU, sigXFSZ ] $ \sig -> - installHandler sig (Catch $ send_exception weak_tid sig) Nothing - where - send_exception weak_tid sig = do - m <- deRefWeak weak_tid - case m of - Nothing -> return () - Just tid -> throwTo tid (toException $ SignalException sig) - diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index d1ab47d0..0f951bdf 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -148,7 +148,6 @@ executable hbs2-peer , Brains , ProxyMessaging , CLI.RefChan - , SignalHandlers -- other-extensions: build-depends: base diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index 768fbb26..0931ed13 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -80,8 +80,6 @@ library , unordered-containers , temporary , filepattern - , unliftio - , unliftio-core hs-source-dirs: lib diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index 1bf491bd..d701456c 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -9,14 +9,13 @@ module HBS2.Storage.Simple import HBS2.Clock import HBS2.Hash -import HBS2.Prelude import HBS2.Prelude.Plated import HBS2.Storage import HBS2.Base58 -import HBS2.Concurrent.Supervisor import HBS2.System.Logger.Simple +import Control.Concurrent.Async import Control.Exception import Control.Monad import Control.Monad.Except @@ -165,15 +164,14 @@ simpleStorageStop ss = do simpleStorageWorker :: IsSimpleStorageKey h => SimpleStorage h -> IO () simpleStorageWorker ss = do - withAsyncSupervisor "in simpleStorageWorker" \sup -> do - ops <- asyncStick sup $ fix \next -> do + ops <- async $ fix \next -> do s <- atomically $ do TBMQ.readTBMQueue ( ss ^. storageOpQ ) case s of Nothing -> pure () Just a -> a >> next - killer <- asyncStick sup $ forever $ do + killer <- async $ forever $ do pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting simpleAddTask ss $ do @@ -186,7 +184,7 @@ simpleStorageWorker ss = do writeTVar ( ss ^. storageMMaped ) survived - killerLRU <- asyncStick sup $ forever $ do + killerLRU <- async $ forever $ do pause ( 10 :: Timeout 'Seconds ) -- FIXME: setting atomically $ writeTVar ( ss ^. storageMMapedLRU ) mempty diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index 05d09022..bbeee924 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -21,7 +21,7 @@ import Data.ByteString.Char8 qualified as BS import System.FilePath import Data.Maybe import Control.Concurrent.STM --- import Control.Concurrent.Async +import Control.Concurrent.Async import Control.Monad import Streaming.Prelude qualified as S diff --git a/hbs2-storage-simple/test/TestSimpleStorage.hs b/hbs2-storage-simple/test/TestSimpleStorage.hs index abde354a..fadda6e4 100644 --- a/hbs2-storage-simple/test/TestSimpleStorage.hs +++ b/hbs2-storage-simple/test/TestSimpleStorage.hs @@ -11,6 +11,7 @@ import Control.Monad.Except import Control.Monad import Data.Traversable import Data.Foldable +import Control.Concurrent.Async import Control.Concurrent import Data.ByteString.Lazy qualified as LBS import Data.Maybe diff --git a/hbs2-tests/test/TestTCP.hs b/hbs2-tests/test/TestTCP.hs index a4b8d7c4..08f81479 100644 --- a/hbs2-tests/test/TestTCP.hs +++ b/hbs2-tests/test/TestTCP.hs @@ -16,6 +16,7 @@ import Control.Monad.Writer hiding (listen) import Test.Tasty.HUnit import Data.ByteString.Lazy (ByteString) +import Control.Concurrent.Async import Lens.Micro.Platform import Codec.Serialise diff --git a/hbs2-tests/test/TestTCPNet.hs b/hbs2-tests/test/TestTCPNet.hs index b93292b1..e57fcd37 100644 --- a/hbs2-tests/test/TestTCPNet.hs +++ b/hbs2-tests/test/TestTCPNet.hs @@ -16,6 +16,7 @@ import Control.Monad.Writer hiding (listen) import Test.Tasty.HUnit import Data.ByteString.Lazy (ByteString) +import Control.Concurrent.Async import Lens.Micro.Platform import Codec.Serialise import System.Environment diff --git a/hbs2/Main.hs b/hbs2/Main.hs index 8aa08796..afd5d721 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -2,7 +2,6 @@ module Main where import HBS2.Base58 import HBS2.Data.Detect -import HBS2.Concurrent.Supervisor import HBS2.Data.Types import HBS2.Defaults import HBS2.Merkle @@ -19,6 +18,7 @@ import HBS2.OrDie import HBS2.System.Logger.Simple hiding (info) +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad import Control.Monad.Trans.Maybe @@ -356,7 +356,7 @@ runRefLogGet s ss = do exitSuccess withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO () -withStore opts f = withAsyncSupervisor "in withStore" \sup -> do +withStore opts f = do setLogging @DEBUG debugPrefix setLogging @INFO defLog @@ -371,7 +371,7 @@ withStore opts f = withAsyncSupervisor "in withStore" \sup -> do let pref = uniLastDef xdg opts :: StoragePrefix s <- simpleStorageInit (Just pref) - w <- replicateM 4 $ asyncStick sup $ simpleStorageWorker s + w <- replicateM 4 $ async $ simpleStorageWorker s f s diff --git a/hbs2/hbs2.cabal b/hbs2/hbs2.cabal index 9a6d0965..ec761066 100644 --- a/hbs2/hbs2.cabal +++ b/hbs2/hbs2.cabal @@ -91,8 +91,6 @@ executable hbs2 , uuid , terminal-progress-bar , stm - , unliftio - , unliftio-core hs-source-dirs: . default-language: Haskell2010