From e9a552d78aebfab76234f411d33d3f4b699192f1 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 24 May 2023 12:31:41 +0300 Subject: [PATCH] introducing resourcet --- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 58 +++++++++++++++---------- hbs2-core/lib/HBS2/Net/Messaging/UDP.hs | 11 ++--- hbs2-peer/app/PeerInfo.hs | 4 +- hbs2-tests/hbs2-tests.cabal | 44 +++++++++++++++++++ hbs2-tests/test/TestMisc.hs | 36 +++++++++++++++ 5 files changed, 122 insertions(+), 31 deletions(-) create mode 100644 hbs2-tests/test/TestMisc.hs diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index e3f0916e..bf19be3f 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -16,9 +16,9 @@ import HBS2.Prelude.Plated import HBS2.System.Logger.Simple -import Control.Concurrent.Async -import Control.Concurrent.STM -import Control.Exception +-- import Control.Concurrent.Async +import Control.Concurrent.STM (flushTQueue,stateTVar) +import Control.Exception (try,Exception,SomeException,throwIO) import Control.Monad import Data.Bits import Data.ByteString.Lazy (ByteString) @@ -35,10 +35,14 @@ import Lens.Micro.Platform import Network.ByteOrder hiding (ByteString) import Network.Simple.TCP import Network.Socket hiding (listen,connect) -import Network.Socket.ByteString.Lazy hiding (send,recv) +-- import Network.Socket.ByteString.Lazy hiding (send,recv) import Streaming.Prelude qualified as S import System.Random hiding (next) +import UnliftIO.Async +import UnliftIO.STM +import UnliftIO.Exception qualified as U + data SocketClosedException = SocketClosedException deriving stock (Show, Typeable) @@ -142,7 +146,6 @@ readFromSocket sock size = LBS.fromChunks <$> (go size & S.toList_) go (max 0 (n - nread)) eos = do - debug "SOCKET FUCKING CLOSED!" liftIO $ throwIO SocketClosedException connectionId :: Word32 -> Word32 -> Word64 @@ -346,14 +349,18 @@ connectPeerTCP env peer = liftIO do -- REVIEW: так что в итоге? где-то здесь? shutdown sock ShutdownBoth + +-- FIXME: link-all-asyncs + runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP env = liftIO do + own <- toPeerAddr $ view tcpOwnPeer env let (L4Address _ (IPAddrPort (i,p))) = own let defs = view tcpDefer env - void $ async $ forever do + mon <- async $ forever do pause @'Seconds 30 now <- getTimeCoarse @@ -366,7 +373,7 @@ runMessagingTCP env = liftIO do [] -> Nothing xs -> Just xs - void $ async $ forever do + con <- async $ forever do let ev = view tcpDeferEv env @@ -400,7 +407,7 @@ runMessagingTCP env = liftIO do pure () - void $ async $ forever do + stat <- async $ forever do pause @'Seconds 120 ps <- readTVarIO $ view tcpConnPeer env let peers = HashMap.toList ps @@ -411,28 +418,31 @@ runMessagingTCP env = liftIO do <+> pretty c <+> parens ("used:" <+> pretty used) - listen (Host (show i)) (show p) $ \(sock, sa) -> do - debug $ "Listening on" <+> pretty sa + mapM_ link [mon,con,stat] - forever do - void $ acceptFork sock $ \(so, remote) -> do - trace $ "GOT INCOMING CONNECTION FROM" - <+> brackets (pretty own) - <+> brackets (pretty sa) - <+> pretty remote + liftIO ( + listen (Host (show i)) (show p) $ \(sock, sa) -> do + debug $ "Listening on" <+> pretty sa - void $ try @SomeException $ do + forever do + void $ acceptFork sock $ \(so, remote) -> do + trace $ "GOT INCOMING CONNECTION FROM" + <+> brackets (pretty own) + <+> brackets (pretty sa) + <+> pretty remote - spawnConnection Server env so remote + void $ try @SomeException $ do - -- gracefulClose so 1000 + spawnConnection Server env so remote - -- TODO: probably-cleanup-peer - -- TODO: periodically-drop-inactive-connections + -- gracefulClose so 1000 - debug $ "CLOSING CONNECTION" <+> pretty remote - shutdown so ShutdownBoth - close so + -- TODO: probably-cleanup-peer + -- TODO: periodically-drop-inactive-connections + + debug $ "CLOSING CONNECTION" <+> pretty remote + shutdown so ShutdownBoth + close so ) `U.finally` mapM_ cancel [mon,con,stat] traceCmd :: forall a ann b m . ( Pretty a diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index 63e75c0d..e47789fb 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -46,13 +46,15 @@ data MessagingUDP = getOwnPeer :: MessagingUDP -> Peer L4Proto getOwnPeer mess = PeerL4 UDP (listenAddr mess) -newMessagingUDPMulticast :: MonadIO m => String -> m (Maybe MessagingUDP) +newMessagingUDPMulticast :: MonadResource m => String -> m (Maybe MessagingUDP) newMessagingUDPMulticast s = runMaybeT $ do (host, port) <- MaybeT $ pure $ getHostPort (Text.pack s) so <- liftIO $ multicastReceiver host port + _ <- register $ close so + liftIO $ setSocketOption so ReuseAddr 1 a <- liftIO $ getSocketName so @@ -62,7 +64,6 @@ newMessagingUDPMulticast s = runMaybeT $ do <*> newTVarIO so <*> pure True -close_ so = trace "closing fuckin socket!!" >> close so newMessagingUDP :: (MonadIO m, MonadResource m) => Bool -> Maybe String -> m (Maybe MessagingUDP) newMessagingUDP reuse saddr = @@ -74,7 +75,7 @@ newMessagingUDP reuse saddr = let a = addrAddress l so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l) - _ <- register $ close_ so + _ <- register $ close so when reuse $ do liftIO $ setSocketOption so ReuseAddr 1 @@ -88,7 +89,7 @@ newMessagingUDP reuse saddr = Nothing -> do so <- liftIO $ socket AF_INET Datagram defaultProtocol - _ <- register $ close_ so + _ <- register $ close so sa <- liftIO $ getSocketName so @@ -129,7 +130,7 @@ udpWorker env tso = do -- liftIO $ print $ "recv:" <+> pretty (BS.length msg) -- atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg) - mapM_ wait [rcvLoop,sndLoop] + void $ waitAnyCatchCancel [rcvLoop,sndLoop] -- FIXME: stopping diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index ddd4eef8..738db87e 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -133,7 +133,7 @@ peerPingLoop cfg = do -- TODO: peer info loop - void $ liftIO $ async $ forever $ withPeerM e $ do + infoLoop <- liftIO $ async $ forever $ withPeerM e $ do pause @'Seconds 10 pee <- knownPeers @e pl @@ -186,7 +186,7 @@ peerPingLoop cfg = do expire (PeerInfoKey p) expire (KnownPeerKey p) - liftIO $ link watch + liftIO $ mapM_ link [watch, infoLoop] forever do diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 229674b7..3e94e70a 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -501,3 +501,47 @@ test-suite test-concurrent-write , terminal-progress-bar +test-suite test-misc + import: shared-properties + import: common-deps + default-language: Haskell2010 + + + ghc-options: + -threaded + -rtsopts + "-with-rtsopts=-N6 -A64m -AL256m -I0" + + other-modules: + + -- other-extensions: + + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestMisc.hs + + build-depends: + base, hbs2-storage-simple, hbs2-core + , async + , bytestring + , cborg + , containers + , directory + , filepath + , hashable + , microlens-platform + , mtl + , prettyprinter + , QuickCheck + , stm + , random + , safe + , serialise + , tasty + , tasty-hunit + , temporary + , timeit + , uniplate + , vector + , terminal-progress-bar + diff --git a/hbs2-tests/test/TestMisc.hs b/hbs2-tests/test/TestMisc.hs new file mode 100644 index 00000000..998c6ebf --- /dev/null +++ b/hbs2-tests/test/TestMisc.hs @@ -0,0 +1,36 @@ +module Main where + +import HBS2.Clock + +import Control.Concurrent.Async +import UnliftIO.Exception +import System.IO +import Control.Monad + +testOne :: IO () +testOne = do + + t1 <- async $ forever $ do + pause @'Seconds 1 + print "ONE" + + t2 <- async $ forever $ do + pause @'Seconds 2 + print "TWO" + + link t1 + link t2 + + print "testOne DONE" + pause @'Seconds 10 + +main = do + hSetBuffering stdout LineBuffering + + testOne + + pause @'Seconds 30 + + print "WTF?" + + pure ()