fixed Unix.hs

This commit is contained in:
voidlizard 2024-10-25 12:50:08 +03:00
parent 5a57da4334
commit fdc59927cb
4 changed files with 239 additions and 88 deletions

View File

@ -8,6 +8,7 @@ module HBS2.Net.Messaging.Unix
) where
import HBS2.Prelude.Plated
import HBS2.System.Dir
import HBS2.Net.Proto.Types
import HBS2.Actors.Peer.Types
import HBS2.Net.Messaging
@ -23,6 +24,7 @@ import Data.ByteString.Lazy qualified as LBS
import Data.Hashable
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict (HashMap)
import Data.Maybe
import Network.ByteOrder hiding (ByteString)
import Network.Socket
import Network.Socket.ByteString hiding (sendTo)
@ -33,6 +35,7 @@ import Data.Set qualified as Set
import Lens.Micro.Platform
import Control.Monad.Trans.Cont
import UnliftIO
import Control.Concurrent.STM (retry)
data UNIX = UNIX
deriving (Eq,Ord,Show,Generic)
@ -53,10 +56,8 @@ instance Hashable (Peer UNIX) where
{- HLINT ignore "Use newtype instead of data" -}
data MessagingUnixOpts =
MUWatchdog Int
| MUNoFork
MUNoFork
| MUDontRetry
| MUKeepAlive Int
deriving (Eq,Ord,Show,Generic,Data)
-- TODO: counters-to-detect-zombies
@ -83,7 +84,6 @@ data MessagingUnix =
, msgUnixRecv :: TQueue (From UNIX, ByteString)
, msgUnixLast :: TVar TimeSpec
, msgUnixAccepts :: TVar Int
, msgSockets :: TVar (HashMap (Peer UNIX) Socket)
}
makeLenses 'PeerUNIX
@ -115,8 +115,6 @@ newMessagingUnixOpts opts server tsec path = do
<*> liftIO newTQueueIO
<*> liftIO (newTVarIO now)
<*> liftIO (newTVarIO 0)
<*> liftIO (newTVarIO mempty)
data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable)
@ -157,13 +155,20 @@ runMessagingUnix env = do
let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol
let closeSock = liftIO . close
touch (msgUnixSockPath env)
sock <- ContT $ bracket openSock closeSock
_ <- ContT $ bracket (pure forked) $ \clients -> do
readTVarIO clients >>= mapM_ cancel
liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env)
liftIO $ listen sock 5
liftIO $ listen sock 1024
void $ ContT $ withAsync do
pause @'Seconds 5
readTVarIO forked >>= filterM (fmap isNothing . poll)
>>= atomically . writeTVar forked
forever do
(so, _sa) <- liftIO $ accept sock
@ -182,20 +187,23 @@ runMessagingUnix env = do
else
msgUnixSelf env
let writer = liftIO $ async $ pause @'Seconds 0.001 >> forever do
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
let writer = liftIO $ async do
-- FIXME: check!
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
for_ mq $ \q -> do
maybe1 mq none $ \q -> do
msg <- liftIO . atomically $ readTQueue q
forever do
let len = fromIntegral $ LBS.length msg :: Int
let _bs = bytestring32 (fromIntegral len)
msg <- liftIO . atomically $ readTQueue q
liftIO $ sendAll so $ bytestring32 (fromIntegral len)
let len = fromIntegral $ LBS.length msg :: Int
let _bs = bytestring32 (fromIntegral len)
-- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs
liftIO $ sendAll so $ bytestring32 (fromIntegral len)
liftIO $ SL.sendAll so msg
-- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs
liftIO $ sendAll so (LBS.toStrict msg)
void $ ContT $ bracket ( pure so ) closeSock
@ -204,7 +212,7 @@ runMessagingUnix env = do
void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that )
( \_ -> debug $ "Client thread finished" <+> pretty that )
void $ ContT $ bracket writer cancel
void $ ContT $ bracket writer (\x -> pause @'Seconds 0.1 >> cancel x)
fix \next -> do
@ -225,8 +233,10 @@ runMessagingUnix env = do
now <- getTimeCoarse
-- TODO: to-remove-global-watchdog
atomically $ writeTVar (msgUnixLast env) now
atomically $ writeTVar seen now
atomically do
writeTVar (msgUnixLast env) now
writeTVar seen now
next
@ -255,21 +265,45 @@ runMessagingUnix env = do
sock <- ContT $ bracket openSock closeSock
void $ ContT $ bracket (createQueues env who) dropQueuesFor
sockReady <- newTVarIO False
void $ ContT $ bracket (createQueues env who) dropQueuesFor
let attemptConnect = do
result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env)
case result of
Right _ -> none
Right _ -> do
atomically $ writeTVar sockReady True
Left (e :: SomeException) -> do
pause (msgUnixRetryTime env)
warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e
pause @'Seconds 2.5
pause (msgUnixRetryTime env)
attemptConnect
attemptConnect
writer <- ContT $ liftIO . withAsync do
forever do
atomically do
readTVar sockReady `orElse` retry
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
-- У нас один контрагент, имя сокета (файла) == адрес пира.
-- Как в TCP порт сервиса (а отвечает тот с другого порта)
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
maybe1 mq (err "unix: no queue!") $ \q -> do
-- если WD установлен, то просыпаемся, скажем, wd/2 и
-- шлём пустую строку серверу
-- withWD do
-- debug "FUCKING SEND"
msg <- liftIO $ atomically $ readTQueue q
let len = fromIntegral $ LBS.length msg :: Int
liftIO $ sendAll sock ( bytestring32 (fromIntegral len) <> LBS.toStrict msg)
reader <- ContT $ liftIO . withAsync do
forever do
let q = msgUnixRecv env
@ -286,57 +320,7 @@ runMessagingUnix env = do
-- сообщения от кого? от **КОГО-ТО**
atomically $ writeTQueue q (From who, frame)
watchdog <- ContT $ liftIO . withAsync do
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
case mwd of
Nothing -> forever (pause @'Seconds 600)
Just n -> forever do
pause (TimeoutSec (realToFrac n))
now <- getTimeCoarse
seen <- readTVarIO tseen
let diff = toNanoSeconds $ TimeoutTS (now - seen)
trace $ "I'm a watchdog!" <+> pretty diff
when ( diff > toNanoSeconds (TimeoutSec $ realToFrac n) ) do
trace "watchdog fired!"
throwIO ReadTimeoutException
writer <- ContT $ liftIO . withAsync do
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
let withWD m = case mwd of
Nothing -> m
Just n -> do
let nwait = max 1 (realToFrac n * 0.7)
e <- race (pause (TimeoutSec nwait)) m
case e of
Right{} -> pure ()
Left{} -> do
liftIO $ sendAll sock $ bytestring32 0
-- liftIO $ SL.sendAll sock ""
forever do
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
-- У нас один контрагент, имя сокета (файла) == адрес пира.
-- Как в TCP порт сервиса (а отвечает тот с другого порта)
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
maybe1 mq none $ \q -> do
-- если WD установлен, то просыпаемся, скажем, wd/2 и
-- шлём пустую строку серверу
withWD do
msg <- liftIO $ atomically $ readTQueue q
let len = fromIntegral $ LBS.length msg :: Int
liftIO $ SL.sendAll sock $ ( LBS.fromStrict (bytestring32 (fromIntegral len)) <> msg)
r <- waitAnyCatchCancel [reader, writer, watchdog]
r <- waitAnyCatchCancel [reader, writer]
case snd r of
Right{} -> pure ()

View File

@ -21,6 +21,7 @@ import Data.Kind
import Data.List qualified as List
import GHC.TypeLits
-- import Lens.Micro.Platform
import UnliftIO
import UnliftIO.Async
import UnliftIO qualified as UIO
import UnliftIO (TVar,TQueue,atomically)
@ -28,7 +29,9 @@ import System.Random (randomIO)
import Data.Word
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Control.Exception (bracket_)
-- import Control.Exception (bracket_)
import Control.Monad.Trans.Cont
import System.IO.Unsafe (unsafePerformIO)
type family Input a :: Type
type family Output a :: Type
@ -117,13 +120,21 @@ makeRequest rnum input = ServiceRequest rnum (serialise (fromIntegral idx :: Int
where
idx = findMethodIndex @method @api
rnumnum :: TVar Word64
rnumnum = unsafePerformIO (newTVarIO 1)
{-# NOINLINE rnumnum #-}
makeRequestR :: forall api method e m . ( KnownNat (FromJust (FindMethodIndex 0 method api))
, Serialise (Input method)
, MonadIO m
)
=> Input method -> m (ServiceProto api e)
makeRequestR input = do
rnum <- liftIO $ randomIO
rnum <- atomically do
n <- readTVar rnumnum
modifyTVar rnumnum succ
pure n
pure $ ServiceRequest rnum (serialise (fromIntegral idx :: Int, serialise input))
where
idx = findMethodIndex @method @api
@ -172,13 +183,12 @@ runServiceClient :: forall api e m . ( MonadIO m
-> m ()
runServiceClient caller = do
proto <- async $ runProto @e [ makeResponse (makeClient @api caller) ]
link proto
forever do
req <- getRequest caller
request @e (callPeer caller) req
wait proto
flip runContT pure do
p <- ContT $ withAsync (runProto @e [ makeResponse (makeClient @api caller) ])
link p
forever $ lift do
req <- getRequest caller
request @e (callPeer caller) req
data Endpoint e m = forall (api :: [Type]) . ( HasProtocol e (ServiceProto api e)
, HasTimeLimits e (ServiceProto api e) m
@ -276,11 +286,12 @@ callRpcWaitMay :: forall method (api :: [Type]) m e proto t . ( MonadUnliftIO m
-> m (Maybe (Output method))
callRpcWaitMay t caller args = do
race (pause t) (callService @method @api @e @m caller args)
>>= \case
Right (Right x) -> pure (Just x)
_ -> pure Nothing
flip fix 0 $ \next i -> do
race (pause t) (callService @method @api @e @m caller args)
>>= \case
Right (Right x) -> pure (Just x)
-- _ | i < 1 -> next (succ i)
_ -> pure Nothing
makeClient :: forall api e m . ( MonadIO m
, HasProtocol e (ServiceProto api e)

View File

@ -1144,3 +1144,21 @@ executable test-walk-merkletree-cornercase
, serialise
, streaming
, text
executable test-unix-messaging
import: shared-properties
import: common-deps
default-language: Haskell2010
ghc-options:
hs-source-dirs: test
main-is: CheckUnixMessaging.hs
build-depends:
base, hbs2-core
, network
, string-conversions
, text
, time

View File

@ -0,0 +1,138 @@
{-# LANGUAGE ImportQualifiedPost #-}
-- May develop an run it with command:
-- ```
-- nix develop -c ghcid -c "cabal repl" hbs2-tests:test-unix-messaging -r=Main.main
-- ```
module Main where
import Codec.Serialise
import Control.Monad
import Control.Monad.Fix
import Control.Monad.Reader hiding (reader)
import Control.Monad.Trans.Cont
import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy (ByteString)
import Data.Hashable
import Data.Set (Set)
import Data.Set qualified as Set
import Data.String.Conversions (cs)
import Data.Text (Text)
import Data.Text.Encoding qualified as TE
import Data.Time
import Lens.Micro.Platform
import Network.Socket
import Network.Socket.ByteString hiding (sendTo)
import Network.Socket.ByteString.Lazy qualified as SL
import UnliftIO
import UnliftIO.Async
import UnliftIO.Concurrent
import HBS2.OrDie
import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Data.Types.Refs
import HBS2.Merkle
import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto
import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Service hiding (decode, encode)
import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple.ANSI
import HBS2.Storage
soname = "/tmp/hbs2-dev.sock"
data EchoH
type DevAPI = '[EchoH]
instance HasProtocol UNIX (ServiceProto DevAPI UNIX) where
type ProtocolId (ServiceProto DevAPI UNIX) = 0xDE50000
type Encoded UNIX = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
type instance Input EchoH = Text
type instance Output EchoH = Either Text Text
sayt :: (MonadIO m) => Text -> m ()
sayt = liftIO . BS8.putStrLn . TE.encodeUtf8
instance (MonadIO m) => HandleMethod m EchoH where
handleMethod msg = do
now <- liftIO getCurrentTime
-- threadDelay (10 ^ 5)
let resp = (cs . show) now <> " " <> msg
-- sayt $ "Got request: " <> resp
pure $ Right $ resp
instance
(MonadUnliftIO m)
=> HasDeferred (ServiceProto DevAPI UNIX) UNIX m
where
deferred m = void (async m)
withServer :: (() -> IO r) -> IO r
withServer = runContT do
server <- newMessagingUnixOpts [] True 0.10 soname
(link <=< ContT . withAsync) do
runMessagingUnix server
(link <=< ContT . withAsync) do
flip runReaderT server do
runProto @UNIX
[ makeResponse (makeServer @DevAPI)
]
withClient :: (ServiceCaller DevAPI UNIX -> IO r) -> IO r
withClient = runContT do
client <- newMessagingUnixOpts [] False 0.15 soname
(link <=< ContT . withAsync) do
runMessagingUnix client
caller <- makeServiceCaller @DevAPI @UNIX (fromString soname)
(link <=< ContT . withAsync) do
liftIO $ runReaderT (runServiceClient @DevAPI @UNIX caller) client
pure caller
main :: IO ()
main = do
setLogging @ERROR $ toStderr . logPrefix "[error] "
setLogging @WARN $ toStderr . logPrefix "[warn] "
setLogging @NOTICE $ toStdout . logPrefix ""
setLogging @DEBUG $ toStderr . logPrefix "[debug] "
totfuck <- newTVarIO 0
flip runContT pure do
void $ ContT withServer
-- pause @'Seconds 1
s <- replicateM 16 $ lift $ async do
void $ flip runContT pure do
caller <- ContT withClient
tsucc <- newTVarIO 0
tfail <- newTVarIO 0
for_ [1..1000] $ \i -> do
lift (callRpcWaitMay @EchoH (TimeoutSec 2) caller ((cs . show) i))
>>= \case
Just (Right _) -> atomically $ modifyTVar tsucc succ
e -> atomically (modifyTVar tfail succ) >> err (viaShow e)
ok <- readTVarIO tsucc
fuck <- readTVarIO tfail
atomically $ modifyTVar totfuck (+fuck)
notice $ "Finished:" <+> "succeed" <+> pretty ok <+> "failed" <+> pretty fuck
mapM_ wait s
tf <- readTVarIO totfuck
notice $ "total errors" <+> pretty tf
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
setLoggingOff @DEBUG