hbs2/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs

339 lines
10 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# Language TemplateHaskell #-}
{-# Language UndecidableInstances #-}
module HBS2.Net.Messaging.Unix
( module HBS2.Net.Messaging.Unix
, module HBS2.Net.Messaging
, module HBS2.Net.Proto.Types
) where
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Types
import HBS2.Actors.Peer.Types
import HBS2.Net.Messaging
import HBS2.Clock
import HBS2.System.Logger.Simple
import Control.Monad.Trans.Resource
import Control.Monad
import Control.Monad.Reader
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Function
import Data.Functor
import Data.Hashable
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict (HashMap)
import Network.ByteOrder hiding (ByteString)
import Network.Socket
import Network.Socket.ByteString
import Control.Concurrent.STM.TQueue (flushTQueue)
import Data.Set (Set)
import Data.Set qualified as Set
import Lens.Micro.Platform
import UnliftIO
import Control.Concurrent (myThreadId)
data UNIX = UNIX
deriving (Eq,Ord,Show,Generic)
type PeerUnixAddr = String
instance HasPeer UNIX where
newtype instance Peer UNIX = PeerUNIX { _fromPeerUnix :: PeerUnixAddr}
deriving stock (Eq,Ord,Show,Generic)
deriving newtype (Pretty)
instance IsString (Peer UNIX) where
fromString = PeerUNIX
instance Hashable (Peer UNIX) where
hashWithSalt salt (PeerUNIX p) = hashWithSalt salt p
{- HLINT ignore "Use newtype instead of data" -}
data MessagingUnixOpts =
MUWatchdog Int
| MUFork
deriving (Eq,Ord,Show,Generic,Data)
-- FIXME: use-bounded-queues
data MessagingUnix =
MessagingUnix
{ msgUnixSockPath :: FilePath
, msgUnixServer :: Bool
, msgUnixRetryTime :: Timeout 'Seconds
, msgUnixSelf :: Peer UNIX
, msgUnixOpts :: Set MessagingUnixOpts
, msgUnixSendTo :: TVar (HashMap (Peer UNIX) (TQueue ByteString))
, msgUnixRecv :: TQueue (From UNIX, ByteString)
, msgUnixLast :: TVar TimeSpec
, msgUnixAccepts :: TVar Int
, msgSockets :: TVar (HashMap (Peer UNIX) Socket)
}
makeLenses 'PeerUNIX
newMessagingUnix :: MonadIO m
=> Bool
-> Timeout 'Seconds
-> FilePath
-> m MessagingUnix
newMessagingUnix server tsec path = do
newMessagingUnixOpts mempty server tsec path
newMessagingUnixOpts :: (MonadIO m)
=> [MessagingUnixOpts]
-> Bool
-> Timeout 'Seconds
-> FilePath
-> m MessagingUnix
newMessagingUnixOpts opts server tsec path = do
now <- getTimeCoarse
MessagingUnix path
server
tsec
(PeerUNIX path)
(Set.fromList opts)
<$> liftIO (newTVarIO mempty)
<*> liftIO newTQueueIO
<*> liftIO (newTVarIO now)
<*> liftIO (newTVarIO 0)
<*> liftIO (newTVarIO mempty)
data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable)
instance Exception ReadTimeoutException
runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m ()
runMessagingUnix env = do
if msgUnixServer env then
runServer
else
runClient
where
runServer = forever $ handleAny cleanupAndRetry $ runResourceT do
t0 <- getTimeCoarse
atomically $ writeTVar (msgUnixLast env) t0
sock <- liftIO $ socket AF_UNIX Stream defaultProtocol
void $ allocate (pure sock) (`shutdown` ShutdownBoth)
liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env)
liftIO $ listen sock 5
let withSession = void . async . runResourceT
watchdog <- async $ do
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
maybe1 mwd (forever (pause @'Seconds 60)) $ \wd -> do
forever do
pause $ TimeoutSec $ realToFrac $ min (wd `div` 2) 1
now <- getTimeCoarse
seen <- readTVarIO (msgUnixLast env)
acc <- readTVarIO (msgUnixAccepts env)
trace $ "watchdog" <+> pretty now <+> pretty seen <+> pretty acc
let diff = toNanoSeconds $ TimeoutTS (now - seen)
when ( acc > 0 && diff >= toNanoSeconds (TimeoutSec $ realToFrac wd) ) do
throwIO ReadTimeoutException
run <- async $ forever $ runResourceT do
(so, sa) <- liftIO $ accept sock
-- FIXME: fixing-unix-sockets
-- Вот тут: нумеруем клиентов, в PeerAddr ставим
-- строку или номер.
peerNum <- atomically $ do
n <- readTVar (msgUnixAccepts env)
modifyTVar (msgUnixAccepts env) succ
pure n
withSession do
let that = msgUnixSelf env & over fromPeerUnix (<> "#" <> show peerNum)
void $ allocate ( createQueues env that ) dropQueuesFor
void $ allocate (pure so) close
writer <- async $ forever do
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
maybe1 mq none $ \q -> do
msg <- liftIO . atomically $ readTQueue q
let len = fromIntegral $ LBS.length msg :: Int
liftIO $ sendAll so $ bytestring32 (fromIntegral len)
liftIO $ sendAll so $ LBS.toStrict msg
void $ allocate (pure writer) cancel
link writer
fix \next -> do
me <- liftIO myThreadId
let mq = Just (msgUnixRecv env)
frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral
frame <- liftIO $ recv so frameLen
let s = if msgUnixServer env then "S-" else "C-"
maybe1 mq none $ \q -> do
atomically $ writeTQueue q (From that, LBS.fromStrict frame)
now <- getTimeCoarse
atomically $ writeTVar (msgUnixLast env) now
next
(_, r) <- waitAnyCatchCancel [run, watchdog]
case r of
Left e -> throwIO e
Right{} -> pure ()
runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT do
let sa = SockAddrUnix (msgUnixSockPath env)
let p = msgUnixSockPath env
let who = PeerUNIX p
createQueues env who
sock <- liftIO $ socket AF_UNIX Stream defaultProtocol
void $ allocate (pure sock) close
let attemptConnect = do
result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env)
case result of
Right _ -> return ()
Left (e :: SomeException) -> do
pause (msgUnixRetryTime env)
warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e
attemptConnect
attemptConnect
-- TODO: create-queues!
reader <- async $ do
forever do
let q = msgUnixRecv env
-- Read response from server
frameLen <- liftIO $ recv sock 4 <&> word32 <&> fromIntegral
frame <- liftIO $ recv sock frameLen
-- сообщения кому? **МНЕ**
-- сообщения от кого? от **КОГО-ТО**
atomically $ writeTQueue q (From who, LBS.fromStrict frame)
forever do
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
-- У нас один контрагент, имя сокета (файла) == адрес пира.
-- Как в TCP порт сервиса (а отвечает тот с другого порта)
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
maybe1 mq none $ \q -> do
msg <- liftIO . atomically $ readTQueue q
let len = fromIntegral $ LBS.length msg :: Int
liftIO $ sendAll sock $ bytestring32 (fromIntegral len)
liftIO $ sendAll sock $ LBS.toStrict msg
void $ waitAnyCatchCancel [reader]
cleanupAndRetry e = liftIO do
warn $ "MessagingUnix. client seems gone. restaring server" <+> pretty (msgUnixSelf env)
err (viaShow e)
atomically $ writeTVar (msgUnixAccepts env) 0
liftIO $ atomically $ void $ flushTQueue (msgUnixRecv env)
dropQueues
pause (msgUnixRetryTime env)
logAndRetry :: SomeException -> IO ()
logAndRetry e = do
warn $ "MessagingUnix. runClient failed, probably server is gone. Retrying:" <+> pretty (msgUnixSelf env)
err (viaShow e)
dropQueues
pause (msgUnixRetryTime env)
dropQueues :: MonadIO m => m ()
dropQueues = do
-- liftIO $ atomically $ modifyTVar (msgUnixRecvFrom env) mempty
liftIO $ atomically $ modifyTVar (msgUnixSendTo env) mempty
-- мы не дропаем обратную очередь (принятые сообщения), потому,
-- что нет смысла. она живёт столько, сколько живёт клиент
-- очередь отправки мы удаляем, потому, что этого клиента
-- мы больше никогда не увидим, ведь они разделяются на уровне
-- сокетов и больше никак.
dropQueuesFor :: MonadIO m => Peer UNIX -> m ()
dropQueuesFor who = liftIO do
atomically do
modifyTVar (msgUnixSendTo env) (HashMap.delete who)
-- modifyTVar (msgUnixRecvFrom env) (HashMap.delete who)
createQueues :: MonadIO m => MessagingUnix -> Peer UNIX -> m (Peer UNIX)
createQueues env who = liftIO do
atomically $ do
sHere <- readTVar (msgUnixSendTo env) <&> HashMap.member who
if sHere then do
pure False
else do
sendToQ <- newTQueue
modifyTVar (msgUnixSendTo env) (HashMap.insert who sendToQ)
pure True
pure who
instance Messaging MessagingUnix UNIX ByteString where
sendTo bus (To who) (From me) msg = liftIO do
createQueues bus who
-- FIXME: handle-no-queue-for-rcpt-situation-1
mq <- atomically $ readTVar (msgUnixSendTo bus) <&> HashMap.lookup who
maybe1 mq none $ \q -> do
atomically $ writeTQueue q msg
receive bus _ = liftIO do
let q = msgUnixRecv bus
atomically $ peekTQueue q >> flushTQueue q
instance (Monad m, Messaging MessagingUnix UNIX (Encoded UNIX)) => HasFabriq UNIX (ReaderT MessagingUnix m) where
getFabriq = asks Fabriq
instance Monad m => HasOwnPeer UNIX (ReaderT MessagingUnix m) where
ownPeer = asks msgUnixSelf