This commit is contained in:
voidlizard 2024-11-01 16:05:32 +03:00
parent 11c1124994
commit 43eb9abb7e
4 changed files with 130 additions and 71 deletions

View File

@ -1,35 +1,28 @@
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language RecordWildCards #-}
module HBS2.Net.Messaging.UDP where module HBS2.Net.Messaging.UDP where
import HBS2.Clock import HBS2.Prelude
import HBS2.OrDie
import HBS2.Defaults import HBS2.Defaults
import HBS2.Net.IP.Addr import HBS2.Net.IP.Addr
import HBS2.Net.Messaging import HBS2.Net.Messaging
-- import HBS2.Net.Proto
import HBS2.Prelude.Plated
-- import HBS2.System.Logger.Simple
import Data.Function import Data.Function
import Control.Exception
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Concurrent.Async import Control.Monad.Trans.Cont
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue qualified as Q0 import Control.Concurrent.STM.TQueue qualified as Q0
import Control.Monad
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.List qualified as L import Data.List qualified as L
import Data.Maybe import Data.Maybe
-- import Data.Text (Text)
import Data.Text qualified as Text import Data.Text qualified as Text
import Lens.Micro.Platform import Lens.Micro.Platform
import Network.Socket import Network.Socket
import Network.Socket.ByteString import Network.Socket.ByteString
import Network.Multicast import Network.Multicast
import Control.Monad.Trans.Resource import UnliftIO
-- One address - one peer - one messaging -- One address - one peer - one messaging
data MessagingUDP = data MessagingUDP =
@ -37,64 +30,57 @@ data MessagingUDP =
{ listenAddr :: SockAddr { listenAddr :: SockAddr
, sink :: TQueue (From L4Proto, ByteString) , sink :: TQueue (From L4Proto, ByteString)
, inbox :: TQueue (To L4Proto, ByteString) , inbox :: TQueue (To L4Proto, ByteString)
, sock :: TVar Socket , sock :: TVar (Maybe Socket)
, mcast :: Bool , mcast :: Bool
} }
getOwnPeer :: MessagingUDP -> Peer L4Proto getOwnPeer :: MessagingUDP -> Peer L4Proto
getOwnPeer mess = PeerL4 UDP (listenAddr mess) getOwnPeer mess = PeerL4 UDP (listenAddr mess)
newMessagingUDPMulticast :: MonadResource m => String -> m (Maybe MessagingUDP) newMessagingUDPMulticast :: MonadUnliftIO m => String -> m (Maybe MessagingUDP)
newMessagingUDPMulticast s = runMaybeT $ do newMessagingUDPMulticast s = runMaybeT $ do
(host, port) <- MaybeT $ pure $ getHostPort (Text.pack s) (host, port) <- MaybeT $ pure $ getHostPort (Text.pack s)
so <- liftIO $ multicastReceiver host port so <- liftIO $ multicastReceiver host port
_ <- register $ close so
liftIO $ setSocketOption so ReuseAddr 1 liftIO $ setSocketOption so ReuseAddr 1
a <- liftIO $ getSocketName so a <- liftIO $ getSocketName so
liftIO $ MessagingUDP a <$> Q0.newTQueueIO liftIO $ MessagingUDP a <$> Q0.newTQueueIO
<*> Q0.newTQueueIO <*> Q0.newTQueueIO
<*> newTVarIO so <*> newTVarIO (Just so)
<*> pure True <*> pure True
isUDPSocketClosed :: MonadUnliftIO m => MessagingUDP -> m Bool
isUDPSocketClosed MessagingUDP{..} = readTVarIO sock <&> isNothing
newMessagingUDP :: (MonadIO m, MonadResource m) => Bool -> Maybe String -> m (Maybe MessagingUDP) newMessagingUDP :: (MonadUnliftIO m) => Bool -> Maybe String -> m (Maybe MessagingUDP)
newMessagingUDP reuse saddr = newMessagingUDP reuse saddr =
case saddr of case saddr of
Just s -> do Just s -> do
runMaybeT $ do runMaybeT $ do
l <- MaybeT $ liftIO $ parseAddrUDP (Text.pack s) <&> listToMaybe . sorted l <- MaybeT $ liftIO $ parseAddrUDP (Text.pack s) <&> listToMaybe . sorted
let a = addrAddress l let a = addrAddress l
so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l) so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l)
_ <- register $ close so
when reuse $ do when reuse $ do
liftIO $ setSocketOption so ReuseAddr 1 liftIO $ setSocketOption so ReuseAddr 1
liftIO $ MessagingUDP a <$> Q0.newTQueueIO liftIO $ MessagingUDP a <$> Q0.newTQueueIO
<*> Q0.newTQueueIO <*> Q0.newTQueueIO
<*> newTVarIO so <*> newTVarIO (Just so)
<*> pure False <*> pure False
Nothing -> do Nothing -> do
so <- liftIO $ socket AF_INET Datagram defaultProtocol so <- liftIO $ socket AF_INET Datagram defaultProtocol
_ <- register $ close so
sa <- liftIO $ getSocketName so sa <- liftIO $ getSocketName so
liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO
<*> Q0.newTQueueIO <*> Q0.newTQueueIO
<*> newTVarIO so <*> newTVarIO (Just so)
<*> pure False <*> pure False
) )
@ -106,24 +92,27 @@ newMessagingUDP reuse saddr =
SockAddrUnix{} -> 2 SockAddrUnix{} -> 2
udpWorker :: MessagingUDP -> TVar Socket -> IO ()
udpWorker env tso = do
so <- readTVarIO tso
forever $ do
(msg, from) <- recvFrom so defMaxDatagram
liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg)
-- FIXME: stopping -- FIXME: stopping
runMessagingUDP :: MonadIO m => MessagingUDP -> m () runMessagingUDP :: MonadUnliftIO m => MessagingUDP -> m ()
runMessagingUDP udpMess = liftIO $ do runMessagingUDP MessagingUDP{..} = void $ flip runContT pure do
let addr = listenAddr udpMess
so <- readTVarIO (sock udpMess)
unless (mcast udpMess) $ do let addr = listenAddr
bind so addr so <- liftIO (readTVarIO sock) >>= orThrowUser "UDP socket is not ready"
void $ ContT $ bracket (pure (Just so)) $ \case
Just so -> liftIO (close so >> atomically (writeTVar sock Nothing))
Nothing -> pure ()
unless mcast $ do
liftIO $ bind so addr
w <- ContT $ withAsync do
forever $ liftIO do
(msg, from) <- recvFrom so defMaxDatagram
liftIO $ atomically $
Q0.writeTQueue sink (From (PeerL4 UDP from), LBS.fromStrict msg)
w <- async $ udpWorker udpMess (sock udpMess)
link w link w
waitCatch w >>= either throwIO (const $ pure ()) waitCatch w >>= either throwIO (const $ pure ())
@ -132,7 +121,8 @@ instance Messaging MessagingUDP L4Proto ByteString where
sendTo bus (To whom) _ msg = liftIO do sendTo bus (To whom) _ msg = liftIO do
-- atomically $ Q0.writeTQueue (inbox bus) (To whom, msg) -- atomically $ Q0.writeTQueue (inbox bus) (To whom, msg)
so <- readTVarIO (sock bus) mso <- readTVarIO (sock bus)
for_ mso $ \so -> do
sendAllTo so (LBS.toStrict msg) (view sockAddr whom) sendAllTo so (LBS.toStrict msg) (view sockAddr whom)
receive bus _ = liftIO do receive bus _ = liftIO do

