diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index 951c0931..9a33bfbd 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -8,6 +8,7 @@ module HBS2.Net.Messaging.Unix ) where import HBS2.Prelude.Plated +import HBS2.System.Dir import HBS2.Net.Proto.Types import HBS2.Actors.Peer.Types import HBS2.Net.Messaging @@ -23,6 +24,7 @@ import Data.ByteString.Lazy qualified as LBS import Data.Hashable import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict (HashMap) +import Data.Maybe import Network.ByteOrder hiding (ByteString) import Network.Socket import Network.Socket.ByteString hiding (sendTo) @@ -33,6 +35,7 @@ import Data.Set qualified as Set import Lens.Micro.Platform import Control.Monad.Trans.Cont import UnliftIO +import Control.Concurrent.STM (retry) data UNIX = UNIX deriving (Eq,Ord,Show,Generic) @@ -53,10 +56,8 @@ instance Hashable (Peer UNIX) where {- HLINT ignore "Use newtype instead of data" -} data MessagingUnixOpts = - MUWatchdog Int - | MUNoFork + MUNoFork | MUDontRetry - | MUKeepAlive Int deriving (Eq,Ord,Show,Generic,Data) -- TODO: counters-to-detect-zombies @@ -83,7 +84,6 @@ data MessagingUnix = , msgUnixRecv :: TQueue (From UNIX, ByteString) , msgUnixLast :: TVar TimeSpec , msgUnixAccepts :: TVar Int - , msgSockets :: TVar (HashMap (Peer UNIX) Socket) } makeLenses 'PeerUNIX @@ -115,8 +115,6 @@ newMessagingUnixOpts opts server tsec path = do <*> liftIO newTQueueIO <*> liftIO (newTVarIO now) <*> liftIO (newTVarIO 0) - <*> liftIO (newTVarIO mempty) - data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable) @@ -157,13 +155,20 @@ runMessagingUnix env = do let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol let closeSock = liftIO . close + touch (msgUnixSockPath env) + sock <- ContT $ bracket openSock closeSock _ <- ContT $ bracket (pure forked) $ \clients -> do readTVarIO clients >>= mapM_ cancel liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) - liftIO $ listen sock 5 + liftIO $ listen sock 1024 + + void $ ContT $ withAsync do + pause @'Seconds 5 + readTVarIO forked >>= filterM (fmap isNothing . poll) + >>= atomically . writeTVar forked forever do (so, _sa) <- liftIO $ accept sock @@ -182,20 +187,23 @@ runMessagingUnix env = do else msgUnixSelf env - let writer = liftIO $ async $ pause @'Seconds 0.001 >> forever do - mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that + let writer = liftIO $ async do + -- FIXME: check! + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that + for_ mq $ \q -> do - maybe1 mq none $ \q -> do - msg <- liftIO . atomically $ readTQueue q + forever do - let len = fromIntegral $ LBS.length msg :: Int - let _bs = bytestring32 (fromIntegral len) + msg <- liftIO . atomically $ readTQueue q - liftIO $ sendAll so $ bytestring32 (fromIntegral len) + let len = fromIntegral $ LBS.length msg :: Int + let _bs = bytestring32 (fromIntegral len) - -- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs + liftIO $ sendAll so $ bytestring32 (fromIntegral len) - liftIO $ SL.sendAll so msg + -- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs + + liftIO $ sendAll so (LBS.toStrict msg) void $ ContT $ bracket ( pure so ) closeSock @@ -204,7 +212,7 @@ runMessagingUnix env = do void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that ) ( \_ -> debug $ "Client thread finished" <+> pretty that ) - void $ ContT $ bracket writer cancel + void $ ContT $ bracket writer (\x -> pause @'Seconds 0.1 >> cancel x) fix \next -> do @@ -225,8 +233,10 @@ runMessagingUnix env = do now <- getTimeCoarse -- TODO: to-remove-global-watchdog - atomically $ writeTVar (msgUnixLast env) now - atomically $ writeTVar seen now + atomically do + writeTVar (msgUnixLast env) now + writeTVar seen now + next @@ -255,21 +265,45 @@ runMessagingUnix env = do sock <- ContT $ bracket openSock closeSock - void $ ContT $ bracket (createQueues env who) dropQueuesFor + sockReady <- newTVarIO False + void $ ContT $ bracket (createQueues env who) dropQueuesFor let attemptConnect = do result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env) case result of - Right _ -> none + Right _ -> do + atomically $ writeTVar sockReady True + Left (e :: SomeException) -> do - pause (msgUnixRetryTime env) warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e - pause @'Seconds 2.5 + pause (msgUnixRetryTime env) attemptConnect attemptConnect + writer <- ContT $ liftIO . withAsync do + + forever do + + atomically do + readTVar sockReady `orElse` retry + + -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. + -- У нас один контрагент, имя сокета (файла) == адрес пира. + -- Как в TCP порт сервиса (а отвечает тот с другого порта) + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who + + maybe1 mq (err "unix: no queue!") $ \q -> do + -- если WD установлен, то просыпаемся, скажем, wd/2 и + -- шлём пустую строку серверу + -- withWD do + -- debug "FUCKING SEND" + msg <- liftIO $ atomically $ readTQueue q + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ sendAll sock ( bytestring32 (fromIntegral len) <> LBS.toStrict msg) + + reader <- ContT $ liftIO . withAsync do forever do let q = msgUnixRecv env @@ -286,57 +320,7 @@ runMessagingUnix env = do -- сообщения от кого? от **КОГО-ТО** atomically $ writeTQueue q (From who, frame) - watchdog <- ContT $ liftIO . withAsync do - let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] - case mwd of - Nothing -> forever (pause @'Seconds 600) - Just n -> forever do - - pause (TimeoutSec (realToFrac n)) - - now <- getTimeCoarse - seen <- readTVarIO tseen - - let diff = toNanoSeconds $ TimeoutTS (now - seen) - - trace $ "I'm a watchdog!" <+> pretty diff - - when ( diff > toNanoSeconds (TimeoutSec $ realToFrac n) ) do - trace "watchdog fired!" - throwIO ReadTimeoutException - - - writer <- ContT $ liftIO . withAsync do - - let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] - - let withWD m = case mwd of - Nothing -> m - Just n -> do - let nwait = max 1 (realToFrac n * 0.7) - e <- race (pause (TimeoutSec nwait)) m - case e of - Right{} -> pure () - Left{} -> do - liftIO $ sendAll sock $ bytestring32 0 - -- liftIO $ SL.sendAll sock "" - - forever do - - -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. - -- У нас один контрагент, имя сокета (файла) == адрес пира. - -- Как в TCP порт сервиса (а отвечает тот с другого порта) - mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who - - maybe1 mq none $ \q -> do - -- если WD установлен, то просыпаемся, скажем, wd/2 и - -- шлём пустую строку серверу - withWD do - msg <- liftIO $ atomically $ readTQueue q - let len = fromIntegral $ LBS.length msg :: Int - liftIO $ SL.sendAll sock $ ( LBS.fromStrict (bytestring32 (fromIntegral len)) <> msg) - - r <- waitAnyCatchCancel [reader, writer, watchdog] + r <- waitAnyCatchCancel [reader, writer] case snd r of Right{} -> pure () diff --git a/hbs2-core/lib/HBS2/Net/Proto/Service.hs b/hbs2-core/lib/HBS2/Net/Proto/Service.hs index cdd6e825..0e4a7f34 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Service.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Service.hs @@ -21,6 +21,7 @@ import Data.Kind import Data.List qualified as List import GHC.TypeLits -- import Lens.Micro.Platform +import UnliftIO import UnliftIO.Async import UnliftIO qualified as UIO import UnliftIO (TVar,TQueue,atomically) @@ -28,7 +29,9 @@ import System.Random (randomIO) import Data.Word import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap -import Control.Exception (bracket_) +-- import Control.Exception (bracket_) +import Control.Monad.Trans.Cont +import System.IO.Unsafe (unsafePerformIO) type family Input a :: Type type family Output a :: Type @@ -117,13 +120,21 @@ makeRequest rnum input = ServiceRequest rnum (serialise (fromIntegral idx :: Int where idx = findMethodIndex @method @api +rnumnum :: TVar Word64 +rnumnum = unsafePerformIO (newTVarIO 1) +{-# NOINLINE rnumnum #-} + makeRequestR :: forall api method e m . ( KnownNat (FromJust (FindMethodIndex 0 method api)) , Serialise (Input method) , MonadIO m ) => Input method -> m (ServiceProto api e) makeRequestR input = do - rnum <- liftIO $ randomIO + rnum <- atomically do + n <- readTVar rnumnum + modifyTVar rnumnum succ + pure n + pure $ ServiceRequest rnum (serialise (fromIntegral idx :: Int, serialise input)) where idx = findMethodIndex @method @api @@ -172,13 +183,12 @@ runServiceClient :: forall api e m . ( MonadIO m -> m () runServiceClient caller = do - proto <- async $ runProto @e [ makeResponse (makeClient @api caller) ] - link proto - forever do - req <- getRequest caller - request @e (callPeer caller) req - - wait proto + flip runContT pure do + p <- ContT $ withAsync (runProto @e [ makeResponse (makeClient @api caller) ]) + link p + forever $ lift do + req <- getRequest caller + request @e (callPeer caller) req data Endpoint e m = forall (api :: [Type]) . ( HasProtocol e (ServiceProto api e) , HasTimeLimits e (ServiceProto api e) m @@ -276,11 +286,12 @@ callRpcWaitMay :: forall method (api :: [Type]) m e proto t . ( MonadUnliftIO m -> m (Maybe (Output method)) callRpcWaitMay t caller args = do - race (pause t) (callService @method @api @e @m caller args) - >>= \case - Right (Right x) -> pure (Just x) - _ -> pure Nothing - + flip fix 0 $ \next i -> do + race (pause t) (callService @method @api @e @m caller args) + >>= \case + Right (Right x) -> pure (Just x) + -- _ | i < 1 -> next (succ i) + _ -> pure Nothing makeClient :: forall api e m . ( MonadIO m , HasProtocol e (ServiceProto api e) diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 8ee4bf42..7b9521ff 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1144,3 +1144,19 @@ test-suite test-walk-merkle-conditional hs-source-dirs: test main-is: TestWalkMerkleConditional.hs build-depends: hbs2-tests + + +executable test-unix-messaging + import: shared-properties + import: common-deps + default-language: Haskell2010 + ghc-options: + hs-source-dirs: test + main-is: CheckUnixMessaging.hs + build-depends: + base, hbs2-core + , network + , string-conversions + , text + , time + diff --git a/hbs2-tests/test/CheckUnixMessaging.hs b/hbs2-tests/test/CheckUnixMessaging.hs new file mode 100644 index 00000000..c9acf5b8 --- /dev/null +++ b/hbs2-tests/test/CheckUnixMessaging.hs @@ -0,0 +1,138 @@ +{-# LANGUAGE ImportQualifiedPost #-} + +-- May develop an run it with command: +-- ``` +-- nix develop -c ghcid -c "cabal repl" hbs2-tests:test-unix-messaging -r=Main.main +-- ``` + +module Main where + +import Codec.Serialise +import Control.Monad +import Control.Monad.Fix +import Control.Monad.Reader hiding (reader) +import Control.Monad.Trans.Cont +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Lazy (ByteString) +import Data.Hashable +import Data.Set (Set) +import Data.Set qualified as Set +import Data.String.Conversions (cs) +import Data.Text (Text) +import Data.Text.Encoding qualified as TE +import Data.Time +import Lens.Micro.Platform +import Network.Socket +import Network.Socket.ByteString hiding (sendTo) +import Network.Socket.ByteString.Lazy qualified as SL +import UnliftIO +import UnliftIO.Async +import UnliftIO.Concurrent + +import HBS2.OrDie +import HBS2.Actors.Peer +import HBS2.Base58 +import HBS2.Data.Types.Refs +import HBS2.Merkle +import HBS2.Net.Messaging.Unix +import HBS2.Net.Proto +import HBS2.Net.Proto.Service +import HBS2.Net.Proto.Service hiding (decode, encode) +import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple.ANSI +import HBS2.Storage + +soname = "/tmp/hbs2-dev.sock" + +data EchoH + +type DevAPI = '[EchoH] + +instance HasProtocol UNIX (ServiceProto DevAPI UNIX) where + type ProtocolId (ServiceProto DevAPI UNIX) = 0xDE50000 + type Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +type instance Input EchoH = Text +type instance Output EchoH = Either Text Text + +sayt :: (MonadIO m) => Text -> m () +sayt = liftIO . BS8.putStrLn . TE.encodeUtf8 + +instance (MonadIO m) => HandleMethod m EchoH where + handleMethod msg = do + now <- liftIO getCurrentTime + -- threadDelay (10 ^ 5) + let resp = (cs . show) now <> " " <> msg + -- sayt $ "Got request: " <> resp + pure $ Right $ resp + +instance + (MonadUnliftIO m) + => HasDeferred (ServiceProto DevAPI UNIX) UNIX m + where + deferred m = void (async m) + +withServer :: (() -> IO r) -> IO r +withServer = runContT do + server <- newMessagingUnixOpts [] True 0.10 soname + (link <=< ContT . withAsync) do + runMessagingUnix server + (link <=< ContT . withAsync) do + flip runReaderT server do + runProto @UNIX + [ makeResponse (makeServer @DevAPI) + ] + +withClient :: (ServiceCaller DevAPI UNIX -> IO r) -> IO r +withClient = runContT do + client <- newMessagingUnixOpts [] False 0.15 soname + (link <=< ContT . withAsync) do + runMessagingUnix client + caller <- makeServiceCaller @DevAPI @UNIX (fromString soname) + (link <=< ContT . withAsync) do + liftIO $ runReaderT (runServiceClient @DevAPI @UNIX caller) client + pure caller + +main :: IO () +main = do + + setLogging @ERROR $ toStderr . logPrefix "[error] " + setLogging @WARN $ toStderr . logPrefix "[warn] " + setLogging @NOTICE $ toStdout . logPrefix "" + setLogging @DEBUG $ toStderr . logPrefix "[debug] " + + totfuck <- newTVarIO 0 + + flip runContT pure do + void $ ContT withServer + -- pause @'Seconds 1 + s <- replicateM 16 $ lift $ async do + void $ flip runContT pure do + caller <- ContT withClient + tsucc <- newTVarIO 0 + tfail <- newTVarIO 0 + for_ [1..1000] $ \i -> do + lift (callRpcWaitMay @EchoH (TimeoutSec 2) caller ((cs . show) i)) + >>= \case + Just (Right _) -> atomically $ modifyTVar tsucc succ + e -> atomically (modifyTVar tfail succ) >> err (viaShow e) + + ok <- readTVarIO tsucc + fuck <- readTVarIO tfail + atomically $ modifyTVar totfuck (+fuck) + notice $ "Finished:" <+> "succeed" <+> pretty ok <+> "failed" <+> pretty fuck + + mapM_ wait s + + tf <- readTVarIO totfuck + + notice $ "total errors" <+> pretty tf + + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @DEBUG + +