From 902125da7564689b732cef821e4d7562dcafa016 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 28 Sep 2023 05:23:41 +0300 Subject: [PATCH] unix sockets to support multiple clients --- docs/notes/git-encryption.txt | 20 +++ hbs2-core/lib/HBS2/Actors/Peer.hs | 2 +- hbs2-core/lib/HBS2/Net/Messaging/Unix.hs | 188 +++++++++++++++++------ hbs2-tests/test/TestUNIX.hs | 135 +++++++++++++--- 4 files changed, 273 insertions(+), 72 deletions(-) diff --git a/docs/notes/git-encryption.txt b/docs/notes/git-encryption.txt index 687c9480..68cb01fa 100644 --- a/docs/notes/git-encryption.txt +++ b/docs/notes/git-encryption.txt @@ -1,3 +1,23 @@ TODO: test-unix-sockets-multiple-clients Протестировать, работают ли Unix сокеты в режиме с многими клиентами. + +TODO: hbs2-peer-storage-unix-socket-rpc + RPC специфичное для операций со сторейджем. + + Команды соответствуют методам Storage. + + Синхронная обёртка, т.е некий тайпкласс, + который говорит, как в команды протокола + добавлять уникальный нонс, и обёртка, + которая после отправки запроса ждёт + ответа с заданным нонсом. + + Хорошо бы типизировать, т.е с одной стороны + не хочется делать разные типы, это вроде бы + один протокол. + + С другой стороны, непонятно, как это + типизировать тогда. + + diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index ac81b2c2..60dff981 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -308,7 +308,7 @@ instance ( MonadIO m when allowed do sendTo pipe (To peer_e) (From me) (AnyMessage @(Encoded e) @e proto (encode msg)) - trace $ "REQUEST: after sendTo" <+> viaShow peer_e + -- trace $ "REQUEST: after sendTo" <+> viaShow peer_e instance ( Typeable (EventHandler e p (PeerM e IO)) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index dcf2e655..60ef43ca 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -1,3 +1,4 @@ +{-# Language TemplateHaskell #-} module HBS2.Net.Messaging.Unix where import HBS2.Prelude.Plated @@ -14,6 +15,8 @@ import Data.ByteString.Lazy qualified as LBS import Data.Function import Data.Functor import Data.Hashable +import Data.HashMap.Strict qualified as HashMap +import Data.HashMap.Strict (HashMap) import Data.List qualified as List import Network.ByteOrder hiding (ByteString) import Network.Socket @@ -21,11 +24,28 @@ import Network.Socket.ByteString import Control.Concurrent.STM.TQueue (flushTQueue) import Data.Set (Set) import Data.Set qualified as Set +import Lens.Micro.Platform import UnliftIO +import Control.Concurrent (myThreadId) + data UNIX = UNIX deriving (Eq,Ord,Show,Generic) +type PeerUnixAddr = String + +instance HasPeer UNIX where + newtype instance Peer UNIX = PeerUNIX { _fromPeerUnix :: PeerUnixAddr} + deriving stock (Eq,Ord,Show,Generic) + deriving newtype (Pretty) + + +instance IsString (Peer UNIX) where + fromString = PeerUNIX + +instance Hashable (Peer UNIX) where + hashWithSalt salt (PeerUNIX p) = hashWithSalt salt p + {- HLINT ignore "Use newtype instead of data" -} data MessagingUnixOpts = MUWatchdog Int @@ -40,13 +60,14 @@ data MessagingUnix = , msgUnixRetryTime :: Timeout 'Seconds , msgUnixSelf :: Peer UNIX , msgUnixOpts :: Set MessagingUnixOpts - , msgUnixInbox :: TQueue ByteString + , msgUnixSendTo :: TVar (HashMap (Peer UNIX) (TQueue ByteString)) , msgUnixRecv :: TQueue (From UNIX, ByteString) , msgUnixLast :: TVar TimeSpec , msgUnixAccepts :: TVar Int + , msgSockets :: TVar (HashMap (Peer UNIX) Socket) } - +makeLenses 'PeerUNIX newMessagingUnix :: MonadIO m => Bool @@ -57,7 +78,7 @@ newMessagingUnix :: MonadIO m newMessagingUnix server tsec path = do newMessagingUnixOpts mempty server tsec path -newMessagingUnixOpts :: MonadIO m +newMessagingUnixOpts :: (MonadIO m) => [MessagingUnixOpts] -> Bool -> Timeout 'Seconds @@ -65,39 +86,23 @@ newMessagingUnixOpts :: MonadIO m -> m MessagingUnix newMessagingUnixOpts opts server tsec path = do - let sa = SockAddrUnix path now <- getTimeCoarse MessagingUnix path server tsec - (PeerUNIX sa) + (PeerUNIX path) (Set.fromList opts) - <$> liftIO newTQueueIO + <$> liftIO (newTVarIO mempty) <*> liftIO newTQueueIO <*> liftIO (newTVarIO now) <*> liftIO (newTVarIO 0) - -instance HasPeer UNIX where - newtype instance Peer UNIX = PeerUNIX {fromPeerUnix :: SockAddr} - deriving stock (Eq,Ord,Show,Generic) - deriving newtype (Pretty) - -instance IsString (Peer UNIX) where - fromString p = PeerUNIX (SockAddrUnix p) - --- FIXME: fix-code-dup? -instance Hashable (Peer UNIX) where - hashWithSalt salt p = case fromPeerUnix p of - SockAddrInet pn h -> hashWithSalt salt (4, fromIntegral pn, h) - SockAddrInet6 pn _ h _ -> hashWithSalt salt (6, fromIntegral pn, h) - SockAddrUnix s -> hashWithSalt salt ("unix", s) + <*> liftIO (newTVarIO mempty) data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable) instance Exception ReadTimeoutException - runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m () runMessagingUnix env = do @@ -118,12 +123,9 @@ runMessagingUnix env = do void $ allocate (pure sock) (`shutdown` ShutdownBoth) liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) - liftIO $ listen sock 1 + liftIO $ listen sock 5 - let doFork = Set.member MUFork (msgUnixOpts env) - - let withSession | doFork = void . async . runResourceT - | otherwise = void . runResourceT + let withSession = void . async . runResourceT watchdog <- async $ do @@ -149,27 +151,52 @@ runMessagingUnix env = do run <- async $ forever $ runResourceT do (so, sa) <- liftIO $ accept sock - atomically $ modifyTVar (msgUnixAccepts env) succ + + -- FIXME: fixing-unix-sockets + -- Вот тут: нумеруем клиентов, в PeerAddr ставим + -- строку или номер. + + peerNum <- atomically $ do + n <- readTVar (msgUnixAccepts env) + modifyTVar (msgUnixAccepts env) succ + pure n withSession do + ti <- liftIO myThreadId + + let that = msgUnixSelf env & over fromPeerUnix (<> "#" <> show peerNum) + + void $ allocate ( createQueues env that ) dropQueuesFor + void $ allocate (pure so) close writer <- async $ forever do - msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) - let len = fromIntegral $ LBS.length msg :: Int - liftIO $ sendAll so $ bytestring32 (fromIntegral len) - liftIO $ sendAll so $ LBS.toStrict msg + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that + + maybe1 mq none $ \q -> do + msg <- liftIO . atomically $ readTQueue q + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ sendAll so $ bytestring32 (fromIntegral len) + liftIO $ sendAll so $ LBS.toStrict msg void $ allocate (pure writer) cancel link writer fix \next -> do - -- FIXME: timeout-hardcode + me <- liftIO myThreadId + + let mq = Just (msgUnixRecv env) + frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral frame <- liftIO $ recv so frameLen - atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame) + + let s = if msgUnixServer env then "S-" else "C-" + + maybe1 mq none $ \q -> do + atomically $ writeTQueue q (From that, LBS.fromStrict frame) + now <- getTimeCoarse atomically $ writeTVar (msgUnixLast env) now next @@ -183,12 +210,16 @@ runMessagingUnix env = do runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT do + let sa = SockAddrUnix (msgUnixSockPath env) + let p = msgUnixSockPath env + let who = PeerUNIX p + + createQueues env who + sock <- liftIO $ socket AF_UNIX Stream defaultProtocol void $ allocate (pure sock) close - let sa = SockAddrUnix (msgUnixSockPath env) - let attemptConnect = do result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env) case result of @@ -200,17 +231,32 @@ runMessagingUnix env = do attemptConnect - reader <- async $ forever do + -- TODO: create-queues! + + reader <- async $ do + forever do + let q = msgUnixRecv env + -- Read response from server - frameLen <- liftIO $ recv sock 4 <&> word32 <&> fromIntegral - frame <- liftIO $ recv sock frameLen - atomically $ writeTQueue (msgUnixRecv env) (From (PeerUNIX sa), LBS.fromStrict frame) + frameLen <- liftIO $ recv sock 4 <&> word32 <&> fromIntegral + frame <- liftIO $ recv sock frameLen + + -- сообщения кому? **МНЕ** + -- сообщения от кого? от **КОГО-ТО** + atomically $ writeTQueue q (From who, LBS.fromStrict frame) forever do - msg <- liftIO . atomically $ readTQueue (msgUnixInbox env) - let len = fromIntegral $ LBS.length msg :: Int - liftIO $ sendAll sock $ bytestring32 (fromIntegral len) - liftIO $ sendAll sock $ LBS.toStrict msg + + -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. + -- У нас один контрагент, имя сокета (файла) == адрес пира. + -- Как в TCP порт сервиса (а отвечает тот с другого порта) + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who + + maybe1 mq none $ \q -> do + msg <- liftIO . atomically $ readTQueue q + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ sendAll sock $ bytestring32 (fromIntegral len) + liftIO $ sendAll sock $ LBS.toStrict msg void $ waitAnyCatchCancel [reader] @@ -218,22 +264,68 @@ runMessagingUnix env = do warn $ "MessagingUnix. client seems gone. restaring server" <+> pretty (msgUnixSelf env) err (viaShow e) atomically $ writeTVar (msgUnixAccepts env) 0 - liftIO $ atomically $ void $ flushTQueue (msgUnixInbox env) + liftIO $ atomically $ void $ flushTQueue (msgUnixRecv env) + + dropQueues + pause (msgUnixRetryTime env) logAndRetry :: SomeException -> IO () logAndRetry e = do warn $ "MessagingUnix. runClient failed, probably server is gone. Retrying:" <+> pretty (msgUnixSelf env) err (viaShow e) + dropQueues pause (msgUnixRetryTime env) + dropQueues :: MonadIO m => m () + dropQueues = do + -- liftIO $ atomically $ modifyTVar (msgUnixRecvFrom env) mempty + liftIO $ atomically $ modifyTVar (msgUnixSendTo env) mempty + -- мы не дропаем обратную очередь (принятые сообщения), потому, + -- что нет смысла. она живёт столько, сколько живёт клиент + -- очередь отправки мы удаляем, потому, что этого клиента + -- мы больше никогда не увидим, ведь они разделяются на уровне + -- сокетов и больше никак. + + dropQueuesFor :: MonadIO m => Peer UNIX -> m () + dropQueuesFor who = liftIO do + atomically do + modifyTVar (msgUnixSendTo env) (HashMap.delete who) + -- modifyTVar (msgUnixRecvFrom env) (HashMap.delete who) + +createQueues :: MonadIO m => MessagingUnix -> Peer UNIX -> m (Peer UNIX) +createQueues env who = liftIO do + atomically $ do + + sHere <- readTVar (msgUnixSendTo env) <&> HashMap.member who + + if sHere then do + pure False + else do + sendToQ <- newTQueue + modifyTVar (msgUnixSendTo env) (HashMap.insert who sendToQ) + pure True + + pure who instance Messaging MessagingUnix UNIX ByteString where - sendTo bus (To _) _ msg = liftIO do - atomically $ writeTQueue (msgUnixInbox bus) msg + sendTo bus (To who) (From me) msg = liftIO do + + createQueues bus who + + -- FIXME: handle-no-queue-for-rcpt-situation-1 + + mq <- atomically $ readTVar (msgUnixSendTo bus) <&> HashMap.lookup who + + maybe1 mq none $ \q -> do + atomically $ writeTQueue q msg receive bus _ = liftIO do - atomically $ readTQueue (msgUnixRecv bus) <&> List.singleton + let q = msgUnixRecv bus + atomically $ peekTQueue q >> flushTQueue q + + + diff --git a/hbs2-tests/test/TestUNIX.hs b/hbs2-tests/test/TestUNIX.hs index ee168fb7..1c80baae 100644 --- a/hbs2-tests/test/TestUNIX.hs +++ b/hbs2-tests/test/TestUNIX.hs @@ -8,6 +8,8 @@ import HBS2.Net.Messaging.Unix import HBS2.Actors.Peer import HBS2.OrDie +import HBS2.System.Logger.Simple + import Codec.Serialise import Control.Monad.Reader import Control.Monad.Trans.Resource @@ -18,11 +20,8 @@ import System.FilePath.Posix import System.IO import System.IO.Temp import UnliftIO.Async - - -debug :: (MonadIO m) => Doc ann -> m () -debug p = liftIO $ hPrint stderr p - +import UnliftIO qualified as UIO +import UnliftIO (TVar) data PingPong e = Ping Int | Pong Int @@ -38,20 +37,72 @@ instance HasProtocol UNIX (PingPong UNIX) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -pingPongHandler :: forall e m . ( MonadIO m - , Response e (PingPong e) m - , HasProtocol e (PingPong e) - ) - => Int +pingPongHandlerS :: forall e m . ( MonadIO m + , Response e (PingPong e) m + , HasProtocol e (PingPong e) + , Pretty (Peer e) + ) + => TVar [(Peer e, PingPong e)] + -> Int -> PingPong e -> m () -pingPongHandler n = \case +pingPongHandlerS tv n msg = do - Ping c -> debug ("Ping" <+> pretty c) >> response (Pong @e c) + that <- thatPeer (Proxy @(PingPong e)) + + UIO.atomically $ UIO.modifyTVar tv ((that,msg):) + + case msg of + + Ping c -> do + debug ("S: Ping" <+> pretty c <+> "from" <+> pretty that ) >> response (Pong @e c) + + Pong _ -> pure () + +pingPongHandler1 :: forall e m . ( MonadIO m + , Response e (PingPong e) m + , HasProtocol e (PingPong e) + ) + => TVar [PingPong e] + -> Int + -> PingPong e + -> m () + +pingPongHandler1 t n msg = do + + UIO.atomically $ UIO.modifyTVar t (msg:) + + case msg of + + Ping c -> pure () + Pong c -> pure () + + -- Pong c | c < n -> debug ("C1: Pong" <+> pretty c) >> response (Ping @e (succ c)) + -- | otherwise -> pure () + + +pingPongHandler2 :: forall e m . ( MonadIO m + , Response e (PingPong e) m + , HasProtocol e (PingPong e) + ) + => TVar [PingPong e] + -> Int + -> PingPong e + -> m () + +pingPongHandler2 t n msg = do + + UIO.atomically $ UIO.modifyTVar t (msg:) + + case msg of + + Ping c -> pure () + Pong c -> pure () + + -- Pong c | c < n -> debug ("C2: Pong" <+> pretty c) >> response (Ping @e (succ c)) + -- | otherwise -> pure () - Pong c | c < n -> debug ("Pong" <+> pretty c) >> response (Ping @e (succ c)) - | otherwise -> pure () data PPEnv = PPEnv @@ -66,6 +117,7 @@ newtype PingPongM m a = PingPongM { fromPingPong :: ReaderT PPEnv m a } , Applicative , Monad , MonadIO + , MonadUnliftIO , MonadReader PPEnv , MonadTrans ) @@ -84,6 +136,14 @@ instance HasTimeLimits UNIX (PingPong UNIX) IO where main :: IO () main = do + + setLogging @DEBUG (logPrefix "[debug] ") + setLogging @INFO (logPrefix "") + setLogging @ERROR (logPrefix "[err] ") + setLogging @WARN (logPrefix "[warn] ") + setLogging @NOTICE (logPrefix "[notice] ") + setLogging @TRACE (logPrefix "[trace] ") + liftIO $ hSetBuffering stdout LineBuffering liftIO $ hSetBuffering stderr LineBuffering @@ -91,25 +151,54 @@ main = do let soname = tmp "unix.socket" - server <- newMessagingUnix True 1.0 soname + server <- newMessagingUnixOpts [MUFork] True 1.0 soname - client <- newMessagingUnix False 1.0 soname + client1 <- newMessagingUnix False 1.0 soname + client2 <- newMessagingUnix False 1.0 soname m1 <- async $ runMessagingUnix server - m2 <- async $ runMessagingUnix client + m2 <- async $ runMessagingUnix client1 + m3 <- async $ runMessagingUnix client2 + + trs <- UIO.newTVarIO [] + tr1 <- UIO.newTVarIO [] + tr2 <- UIO.newTVarIO [] p1 <- async $ runPingPong server do runProto @UNIX - [ makeResponse (pingPongHandler 100000) + [ makeResponse (pingPongHandlerS trs 2) ] - p2 <- async $ runPingPong client do - request (msgUnixSelf server) (Ping @UNIX 0) - runProto @UNIX - [ makeResponse (pingPongHandler 100000) + -- p2 <- async $ pause @'Seconds 300 + p2 <- async $ runPingPong client1 do + -- pause @'Seconds 0.25 + -- request (msgUnixSelf server) (Ping @UNIX 0) + l <- async $ runProto @UNIX + [ makeResponse (pingPongHandler1 tr1 10) ] + link l + forM_ [1..10] $ \n-> request (msgUnixSelf server) (Ping @UNIX n) + wait l - (_,r) <- liftIO $ waitAnyCatchCancel [m1,m2,p1,p2] + -- p3 <- async $ pause @'Seconds 300 + p3 <- async $ do + runPingPong client2 do + l <- async $ runProto @UNIX + [ makeResponse (pingPongHandler2 tr2 200) + ] + link l + forM_ (take 10 [10000000..]) $ \n-> request (msgUnixSelf server) (Ping @UNIX n) + wait l + + -- p4 <- async do + pause @'Seconds 10 + UIO.readTVarIO trs >>= print . vcat . fmap (\(a,b) -> pretty (a, show b)) + UIO.readTVarIO tr1 >>= print + UIO.readTVarIO tr2 >>= print + + cancel m1 + + (_,r) <- liftIO $ waitAnyCatchCancel [m1,m2,m3,p1,p2,p3] debug (viaShow r)