mirror of https://github.com/voidlizard/hbs2
fixed Unix.hs
This commit is contained in:
parent
632d19a2a3
commit
2abfbb0fb4
|
@ -8,6 +8,7 @@ module HBS2.Net.Messaging.Unix
|
|||
) where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.System.Dir
|
||||
import HBS2.Net.Proto.Types
|
||||
import HBS2.Actors.Peer.Types
|
||||
import HBS2.Net.Messaging
|
||||
|
@ -23,6 +24,7 @@ import Data.ByteString.Lazy qualified as LBS
|
|||
import Data.Hashable
|
||||
import Data.HashMap.Strict qualified as HashMap
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.Maybe
|
||||
import Network.ByteOrder hiding (ByteString)
|
||||
import Network.Socket
|
||||
import Network.Socket.ByteString hiding (sendTo)
|
||||
|
@ -33,6 +35,7 @@ import Data.Set qualified as Set
|
|||
import Lens.Micro.Platform
|
||||
import Control.Monad.Trans.Cont
|
||||
import UnliftIO
|
||||
import Control.Concurrent.STM (retry)
|
||||
|
||||
data UNIX = UNIX
|
||||
deriving (Eq,Ord,Show,Generic)
|
||||
|
@ -53,10 +56,8 @@ instance Hashable (Peer UNIX) where
|
|||
|
||||
{- HLINT ignore "Use newtype instead of data" -}
|
||||
data MessagingUnixOpts =
|
||||
MUWatchdog Int
|
||||
| MUNoFork
|
||||
MUNoFork
|
||||
| MUDontRetry
|
||||
| MUKeepAlive Int
|
||||
deriving (Eq,Ord,Show,Generic,Data)
|
||||
|
||||
-- TODO: counters-to-detect-zombies
|
||||
|
@ -83,7 +84,6 @@ data MessagingUnix =
|
|||
, msgUnixRecv :: TQueue (From UNIX, ByteString)
|
||||
, msgUnixLast :: TVar TimeSpec
|
||||
, msgUnixAccepts :: TVar Int
|
||||
, msgSockets :: TVar (HashMap (Peer UNIX) Socket)
|
||||
}
|
||||
|
||||
makeLenses 'PeerUNIX
|
||||
|
@ -115,8 +115,6 @@ newMessagingUnixOpts opts server tsec path = do
|
|||
<*> liftIO newTQueueIO
|
||||
<*> liftIO (newTVarIO now)
|
||||
<*> liftIO (newTVarIO 0)
|
||||
<*> liftIO (newTVarIO mempty)
|
||||
|
||||
|
||||
data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable)
|
||||
|
||||
|
@ -157,13 +155,20 @@ runMessagingUnix env = do
|
|||
let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol
|
||||
let closeSock = liftIO . close
|
||||
|
||||
touch (msgUnixSockPath env)
|
||||
|
||||
sock <- ContT $ bracket openSock closeSock
|
||||
|
||||
_ <- ContT $ bracket (pure forked) $ \clients -> do
|
||||
readTVarIO clients >>= mapM_ cancel
|
||||
|
||||
liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env)
|
||||
liftIO $ listen sock 5
|
||||
liftIO $ listen sock 1024
|
||||
|
||||
void $ ContT $ withAsync do
|
||||
pause @'Seconds 5
|
||||
readTVarIO forked >>= filterM (fmap isNothing . poll)
|
||||
>>= atomically . writeTVar forked
|
||||
|
||||
forever do
|
||||
(so, _sa) <- liftIO $ accept sock
|
||||
|
@ -182,20 +187,23 @@ runMessagingUnix env = do
|
|||
else
|
||||
msgUnixSelf env
|
||||
|
||||
let writer = liftIO $ async $ pause @'Seconds 0.001 >> forever do
|
||||
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
|
||||
let writer = liftIO $ async do
|
||||
-- FIXME: check!
|
||||
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
|
||||
for_ mq $ \q -> do
|
||||
|
||||
maybe1 mq none $ \q -> do
|
||||
msg <- liftIO . atomically $ readTQueue q
|
||||
forever do
|
||||
|
||||
let len = fromIntegral $ LBS.length msg :: Int
|
||||
let _bs = bytestring32 (fromIntegral len)
|
||||
msg <- liftIO . atomically $ readTQueue q
|
||||
|
||||
liftIO $ sendAll so $ bytestring32 (fromIntegral len)
|
||||
let len = fromIntegral $ LBS.length msg :: Int
|
||||
let _bs = bytestring32 (fromIntegral len)
|
||||
|
||||
-- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs
|
||||
liftIO $ sendAll so $ bytestring32 (fromIntegral len)
|
||||
|
||||
liftIO $ SL.sendAll so msg
|
||||
-- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs
|
||||
|
||||
liftIO $ sendAll so (LBS.toStrict msg)
|
||||
|
||||
void $ ContT $ bracket ( pure so ) closeSock
|
||||
|
||||
|
@ -204,7 +212,7 @@ runMessagingUnix env = do
|
|||
void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that )
|
||||
( \_ -> debug $ "Client thread finished" <+> pretty that )
|
||||
|
||||
void $ ContT $ bracket writer cancel
|
||||
void $ ContT $ bracket writer (\x -> pause @'Seconds 0.1 >> cancel x)
|
||||
|
||||
fix \next -> do
|
||||
|
||||
|
@ -225,8 +233,10 @@ runMessagingUnix env = do
|
|||
|
||||
now <- getTimeCoarse
|
||||
-- TODO: to-remove-global-watchdog
|
||||
atomically $ writeTVar (msgUnixLast env) now
|
||||
atomically $ writeTVar seen now
|
||||
atomically do
|
||||
writeTVar (msgUnixLast env) now
|
||||
writeTVar seen now
|
||||
|
||||
next
|
||||
|
||||
|
||||
|
@ -255,21 +265,45 @@ runMessagingUnix env = do
|
|||
|
||||
sock <- ContT $ bracket openSock closeSock
|
||||
|
||||
void $ ContT $ bracket (createQueues env who) dropQueuesFor
|
||||
sockReady <- newTVarIO False
|
||||
|
||||
void $ ContT $ bracket (createQueues env who) dropQueuesFor
|
||||
|
||||
let attemptConnect = do
|
||||
result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env)
|
||||
case result of
|
||||
Right _ -> none
|
||||
Right _ -> do
|
||||
atomically $ writeTVar sockReady True
|
||||
|
||||
Left (e :: SomeException) -> do
|
||||
pause (msgUnixRetryTime env)
|
||||
warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e
|
||||
pause @'Seconds 2.5
|
||||
pause (msgUnixRetryTime env)
|
||||
attemptConnect
|
||||
|
||||
attemptConnect
|
||||
|
||||
writer <- ContT $ liftIO . withAsync do
|
||||
|
||||
forever do
|
||||
|
||||
atomically do
|
||||
readTVar sockReady `orElse` retry
|
||||
|
||||
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
|
||||
-- У нас один контрагент, имя сокета (файла) == адрес пира.
|
||||
-- Как в TCP порт сервиса (а отвечает тот с другого порта)
|
||||
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
|
||||
|
||||
maybe1 mq (err "unix: no queue!") $ \q -> do
|
||||
-- если WD установлен, то просыпаемся, скажем, wd/2 и
|
||||
-- шлём пустую строку серверу
|
||||
-- withWD do
|
||||
-- debug "FUCKING SEND"
|
||||
msg <- liftIO $ atomically $ readTQueue q
|
||||
let len = fromIntegral $ LBS.length msg :: Int
|
||||
liftIO $ sendAll sock ( bytestring32 (fromIntegral len) <> LBS.toStrict msg)
|
||||
|
||||
|
||||
reader <- ContT $ liftIO . withAsync do
|
||||
forever do
|
||||
let q = msgUnixRecv env
|
||||
|
@ -286,57 +320,7 @@ runMessagingUnix env = do
|
|||
-- сообщения от кого? от **КОГО-ТО**
|
||||
atomically $ writeTQueue q (From who, frame)
|
||||
|
||||
watchdog <- ContT $ liftIO . withAsync do
|
||||
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
|
||||
case mwd of
|
||||
Nothing -> forever (pause @'Seconds 600)
|
||||
Just n -> forever do
|
||||
|
||||
pause (TimeoutSec (realToFrac n))
|
||||
|
||||
now <- getTimeCoarse
|
||||
seen <- readTVarIO tseen
|
||||
|
||||
let diff = toNanoSeconds $ TimeoutTS (now - seen)
|
||||
|
||||
trace $ "I'm a watchdog!" <+> pretty diff
|
||||
|
||||
when ( diff > toNanoSeconds (TimeoutSec $ realToFrac n) ) do
|
||||
trace "watchdog fired!"
|
||||
throwIO ReadTimeoutException
|
||||
|
||||
|
||||
writer <- ContT $ liftIO . withAsync do
|
||||
|
||||
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
|
||||
|
||||
let withWD m = case mwd of
|
||||
Nothing -> m
|
||||
Just n -> do
|
||||
let nwait = max 1 (realToFrac n * 0.7)
|
||||
e <- race (pause (TimeoutSec nwait)) m
|
||||
case e of
|
||||
Right{} -> pure ()
|
||||
Left{} -> do
|
||||
liftIO $ sendAll sock $ bytestring32 0
|
||||
-- liftIO $ SL.sendAll sock ""
|
||||
|
||||
forever do
|
||||
|
||||
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
|
||||
-- У нас один контрагент, имя сокета (файла) == адрес пира.
|
||||
-- Как в TCP порт сервиса (а отвечает тот с другого порта)
|
||||
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
|
||||
|
||||
maybe1 mq none $ \q -> do
|
||||
-- если WD установлен, то просыпаемся, скажем, wd/2 и
|
||||
-- шлём пустую строку серверу
|
||||
withWD do
|
||||
msg <- liftIO $ atomically $ readTQueue q
|
||||
let len = fromIntegral $ LBS.length msg :: Int
|
||||
liftIO $ SL.sendAll sock $ ( LBS.fromStrict (bytestring32 (fromIntegral len)) <> msg)
|
||||
|
||||
r <- waitAnyCatchCancel [reader, writer, watchdog]
|
||||
r <- waitAnyCatchCancel [reader, writer]
|
||||
|
||||
case snd r of
|
||||
Right{} -> pure ()
|
||||
|
|
|
@ -21,6 +21,7 @@ import Data.Kind
|
|||
import Data.List qualified as List
|
||||
import GHC.TypeLits
|
||||
-- import Lens.Micro.Platform
|
||||
import UnliftIO
|
||||
import UnliftIO.Async
|
||||
import UnliftIO qualified as UIO
|
||||
import UnliftIO (TVar,TQueue,atomically)
|
||||
|
@ -28,7 +29,9 @@ import System.Random (randomIO)
|
|||
import Data.Word
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HashMap
|
||||
import Control.Exception (bracket_)
|
||||
-- import Control.Exception (bracket_)
|
||||
import Control.Monad.Trans.Cont
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
||||
type family Input a :: Type
|
||||
type family Output a :: Type
|
||||
|
@ -117,13 +120,21 @@ makeRequest rnum input = ServiceRequest rnum (serialise (fromIntegral idx :: Int
|
|||
where
|
||||
idx = findMethodIndex @method @api
|
||||
|
||||
rnumnum :: TVar Word64
|
||||
rnumnum = unsafePerformIO (newTVarIO 1)
|
||||
{-# NOINLINE rnumnum #-}
|
||||
|
||||
makeRequestR :: forall api method e m . ( KnownNat (FromJust (FindMethodIndex 0 method api))
|
||||
, Serialise (Input method)
|
||||
, MonadIO m
|
||||
)
|
||||
=> Input method -> m (ServiceProto api e)
|
||||
makeRequestR input = do
|
||||
rnum <- liftIO $ randomIO
|
||||
rnum <- atomically do
|
||||
n <- readTVar rnumnum
|
||||
modifyTVar rnumnum succ
|
||||
pure n
|
||||
|
||||
pure $ ServiceRequest rnum (serialise (fromIntegral idx :: Int, serialise input))
|
||||
where
|
||||
idx = findMethodIndex @method @api
|
||||
|
@ -172,13 +183,12 @@ runServiceClient :: forall api e m . ( MonadIO m
|
|||
-> m ()
|
||||
|
||||
runServiceClient caller = do
|
||||
proto <- async $ runProto @e [ makeResponse (makeClient @api caller) ]
|
||||
link proto
|
||||
forever do
|
||||
req <- getRequest caller
|
||||
request @e (callPeer caller) req
|
||||
|
||||
wait proto
|
||||
flip runContT pure do
|
||||
p <- ContT $ withAsync (runProto @e [ makeResponse (makeClient @api caller) ])
|
||||
link p
|
||||
forever $ lift do
|
||||
req <- getRequest caller
|
||||
request @e (callPeer caller) req
|
||||
|
||||
data Endpoint e m = forall (api :: [Type]) . ( HasProtocol e (ServiceProto api e)
|
||||
, HasTimeLimits e (ServiceProto api e) m
|
||||
|
@ -276,11 +286,12 @@ callRpcWaitMay :: forall method (api :: [Type]) m e proto t . ( MonadUnliftIO m
|
|||
-> m (Maybe (Output method))
|
||||
|
||||
callRpcWaitMay t caller args = do
|
||||
race (pause t) (callService @method @api @e @m caller args)
|
||||
>>= \case
|
||||
Right (Right x) -> pure (Just x)
|
||||
_ -> pure Nothing
|
||||
|
||||
flip fix 0 $ \next i -> do
|
||||
race (pause t) (callService @method @api @e @m caller args)
|
||||
>>= \case
|
||||
Right (Right x) -> pure (Just x)
|
||||
-- _ | i < 1 -> next (succ i)
|
||||
_ -> pure Nothing
|
||||
|
||||
makeClient :: forall api e m . ( MonadIO m
|
||||
, HasProtocol e (ServiceProto api e)
|
||||
|
|
|
@ -1144,3 +1144,19 @@ test-suite test-walk-merkle-conditional
|
|||
hs-source-dirs: test
|
||||
main-is: TestWalkMerkleConditional.hs
|
||||
build-depends: hbs2-tests
|
||||
|
||||
|
||||
executable test-unix-messaging
|
||||
import: shared-properties
|
||||
import: common-deps
|
||||
default-language: Haskell2010
|
||||
ghc-options:
|
||||
hs-source-dirs: test
|
||||
main-is: CheckUnixMessaging.hs
|
||||
build-depends:
|
||||
base, hbs2-core
|
||||
, network
|
||||
, string-conversions
|
||||
, text
|
||||
, time
|
||||
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
{-# LANGUAGE ImportQualifiedPost #-}
|
||||
|
||||
-- May develop an run it with command:
|
||||
-- ```
|
||||
-- nix develop -c ghcid -c "cabal repl" hbs2-tests:test-unix-messaging -r=Main.main
|
||||
-- ```
|
||||
|
||||
module Main where
|
||||
|
||||
import Codec.Serialise
|
||||
import Control.Monad
|
||||
import Control.Monad.Fix
|
||||
import Control.Monad.Reader hiding (reader)
|
||||
import Control.Monad.Trans.Cont
|
||||
import Data.ByteString.Char8 qualified as BS8
|
||||
import Data.ByteString.Lazy (ByteString)
|
||||
import Data.Hashable
|
||||
import Data.Set (Set)
|
||||
import Data.Set qualified as Set
|
||||
import Data.String.Conversions (cs)
|
||||
import Data.Text (Text)
|
||||
import Data.Text.Encoding qualified as TE
|
||||
import Data.Time
|
||||
import Lens.Micro.Platform
|
||||
import Network.Socket
|
||||
import Network.Socket.ByteString hiding (sendTo)
|
||||
import Network.Socket.ByteString.Lazy qualified as SL
|
||||
import UnliftIO
|
||||
import UnliftIO.Async
|
||||
import UnliftIO.Concurrent
|
||||
|
||||
import HBS2.OrDie
|
||||
import HBS2.Actors.Peer
|
||||
import HBS2.Base58
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Merkle
|
||||
import HBS2.Net.Messaging.Unix
|
||||
import HBS2.Net.Proto
|
||||
import HBS2.Net.Proto.Service
|
||||
import HBS2.Net.Proto.Service hiding (decode, encode)
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
import HBS2.Storage
|
||||
|
||||
soname = "/tmp/hbs2-dev.sock"
|
||||
|
||||
data EchoH
|
||||
|
||||
type DevAPI = '[EchoH]
|
||||
|
||||
instance HasProtocol UNIX (ServiceProto DevAPI UNIX) where
|
||||
type ProtocolId (ServiceProto DevAPI UNIX) = 0xDE50000
|
||||
type Encoded UNIX = ByteString
|
||||
decode = either (const Nothing) Just . deserialiseOrFail
|
||||
encode = serialise
|
||||
|
||||
type instance Input EchoH = Text
|
||||
type instance Output EchoH = Either Text Text
|
||||
|
||||
sayt :: (MonadIO m) => Text -> m ()
|
||||
sayt = liftIO . BS8.putStrLn . TE.encodeUtf8
|
||||
|
||||
instance (MonadIO m) => HandleMethod m EchoH where
|
||||
handleMethod msg = do
|
||||
now <- liftIO getCurrentTime
|
||||
-- threadDelay (10 ^ 5)
|
||||
let resp = (cs . show) now <> " " <> msg
|
||||
-- sayt $ "Got request: " <> resp
|
||||
pure $ Right $ resp
|
||||
|
||||
instance
|
||||
(MonadUnliftIO m)
|
||||
=> HasDeferred (ServiceProto DevAPI UNIX) UNIX m
|
||||
where
|
||||
deferred m = void (async m)
|
||||
|
||||
withServer :: (() -> IO r) -> IO r
|
||||
withServer = runContT do
|
||||
server <- newMessagingUnixOpts [] True 0.10 soname
|
||||
(link <=< ContT . withAsync) do
|
||||
runMessagingUnix server
|
||||
(link <=< ContT . withAsync) do
|
||||
flip runReaderT server do
|
||||
runProto @UNIX
|
||||
[ makeResponse (makeServer @DevAPI)
|
||||
]
|
||||
|
||||
withClient :: (ServiceCaller DevAPI UNIX -> IO r) -> IO r
|
||||
withClient = runContT do
|
||||
client <- newMessagingUnixOpts [] False 0.15 soname
|
||||
(link <=< ContT . withAsync) do
|
||||
runMessagingUnix client
|
||||
caller <- makeServiceCaller @DevAPI @UNIX (fromString soname)
|
||||
(link <=< ContT . withAsync) do
|
||||
liftIO $ runReaderT (runServiceClient @DevAPI @UNIX caller) client
|
||||
pure caller
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
||||
setLogging @ERROR $ toStderr . logPrefix "[error] "
|
||||
setLogging @WARN $ toStderr . logPrefix "[warn] "
|
||||
setLogging @NOTICE $ toStdout . logPrefix ""
|
||||
setLogging @DEBUG $ toStderr . logPrefix "[debug] "
|
||||
|
||||
totfuck <- newTVarIO 0
|
||||
|
||||
flip runContT pure do
|
||||
void $ ContT withServer
|
||||
-- pause @'Seconds 1
|
||||
s <- replicateM 16 $ lift $ async do
|
||||
void $ flip runContT pure do
|
||||
caller <- ContT withClient
|
||||
tsucc <- newTVarIO 0
|
||||
tfail <- newTVarIO 0
|
||||
for_ [1..1000] $ \i -> do
|
||||
lift (callRpcWaitMay @EchoH (TimeoutSec 2) caller ((cs . show) i))
|
||||
>>= \case
|
||||
Just (Right _) -> atomically $ modifyTVar tsucc succ
|
||||
e -> atomically (modifyTVar tfail succ) >> err (viaShow e)
|
||||
|
||||
ok <- readTVarIO tsucc
|
||||
fuck <- readTVarIO tfail
|
||||
atomically $ modifyTVar totfuck (+fuck)
|
||||
notice $ "Finished:" <+> "succeed" <+> pretty ok <+> "failed" <+> pretty fuck
|
||||
|
||||
mapM_ wait s
|
||||
|
||||
tf <- readTVarIO totfuck
|
||||
|
||||
notice $ "total errors" <+> pretty tf
|
||||
|
||||
setLoggingOff @ERROR
|
||||
setLoggingOff @WARN
|
||||
setLoggingOff @NOTICE
|
||||
setLoggingOff @DEBUG
|
||||
|
||||
|
Loading…
Reference in New Issue