diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index 1cfb5107..4bcdd8b9 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -1,35 +1,28 @@ {-# Language UndecidableInstances #-} +{-# Language RecordWildCards #-} module HBS2.Net.Messaging.UDP where -import HBS2.Clock +import HBS2.Prelude +import HBS2.OrDie import HBS2.Defaults import HBS2.Net.IP.Addr import HBS2.Net.Messaging --- import HBS2.Net.Proto -import HBS2.Prelude.Plated - --- import HBS2.System.Logger.Simple import Data.Function -import Control.Exception import Control.Monad.Trans.Maybe -import Control.Concurrent.Async -import Control.Concurrent.STM +import Control.Monad.Trans.Cont import Control.Concurrent.STM.TQueue qualified as Q0 -import Control.Monad import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.List qualified as L import Data.Maybe --- import Data.Text (Text) import Data.Text qualified as Text import Lens.Micro.Platform import Network.Socket import Network.Socket.ByteString import Network.Multicast -import Control.Monad.Trans.Resource - +import UnliftIO -- One address - one peer - one messaging data MessagingUDP = @@ -37,64 +30,57 @@ data MessagingUDP = { listenAddr :: SockAddr , sink :: TQueue (From L4Proto, ByteString) , inbox :: TQueue (To L4Proto, ByteString) - , sock :: TVar Socket + , sock :: TVar (Maybe Socket) , mcast :: Bool } - getOwnPeer :: MessagingUDP -> Peer L4Proto getOwnPeer mess = PeerL4 UDP (listenAddr mess) -newMessagingUDPMulticast :: MonadResource m => String -> m (Maybe MessagingUDP) +newMessagingUDPMulticast :: MonadUnliftIO m => String -> m (Maybe MessagingUDP) newMessagingUDPMulticast s = runMaybeT $ do (host, port) <- MaybeT $ pure $ getHostPort (Text.pack s) so <- liftIO $ multicastReceiver host port - _ <- register $ close so - liftIO $ setSocketOption so ReuseAddr 1 a <- liftIO $ getSocketName so liftIO $ MessagingUDP a <$> Q0.newTQueueIO <*> Q0.newTQueueIO - <*> newTVarIO so + <*> newTVarIO (Just so) <*> pure True +isUDPSocketClosed :: MonadUnliftIO m => MessagingUDP -> m Bool +isUDPSocketClosed MessagingUDP{..} = readTVarIO sock <&> isNothing -newMessagingUDP :: (MonadIO m, MonadResource m) => Bool -> Maybe String -> m (Maybe MessagingUDP) +newMessagingUDP :: (MonadUnliftIO m) => Bool -> Maybe String -> m (Maybe MessagingUDP) newMessagingUDP reuse saddr = case saddr of Just s -> do - runMaybeT $ do l <- MaybeT $ liftIO $ parseAddrUDP (Text.pack s) <&> listToMaybe . sorted let a = addrAddress l so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l) - _ <- register $ close so - when reuse $ do liftIO $ setSocketOption so ReuseAddr 1 liftIO $ MessagingUDP a <$> Q0.newTQueueIO <*> Q0.newTQueueIO - <*> newTVarIO so + <*> newTVarIO (Just so) <*> pure False Nothing -> do so <- liftIO $ socket AF_INET Datagram defaultProtocol - - _ <- register $ close so - sa <- liftIO $ getSocketName so liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO <*> Q0.newTQueueIO - <*> newTVarIO so + <*> newTVarIO (Just so) <*> pure False ) @@ -106,24 +92,27 @@ newMessagingUDP reuse saddr = SockAddrUnix{} -> 2 -udpWorker :: MessagingUDP -> TVar Socket -> IO () -udpWorker env tso = do - so <- readTVarIO tso - forever $ do - (msg, from) <- recvFrom so defMaxDatagram - liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg) - -- FIXME: stopping -runMessagingUDP :: MonadIO m => MessagingUDP -> m () -runMessagingUDP udpMess = liftIO $ do - let addr = listenAddr udpMess - so <- readTVarIO (sock udpMess) +runMessagingUDP :: MonadUnliftIO m => MessagingUDP -> m () +runMessagingUDP MessagingUDP{..} = void $ flip runContT pure do - unless (mcast udpMess) $ do - bind so addr + let addr = listenAddr + so <- liftIO (readTVarIO sock) >>= orThrowUser "UDP socket is not ready" + + void $ ContT $ bracket (pure (Just so)) $ \case + Just so -> liftIO (close so >> atomically (writeTVar sock Nothing)) + Nothing -> pure () + + unless mcast $ do + liftIO $ bind so addr + + w <- ContT $ withAsync do + forever $ liftIO do + (msg, from) <- recvFrom so defMaxDatagram + liftIO $ atomically $ + Q0.writeTQueue sink (From (PeerL4 UDP from), LBS.fromStrict msg) - w <- async $ udpWorker udpMess (sock udpMess) link w waitCatch w >>= either throwIO (const $ pure ()) @@ -132,8 +121,9 @@ instance Messaging MessagingUDP L4Proto ByteString where sendTo bus (To whom) _ msg = liftIO do -- atomically $ Q0.writeTQueue (inbox bus) (To whom, msg) - so <- readTVarIO (sock bus) - sendAllTo so (LBS.toStrict msg) (view sockAddr whom) + mso <- readTVarIO (sock bus) + for_ mso $ \so -> do + sendAllTo so (LBS.toStrict msg) (view sockAddr whom) receive bus _ = liftIO do -- so <- readTVarIO (sock bus) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 801acbc7..6b12d0ff 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -122,11 +122,11 @@ import System.Metrics import System.Posix.Process import Control.Monad.Trans.Cont +import UnliftIO (MonadUnliftIO(..)) import UnliftIO.Exception qualified as U -- import UnliftIO.STM import UnliftIO.Async -import Control.Monad.Trans.Resource import Streaming.Prelude qualified as S import Graphics.Vty qualified as Vty @@ -713,7 +713,7 @@ runPeer :: forall e s . ( e ~ L4Proto , HasStorage (PeerM e IO) )=> PeerOpts -> IO () -runPeer opts = respawnOnError opts $ runResourceT do +runPeer opts = respawnOnError opts $ do probes <- liftIO $ newTVarIO (mempty :: [AnyProbe]) @@ -842,7 +842,7 @@ runPeer opts = respawnOnError opts $ runResourceT do udpPoint <- runMaybeT do sa <- toMPlus listenSa - env <- toMPlus =<< newMessagingUDP False (Just sa) + env <- toMPlus =<< liftIO (newMessagingUDP False (Just sa)) void $ liftIO ( async do runMessagingUDP env @@ -1328,8 +1328,8 @@ runPeer opts = respawnOnError opts $ runResourceT do rpcProto <- async $ flip runReaderT rpcctx do env <- newNotifyEnvServer @(RefChanEvents L4Proto) refChanNotifySource envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource - w1 <- async $ runNotifyWorkerServer env - w2 <- async $ runNotifyWorkerServer envrl + w1 <- asyncLinked $ runNotifyWorkerServer env + w2 <- asyncLinked $ runNotifyWorkerServer envrl wws <- replicateM 1 $ async $ runProto @UNIX [ makeResponse (makeServer @PeerAPI) , makeResponse (makeServer @RefLogAPI) @@ -1358,7 +1358,7 @@ runPeer opts = respawnOnError opts $ runResourceT do pause @'Seconds 1 -- we want to clean up all resources - throwM GoAgainException + throwIO GoAgainException emitToPeer :: ( MonadIO m , EventEmitter e a (PeerM e IO) diff --git a/hbs2-tests/test/TestUDP.hs b/hbs2-tests/test/TestUDP.hs index f11209b2..7608e895 100644 --- a/hbs2-tests/test/TestUDP.hs +++ b/hbs2-tests/test/TestUDP.hs @@ -5,26 +5,29 @@ import HBS2.Prelude.Plated import HBS2.Net.Proto import HBS2.Net.Messaging.UDP import HBS2.Actors.Peer +import HBS2.Misc.PrettyStuff import HBS2.OrDie import Control.Monad.Reader +import Control.Monad.Trans.Cont +import Control.Concurrent.STM (retry) import Data.ByteString.Lazy (ByteString) import Prettyprinter -import System.IO +import System.IO (hPrint) import Lens.Micro.Platform import Codec.Serialise -- import Control.Concurrent.Async -import Control.Monad.Trans.Resource +import UnliftIO import UnliftIO.Async +import UnliftIO.STM type UDP = L4Proto debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p - data PingPong e = Ping Int | Pong Int deriving stock (Eq,Generic,Show,Read) @@ -43,15 +46,20 @@ pingPongHandler :: forall e m . ( MonadIO m , Response e (PingPong e) m , HasProtocol e (PingPong e) ) - => Int + => TVar Int + -> Int -> PingPong e -> m () -pingPongHandler n = \case +pingPongHandler tv n = \case Ping c -> debug ("Ping" <+> pretty c) >> response (Pong @e c) - Pong c | c < n -> debug ("Pong" <+> pretty c) >> response (Ping @e (succ c)) + Pong c | c < n -> do + debug ("Pong" <+> pretty c) + liftIO $ atomically $ writeTVar tv c + response (Ping @e (succ c)) + | otherwise -> pure () data PPEnv = @@ -84,28 +92,58 @@ instance HasTimeLimits UDP (PingPong UDP) IO where tryLockForPeriod _ _ = pure True main :: IO () -main = runResourceT do +main = do liftIO $ hSetBuffering stdout LineBuffering liftIO $ hSetBuffering stderr LineBuffering - udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" - udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002" + let tries = 1000 - m1 <- async $ runMessagingUDP udp1 - m2 <- async $ runMessagingUDP udp2 + replicateM_ 10 do - p1 <- async $ runPingPong udp1 do - request (getOwnPeer udp2) (Ping @UDP 0) - runProto @UDP - [ makeResponse (pingPongHandler 3) - ] + udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" + udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002" - p2 <- async $ runPingPong udp2 do - -- request (getOwnPeer udp1) (Ping @UDP 0) - runProto @UDP - [ makeResponse (pingPongHandler 3) - ] + void $ flip runContT pure do - mapM_ wait [p1,p2,m1,m2] + m1 <- ContT $ withAsync $ runMessagingUDP udp1 + m2 <- ContT $ withAsync $ runMessagingUDP udp2 + + tping1 <- newTVarIO 0 + tping2 <- newTVarIO 0 + + pause @'Seconds 0.01 + + p1 <- ContT $ withAsync $ runPingPong udp1 do + request (getOwnPeer udp2) (Ping @UDP 0) + runProto @UDP + [ makeResponse (pingPongHandler tping1 tries) + ] + + p2 <- ContT $ withAsync $ runPingPong udp2 do + -- request (getOwnPeer udp1) (Ping @UDP 0) + runProto @UDP + [ makeResponse (pingPongHandler tping2 tries) + ] + + r <- liftIO $ race (pause @'Seconds 2) do + atomically do + r1 <- readTVar tping1 + r2 <- readTVar tping2 + if (max r1 r2) >= (tries-1) then pure () else retry + + let done = either (const "fail") (const "okay") r + + v1 <- readTVarIO tping1 + v2 <- readTVarIO tping2 + + liftIO $ hPrint stdout $ pretty "finished" <+> pretty done <+> pretty (max v1 v2) + + mapM_ cancel [m1,m2,p1,p2] + + c1 <- liftIO $ isUDPSocketClosed udp1 + c2 <- liftIO $ isUDPSocketClosed udp2 + + liftIO $ hPrint stdout $ pretty "socket1 closed" <+> pretty c1 + liftIO $ hPrint stdout $ pretty "socket2 closed" <+> pretty c2 diff --git a/test/RT/test-udp-messaging-1.baseline b/test/RT/test-udp-messaging-1.baseline new file mode 100644 index 00000000..ee21c982 --- /dev/null +++ b/test/RT/test-udp-messaging-1.baseline @@ -0,0 +1,31 @@ +(root + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True) + (finished okay 999) + (socket1 closed True) + (socket2 closed True)) \ No newline at end of file