From 453b7fd822f34b120f030c9191c8a2df66800390 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 24 May 2023 10:23:36 +0300 Subject: [PATCH] introducing resourcet --- hbs2-core/hbs2-core.cabal | 2 + hbs2-core/lib/HBS2/Net/Messaging/UDP.hs | 12 +++++- hbs2-peer/app/PeerMain.hs | 57 +++++++++++++------------ hbs2-peer/hbs2-peer.cabal | 1 + hbs2-tests/hbs2-tests.cabal | 2 + hbs2-tests/test/TestUDP.hs | 11 +++-- 6 files changed, 53 insertions(+), 32 deletions(-) diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index b6be314e..51601614 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -140,6 +140,7 @@ library , prettyprinter , random , random-shuffle + , resourcet , safe , saltine ^>=0.2.0.1 , serialise @@ -154,6 +155,7 @@ library , transformers , uniplate , unordered-containers + , unliftio hs-source-dirs: lib diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index 9ae6079c..63e75c0d 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -8,6 +8,8 @@ import HBS2.Net.Messaging import HBS2.Net.Proto import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple + import Data.Function import Control.Exception import Control.Monad.Trans.Maybe @@ -27,6 +29,7 @@ import Network.Socket import Network.Socket.ByteString import Network.Multicast +import Control.Monad.Trans.Resource -- One address - one peer - one messaging @@ -59,7 +62,9 @@ newMessagingUDPMulticast s = runMaybeT $ do <*> newTVarIO so <*> pure True -newMessagingUDP :: MonadIO m => Bool -> Maybe String -> m (Maybe MessagingUDP) +close_ so = trace "closing fuckin socket!!" >> close so + +newMessagingUDP :: (MonadIO m, MonadResource m) => Bool -> Maybe String -> m (Maybe MessagingUDP) newMessagingUDP reuse saddr = case saddr of Just s -> do @@ -69,6 +74,8 @@ newMessagingUDP reuse saddr = let a = addrAddress l so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l) + _ <- register $ close_ so + when reuse $ do liftIO $ setSocketOption so ReuseAddr 1 @@ -80,6 +87,9 @@ newMessagingUDP reuse saddr = Nothing -> do so <- liftIO $ socket AF_INET Datagram defaultProtocol + + _ <- register $ close_ so + sa <- liftIO $ getSocketName so liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index f0cf043a..46860edf 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -50,7 +50,7 @@ import ProxyMessaging import PeerMeta import Codec.Serialise -import Control.Concurrent.Async +-- import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception as Exception import Control.Monad.Reader @@ -77,9 +77,14 @@ import Options.Applicative import System.Directory import System.Exit import System.IO +import System.Mem import System.Metrics import Data.Cache qualified as Cache + import UnliftIO.Exception qualified as U +-- import UnliftIO.STM +import UnliftIO.Async as U +import Control.Monad.Trans.Resource -- TODO: write-workers-to-config @@ -353,7 +358,7 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ myException :: SomeException -> IO () -myException e = die ( show e ) >> exitFailure +myException e = err ( show e ) >> notice "RESTARTING..." newtype CredentialsM e s m a = @@ -418,17 +423,20 @@ runPeer :: forall e s . ( e ~ L4Proto , s ~ Encryption e ) => PeerOpts -> IO () -runPeer opts = Exception.handle myException $ do +runPeer opts = fix \me -> U.handle (\e -> myException e + >> performGC + >> pause @'Seconds 10 + >> me + ) $ runResourceT do - metrics <- newStore + metrics <- liftIO newStore - - xdg <- getXdgDirectory XdgData defStorePath <&> fromString + xdg <- liftIO $ getXdgDirectory XdgData defStorePath <&> fromString conf <- peerConfigRead (view peerConfig opts) -- let (PeerConfig syn) = conf - print $ pretty conf + liftIO $ print $ pretty conf let listenConf = cfgValue @PeerListenKey conf let rpcConf = cfgValue @PeerRpcKey conf @@ -462,7 +470,7 @@ runPeer opts = Exception.handle myException $ do let accptAnn = cfgValue @PeerAcceptAnnounceKey conf :: AcceptAnnounce - print $ pretty accptAnn + liftIO $ print $ pretty accptAnn -- FIXME: move-peerBanned-somewhere let peerBanned p d = do @@ -476,11 +484,11 @@ runPeer opts = Exception.handle myException $ do AcceptAnnounceAll -> pure True AcceptAnnounceFrom s -> pure $ view peerSignKey d `Set.member` s - rpcQ <- newTQueueIO @RPCCommand + rpcQ <- liftIO $ newTQueueIO @RPCCommand let ps = mempty - pc' <- LBS.readFile credFile + pc' <- liftIO $ LBS.readFile credFile <&> parseCredentials @(Encryption e) . AsCredFile . LBS.toStrict . LBS.take 4096 @@ -493,9 +501,9 @@ runPeer opts = Exception.handle myException $ do let blk = liftIO . hasBlock s - w <- replicateM defStorageThreads $ async $ simpleStorageWorker s + w <- replicateM defStorageThreads $ async $ liftIO $ simpleStorageWorker s - localMulticast <- (headMay <$> parseAddrUDP (fromString defLocalMulticast) + localMulticast <- liftIO $ (headMay <$> parseAddrUDP (fromString defLocalMulticast) <&> fmap (fromSockAddr @'UDP . addrAddress) ) `orDie` "assertion: localMulticastPeer not set" @@ -506,19 +514,16 @@ runPeer opts = Exception.handle myException $ do `orDie` "unable listen on the given addr" udp <- async $ runMessagingUDP mess - `catch` (\(e::SomeException) -> throwIO e ) udp1 <- newMessagingUDP False rpcSa `orDie` "Can't start RPC listener" mrpc <- async $ runMessagingUDP udp1 - `catch` (\(e::SomeException) -> throwIO e ) mcast <- newMessagingUDPMulticast defLocalMulticast `orDie` "Can't start RPC listener" messMcast <- async $ runMessagingUDP mcast - `catch` (\(e::SomeException) -> throwIO e ) brains <- newBasicBrains @e conf @@ -535,13 +540,11 @@ runPeer opts = Exception.handle myException $ do tcpEnv <- newMessagingTCP addr -- FIXME: handle-tcp-thread-somehow void $ async $ runMessagingTCP tcpEnv - `catch` (\(e::SomeException) -> throwIO e ) pure $ Just tcpEnv proxy <- newProxyMessaging mess tcp proxyThread <- async $ runProxyMessaging proxy - `catch` (\(e::SomeException) -> throwIO e ) penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) @@ -562,7 +565,7 @@ runPeer opts = Exception.handle myException $ do -- debug $ "onNoBlock" <+> pretty p <+> pretty h withPeerM penv $ withDownload denv (addDownload mzero h) - loop <- async do + loop <- liftIO $ async do runPeerM penv $ do adapter <- mkAdapter @@ -963,7 +966,7 @@ runPeer opts = Exception.handle myException $ do menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) - ann <- async $ runPeerM menv $ do + ann <- liftIO $ async $ runPeerM menv $ do self <- ownPeer @e @@ -983,7 +986,7 @@ runPeer opts = Exception.handle myException $ do void $ waitAnyCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast,brainsThread] - simpleStorageStop s + liftIO $ simpleStorageStop s @@ -1003,9 +1006,9 @@ rpcClientMain opt action = do action withRPC :: FromStringMaybe (PeerAddr L4Proto) => RPCOpt -> RPC L4Proto -> IO () -withRPC o cmd = rpcClientMain o $ do +withRPC o cmd = rpcClientMain o $ runResourceT do - hSetBuffering stdout LineBuffering + liftIO $ hSetBuffering stdout LineBuffering conf <- peerConfigRead (view rpcOptConf o) @@ -1013,7 +1016,7 @@ withRPC o cmd = rpcClientMain o $ do saddr <- pure (view rpcOptAddr o <|> rpcConf) `orDie` "RPC endpoint not set" - as <- parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) + as <- liftIO $ parseAddrUDP (fromString saddr) <&> fmap (fromSockAddr @'UDP . addrAddress) let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as rpc <- pure rpc' `orDie` "Can't parse RPC endpoint" @@ -1022,13 +1025,13 @@ withRPC o cmd = rpcClientMain o $ do mrpc <- async $ runMessagingUDP udp1 - pingQ <- newTQueueIO + pingQ <- liftIO newTQueueIO - pokeQ <- newTQueueIO + pokeQ <- liftIO newTQueueIO - pokeFQ <- newTQueueIO + pokeFQ <- liftIO newTQueueIO - refQ <- newTQueueIO + refQ <- liftIO newTQueueIO let adapter = RpcAdapter dontHandle diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 8092c059..89a17d2c 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -38,6 +38,7 @@ common common-deps , prettyprinter , random , random-shuffle + , resourcet -- , resolv , safe , saltine >=0.2.0.1 diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 9773da7c..229674b7 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -35,6 +35,7 @@ common common-deps , QuickCheck , random , random-shuffle + , resourcet , safe , serialise , split @@ -51,6 +52,7 @@ common common-deps , vector , prettyprinter-ansi-terminal , interpolatedstring-perl6 + , unliftio common shared-properties ghc-options: diff --git a/hbs2-tests/test/TestUDP.hs b/hbs2-tests/test/TestUDP.hs index e286391c..f11209b2 100644 --- a/hbs2-tests/test/TestUDP.hs +++ b/hbs2-tests/test/TestUDP.hs @@ -14,7 +14,10 @@ import System.IO import Lens.Micro.Platform import Codec.Serialise -import Control.Concurrent.Async +-- import Control.Concurrent.Async + +import Control.Monad.Trans.Resource +import UnliftIO.Async type UDP = L4Proto @@ -81,9 +84,9 @@ instance HasTimeLimits UDP (PingPong UDP) IO where tryLockForPeriod _ _ = pure True main :: IO () -main = do - hSetBuffering stdout LineBuffering - hSetBuffering stderr LineBuffering +main = runResourceT do + liftIO $ hSetBuffering stdout LineBuffering + liftIO $ hSetBuffering stderr LineBuffering udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002"