View File

@ -122,11 +122,11 @@ import System.Metrics
import System.Posix.Process import System.Posix.Process
import Control.Monad.Trans.Cont import Control.Monad.Trans.Cont
import UnliftIO (MonadUnliftIO(..))
import UnliftIO.Exception qualified as U import UnliftIO.Exception qualified as U
-- import UnliftIO.STM -- import UnliftIO.STM
import UnliftIO.Async import UnliftIO.Async
import Control.Monad.Trans.Resource
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
import Graphics.Vty qualified as Vty import Graphics.Vty qualified as Vty
@ -713,7 +713,7 @@ runPeer :: forall e s . ( e ~ L4Proto
, HasStorage (PeerM e IO) , HasStorage (PeerM e IO)
)=> PeerOpts -> IO () )=> PeerOpts -> IO ()
runPeer opts = respawnOnError opts $ runResourceT do runPeer opts = respawnOnError opts $ do
probes <- liftIO $ newTVarIO (mempty :: [AnyProbe]) probes <- liftIO $ newTVarIO (mempty :: [AnyProbe])
@ -842,7 +842,7 @@ runPeer opts = respawnOnError opts $ runResourceT do
udpPoint <- runMaybeT do udpPoint <- runMaybeT do
sa <- toMPlus listenSa sa <- toMPlus listenSa
env <- toMPlus =<< newMessagingUDP False (Just sa) env <- toMPlus =<< liftIO (newMessagingUDP False (Just sa))
void $ liftIO ( async do void $ liftIO ( async do
runMessagingUDP env runMessagingUDP env
@ -1328,8 +1328,8 @@ runPeer opts = respawnOnError opts $ runResourceT do
rpcProto <- async $ flip runReaderT rpcctx do rpcProto <- async $ flip runReaderT rpcctx do
env <- newNotifyEnvServer @(RefChanEvents L4Proto) refChanNotifySource env <- newNotifyEnvServer @(RefChanEvents L4Proto) refChanNotifySource
envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource
w1 <- async $ runNotifyWorkerServer env w1 <- asyncLinked $ runNotifyWorkerServer env
w2 <- async $ runNotifyWorkerServer envrl w2 <- asyncLinked $ runNotifyWorkerServer envrl
wws <- replicateM 1 $ async $ runProto @UNIX wws <- replicateM 1 $ async $ runProto @UNIX
[ makeResponse (makeServer @PeerAPI) [ makeResponse (makeServer @PeerAPI)
, makeResponse (makeServer @RefLogAPI) , makeResponse (makeServer @RefLogAPI)
@ -1358,7 +1358,7 @@ runPeer opts = respawnOnError opts $ runResourceT do
pause @'Seconds 1 pause @'Seconds 1
-- we want to clean up all resources -- we want to clean up all resources
throwM GoAgainException throwIO GoAgainException
emitToPeer :: ( MonadIO m emitToPeer :: ( MonadIO m
, EventEmitter e a (PeerM e IO) , EventEmitter e a (PeerM e IO)

View File

@ -5,26 +5,29 @@ import HBS2.Prelude.Plated
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Net.Messaging.UDP import HBS2.Net.Messaging.UDP
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Misc.PrettyStuff
import HBS2.OrDie import HBS2.OrDie
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Cont
import Control.Concurrent.STM (retry)
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Prettyprinter import Prettyprinter
import System.IO import System.IO (hPrint)
import Lens.Micro.Platform import Lens.Micro.Platform
import Codec.Serialise import Codec.Serialise
-- import Control.Concurrent.Async -- import Control.Concurrent.Async
import Control.Monad.Trans.Resource import UnliftIO
import UnliftIO.Async import UnliftIO.Async
import UnliftIO.STM
type UDP = L4Proto type UDP = L4Proto
debug :: (MonadIO m) => Doc ann -> m () debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p debug p = liftIO $ hPrint stderr p
data PingPong e = Ping Int data PingPong e = Ping Int
| Pong Int | Pong Int
deriving stock (Eq,Generic,Show,Read) deriving stock (Eq,Generic,Show,Read)
@ -43,15 +46,20 @@ pingPongHandler :: forall e m . ( MonadIO m
, Response e (PingPong e) m , Response e (PingPong e) m
, HasProtocol e (PingPong e) , HasProtocol e (PingPong e)
) )
=> Int => TVar Int
-> Int
-> PingPong e -> PingPong e
-> m () -> m ()
pingPongHandler n = \case pingPongHandler tv n = \case
Ping c -> debug ("Ping" <+> pretty c) >> response (Pong @e c) Ping c -> debug ("Ping" <+> pretty c) >> response (Pong @e c)
Pong c | c < n -> debug ("Pong" <+> pretty c) >> response (Ping @e (succ c)) Pong c | c < n -> do
debug ("Pong" <+> pretty c)
liftIO $ atomically $ writeTVar tv c
response (Ping @e (succ c))
| otherwise -> pure () | otherwise -> pure ()
data PPEnv = data PPEnv =
@ -84,28 +92,58 @@ instance HasTimeLimits UDP (PingPong UDP) IO where
tryLockForPeriod _ _ = pure True tryLockForPeriod _ _ = pure True
main :: IO () main :: IO ()
main = runResourceT do main = do
liftIO $ hSetBuffering stdout LineBuffering liftIO $ hSetBuffering stdout LineBuffering
liftIO $ hSetBuffering stderr LineBuffering liftIO $ hSetBuffering stderr LineBuffering
let tries = 1000
replicateM_ 10 do
udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001"
udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002" udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002"
m1 <- async $ runMessagingUDP udp1 void $ flip runContT pure do
m2 <- async $ runMessagingUDP udp2
p1 <- async $ runPingPong udp1 do m1 <- ContT $ withAsync $ runMessagingUDP udp1
m2 <- ContT $ withAsync $ runMessagingUDP udp2
tping1 <- newTVarIO 0
tping2 <- newTVarIO 0
pause @'Seconds 0.01
p1 <- ContT $ withAsync $ runPingPong udp1 do
request (getOwnPeer udp2) (Ping @UDP 0) request (getOwnPeer udp2) (Ping @UDP 0)
runProto @UDP runProto @UDP
[ makeResponse (pingPongHandler 3) [ makeResponse (pingPongHandler tping1 tries)
] ]
p2 <- async $ runPingPong udp2 do p2 <- ContT $ withAsync $ runPingPong udp2 do
-- request (getOwnPeer udp1) (Ping @UDP 0) -- request (getOwnPeer udp1) (Ping @UDP 0)
runProto @UDP runProto @UDP
[ makeResponse (pingPongHandler 3) [ makeResponse (pingPongHandler tping2 tries)
] ]
mapM_ wait [p1,p2,m1,m2] r <- liftIO $ race (pause @'Seconds 2) do
atomically do
r1 <- readTVar tping1
r2 <- readTVar tping2
if (max r1 r2) >= (tries-1) then pure () else retry
let done = either (const "fail") (const "okay") r
v1 <- readTVarIO tping1
v2 <- readTVarIO tping2
liftIO $ hPrint stdout $ pretty "finished" <+> pretty done <+> pretty (max v1 v2)
mapM_ cancel [m1,m2,p1,p2]
c1 <- liftIO $ isUDPSocketClosed udp1
c2 <- liftIO $ isUDPSocketClosed udp2
liftIO $ hPrint stdout $ pretty "socket1 closed" <+> pretty c1
liftIO $ hPrint stdout $ pretty "socket2 closed" <+> pretty c2

View File

@ -0,0 +1,31 @@
(root
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True)
(finished okay 999)
(socket1 closed True)
(socket2 closed True))