From 35905b94bd8c9ab736eaa36818d1a350cb5502c9 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 3 Nov 2023 16:49:29 +0300 Subject: [PATCH] Notify proto + wiping resource-t in Messaging/Unix --- hbs2-core/hbs2-core.cabal | 4 + hbs2-core/lib/HBS2/Net/Messaging/Unix.hs | 243 +++++++++++++---------- hbs2-core/lib/HBS2/Net/Proto/Notify.hs | 15 +- hbs2-core/lib/HBS2/Net/Proto/RefLog.hs | 3 + hbs2-core/lib/HBS2/Net/Proto/Service.hs | 5 +- hbs2-core/lib/HBS2/ScheduledAction.hs | 79 ++++++++ hbs2-core/test/Main.hs | 6 +- hbs2-core/test/TestScheduled.hs | 79 ++++++++ hbs2-git/reposync/ReposyncMain.hs | 112 ++++++++--- hbs2-peer/app/PeerMain.hs | 38 +++- hbs2-peer/app/RefLog.hs | 3 +- hbs2-peer/hbs2-peer.cabal | 1 + hbs2-peer/lib/HBS2/Peer/Notify.hs | 94 +++++++++ 13 files changed, 525 insertions(+), 157 deletions(-) create mode 100644 hbs2-core/lib/HBS2/ScheduledAction.hs create mode 100644 hbs2-core/test/TestScheduled.hs create mode 100644 hbs2-peer/lib/HBS2/Peer/Notify.hs diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 92b22ccb..30eb11d0 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -79,6 +79,7 @@ library , HBS2.Base58 , HBS2.Clock , HBS2.Crypto + , HBS2.ScheduledAction , HBS2.Data.Detect , HBS2.Data.Types , HBS2.Data.Types.Crypto @@ -161,6 +162,7 @@ library , fast-logger , filelock , filepath + , exceptions , generic-lens , hashable , interpolatedstring-perl6 @@ -180,6 +182,7 @@ library , random-shuffle , resourcet , safe + , safe-exceptions , saltine ^>=0.2.0.1 , serialise , sockaddr @@ -216,6 +219,7 @@ test-suite test , FakeMessaging , HasProtocol , DialogSpec + , TestScheduled -- other-extensions: diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index c83e0a09..e4efca5e 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -15,25 +15,22 @@ import HBS2.Clock import HBS2.System.Logger.Simple -import Control.Monad.Trans.Resource import Control.Monad -import Control.Monad.Reader -import Data.ByteString qualified as BS +import Control.Monad.Reader hiding (reader) import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS -import Data.Function -import Data.Functor import Data.Hashable import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict (HashMap) import Network.ByteOrder hiding (ByteString) import Network.Socket -import Network.Socket.ByteString +import Network.Socket.ByteString hiding (sendTo) import Network.Socket.ByteString.Lazy qualified as SL import Control.Concurrent.STM.TQueue (flushTQueue) import Data.Set (Set) import Data.Set qualified as Set import Lens.Micro.Platform +import Control.Monad.Trans.Cont import UnliftIO import Streaming.Prelude qualified as S @@ -61,6 +58,8 @@ instance Hashable (Peer UNIX) where data MessagingUnixOpts = MUWatchdog Int | MUNoFork + | MUDontRetry + | MUKeepAlive Int deriving (Eq,Ord,Show,Generic,Data) -- FIXME: use-bounded-queues @@ -114,177 +113,201 @@ data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable) instance Exception ReadTimeoutException +data UnixMessagingStopped = UnixMessagingStopped deriving (Show,Typeable) + +instance Exception UnixMessagingStopped runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m () runMessagingUnix env = do if msgUnixServer env then - runServer + liftIO runServer else runClient where - runServer = forever $ handleAny cleanupAndRetry $ runResourceT do + runServer = forever $ handleAny cleanupAndRetry $ flip runContT pure $ do t0 <- getTimeCoarse atomically $ writeTVar (msgUnixLast env) t0 - sock <- liftIO $ socket AF_UNIX Stream defaultProtocol + forked <- newTVarIO (mempty :: [Async ()]) - void $ allocate (pure sock) (`shutdown` ShutdownBoth) + let fork w = do + l <- async w + atomically $ modifyTVar forked (l :) + + let doFork = not $ Set.member MUNoFork (msgUnixOpts env) + + let withSession | doFork = void . liftIO . fork + | otherwise = void . liftIO + + -- watchdog <- liftIO $ async runWatchDog + + let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol + let closeSock = liftIO . close + + sock <- ContT $ bracket openSock closeSock + + _ <- ContT $ bracket (pure forked) $ \clients -> do + readTVarIO clients >>= mapM_ cancel liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) liftIO $ listen sock 5 - -- let withSession = void . async . runResourceT + forever do + (so, sa) <- liftIO $ accept sock - let doFork = not $ Set.member MUNoFork (msgUnixOpts env) + peerNum <- atomically $ do + n <- readTVar (msgUnixAccepts env) + modifyTVar (msgUnixAccepts env) succ + pure n - let withSession | doFork = void . async . runResourceT - | otherwise = void . runResourceT + withSession $ flip runContT void do - watchdog <- async $ do + seen <- getTimeCoarse >>= newTVarIO - let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] + let that = if doFork then + msgUnixSelf env & over fromPeerUnix (<> "#" <> show peerNum) + else + msgUnixSelf env - maybe1 mwd (forever (pause @'Seconds 60)) $ \wd -> do + let writer = liftIO $ async $ forever do + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that - forever do - - pause $ TimeoutSec $ realToFrac $ min (wd `div` 2) 1 - - now <- getTimeCoarse - seen <- readTVarIO (msgUnixLast env) - acc <- readTVarIO (msgUnixAccepts env) - - trace $ "watchdog" <+> pretty now <+> pretty seen <+> pretty acc - - let diff = toNanoSeconds $ TimeoutTS (now - seen) - - when ( acc > 0 && diff >= toNanoSeconds (TimeoutSec $ realToFrac wd) ) do - throwIO ReadTimeoutException - - run <- async $ forever $ runResourceT do - (so, sa) <- liftIO $ accept sock - - -- FIXME: fixing-unix-sockets - -- Вот тут: нумеруем клиентов, в PeerAddr ставим - -- строку или номер. - - peerNum <- atomically $ do - n <- readTVar (msgUnixAccepts env) - modifyTVar (msgUnixAccepts env) succ - pure n - - withSession do - - let that = if doFork then - msgUnixSelf env & over fromPeerUnix (<> "#" <> show peerNum) - else - msgUnixSelf env - - void $ allocate ( createQueues env that ) dropQueuesFor - - void $ allocate (pure so) close - - writer <- async $ forever do - mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that - - maybe1 mq none $ \q -> do - msg <- liftIO . atomically $ readTQueue q + maybe1 mq none $ \q -> do + msg <- liftIO . atomically $ readTQueue q - let len = fromIntegral $ LBS.length msg :: Int - let bs = bytestring32 (fromIntegral len) + let len = fromIntegral $ LBS.length msg :: Int + let bs = bytestring32 (fromIntegral len) - liftIO $ sendAll so $ bytestring32 (fromIntegral len) + liftIO $ sendAll so $ bytestring32 (fromIntegral len) - -- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs + -- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs - liftIO $ SL.sendAll so msg + liftIO $ SL.sendAll so msg - void $ allocate (pure writer) cancel + void $ ContT $ bracket (createQueues env that) dropQueuesFor - link writer + void $ ContT $ bracket writer cancel - fix \next -> do - me <- liftIO myThreadId + void $ ContT $ bracket ( pure so ) closeSock - let mq = Just (msgUnixRecv env) + void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that ) + ( \_ -> debug $ "Client thread finished" <+> pretty that ) - -- frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral - frameLen <- liftIO $ readFromSocket so 4 <&> LBS.toStrict <&> word32 <&> fromIntegral + fix \next -> do - -- debug $ "frameLen" <+> pretty frameLen + let mq = Just (msgUnixRecv env) - frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict + -- frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral + frameLen <- liftIO $ readFromSocket so 4 <&> LBS.toStrict <&> word32 <&> fromIntegral - maybe1 mq none $ \q -> do - atomically $ writeTQueue q (From that, frame) + -- debug $ "frameLen" <+> pretty frameLen - now <- getTimeCoarse - atomically $ writeTVar (msgUnixLast env) now - next + if frameLen == 0 then do + -- answer to empty message + liftIO $ sendAll so $ bytestring32 0 + else do + frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict - (_, r) <- waitAnyCatchCancel [run, watchdog] + maybe1 mq none $ \q -> do + atomically $ writeTQueue q (From that, frame) - case r of - Left e -> throwIO e - Right{} -> pure () + now <- getTimeCoarse + -- TODO: to-remove-global-watchdog + atomically $ writeTVar (msgUnixLast env) now + atomically $ writeTVar seen now + next + handleClient | MUDontRetry `elem` msgUnixOpts env = \_ w -> handleAny throwStopped w + | otherwise = handleAny - runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT do + throwStopped _ = throwIO UnixMessagingStopped + + runClient = liftIO $ forever $ handleClient logAndRetry $ flip runContT pure $ do let sa = SockAddrUnix (msgUnixSockPath env) let p = msgUnixSockPath env let who = PeerUNIX p + tseen <- getTimeCoarse >>= newTVarIO - createQueues env who + void $ ContT $ bracket (createQueues env who) dropQueuesFor - sock <- liftIO $ socket AF_UNIX Stream defaultProtocol + let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol + let closeSock = close - void $ allocate (pure sock) close + sock <- ContT $ bracket openSock closeSock let attemptConnect = do result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env) case result of - Right _ -> return () + Right _ -> none Left (e :: SomeException) -> do pause (msgUnixRetryTime env) warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e + pause @'Seconds 2.5 attemptConnect attemptConnect - -- TODO: create-queues! + reader <- ContT $ liftIO . withAsync do + forever do + let q = msgUnixRecv env - reader <- async $ do + -- Read response from server + frameLen <- liftIO $ readFromSocket sock 4 <&> LBS.toStrict <&> word32 <&> fromIntegral + + getTimeCoarse >>= (atomically . writeTVar tseen) + + when (frameLen > 0) do + frame <- liftIO $ readFromSocket sock frameLen + -- сообщения кому? **МНЕ** + -- сообщения от кого? от **КОГО-ТО** + atomically $ writeTQueue q (From who, frame) + + watchdog <- ContT $ liftIO . withAsync do + let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] + case mwd of + Nothing -> forever (pause @'Seconds 600) + Just n -> forever do + + sendTo env (To who) (From who) (mempty :: ByteString) + + now <- getTimeCoarse + seen <- readTVarIO tseen + + let diff = toNanoSeconds $ TimeoutTS (now - seen) + + trace $ "I'm a watchdog!" <+> pretty diff + + when ( diff > toNanoSeconds (TimeoutSec $ realToFrac n) ) do + trace "watchdog fired!" + throwIO ReadTimeoutException + + pause (TimeoutSec (max 1 (realToFrac n / 2))) + + writer <- ContT $ liftIO . withAsync do forever do - let q = msgUnixRecv env - -- Read response from server - frameLen <- liftIO $ readFromSocket sock 4 <&> LBS.toStrict <&> word32 <&> fromIntegral - frame <- liftIO $ readFromSocket sock frameLen + -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. + -- У нас один контрагент, имя сокета (файла) == адрес пира. + -- Как в TCP порт сервиса (а отвечает тот с другого порта) + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who - -- сообщения кому? **МНЕ** - -- сообщения от кого? от **КОГО-ТО** - atomically $ writeTQueue q (From who, frame) + maybe1 mq none $ \q -> do + msg <- liftIO . atomically $ readTQueue q + let len = fromIntegral $ LBS.length msg :: Int + liftIO $ sendAll sock $ bytestring32 (fromIntegral len) + liftIO $ SL.sendAll sock msg - forever do + r <- waitAnyCatchCancel [reader, writer, watchdog] - -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. - -- У нас один контрагент, имя сокета (файла) == адрес пира. - -- Как в TCP порт сервиса (а отвечает тот с другого порта) - mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who - - maybe1 mq none $ \q -> do - msg <- liftIO . atomically $ readTQueue q - let len = fromIntegral $ LBS.length msg :: Int - liftIO $ sendAll sock $ bytestring32 (fromIntegral len) - liftIO $ SL.sendAll sock msg - - void $ waitAnyCatchCancel [reader] + case snd r of + Right{} -> pure () + Left e -> throwIO e cleanupAndRetry e = liftIO do warn $ "MessagingUnix. client seems gone. restaring server" <+> pretty (msgUnixSelf env) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Notify.hs b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs index 64d92314..ba8e3163 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Notify.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Notify.hs @@ -20,6 +20,7 @@ import Data.Kind import Data.List qualified as List import Data.Word import Control.Concurrent.STM (flushTQueue) +import Data.Maybe import UnliftIO @@ -352,7 +353,7 @@ data SomeCallback ev = data SomeNotifySource ev = SomeNotifySource { handleCount :: TVar NotifyHandle - , listeners :: TVar (HashMap NotifyHandle (SomeCallback ev)) + , listeners :: TVar (HashMap (NotifyKey ev) [(NotifyHandle, SomeCallback ev)]) } newSomeNotifySource :: forall ev m . (MonadIO m, ForNotify ev) @@ -365,18 +366,20 @@ instance ForNotify ev => NotifySource ev (SomeNotifySource ev) where startNotify src key fn = do ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s) - atomically $ modifyTVar (listeners src) (HashMap.insert ha (SomeCallback @ev fn)) + atomically $ modifyTVar (listeners src) (HashMap.insertWith (<>) key [(ha, SomeCallback @ev fn)]) pure ha stopNotify src ha = do - atomically $ modifyTVar (listeners src) (HashMap.delete ha) + atomically do + modifyTVar (listeners src) (HashMap.map (filter ((/= ha) . fst ))) + modifyTVar (listeners src) (HashMap.filter (not . null)) -emitNotify :: forall ev m . MonadIO m +emitNotify :: forall ev m . (ForNotify ev, MonadIO m) => SomeNotifySource ev -> (NotifyKey ev, NotifyData ev) -> m () -emitNotify src (_,d) = do - who <- readTVarIO (listeners src) <&> HashMap.toList +emitNotify src (k,d) = do + who <- readTVarIO (listeners src) <&> HashMap.lookup k <&> fromMaybe mempty for_ who $ \(h, SomeCallback cb) -> cb h d diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs b/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs index 09d96a21..5e1feb70 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefLog.hs @@ -25,6 +25,9 @@ import Type.Reflection (someTypeRep) import Lens.Micro.Platform newtype RefLogKey s = RefLogKey { fromRefLogKey :: PubKey 'Sign s } + deriving stock Generic + +instance Serialise (PubKey 'Sign s) => Serialise (RefLogKey s) deriving stock instance IsRefPubKey s => Eq (RefLogKey s) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Service.hs b/hbs2-core/lib/HBS2/Net/Proto/Service.hs index 1e0b95de..7cfa1a00 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Service.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Service.hs @@ -201,16 +201,15 @@ runServiceClientMulti :: forall e m . ( MonadIO m runServiceClientMulti endpoints = do proto <- async $ runProto @e [ makeResponse @e (makeClient x) | (Endpoint x) <- endpoints ] - link proto waiters <- forM endpoints $ \(Endpoint caller) -> async $ forever do req <- getRequest caller request @e (callPeer caller) req - mapM_ link waiters - void $ UIO.waitAnyCatchCancel $ proto : waiters + r <- UIO.waitAnyCatchCancel $ proto : waiters + either UIO.throwIO (const $ pure ()) (snd r) notifyServiceCaller :: forall api e m . MonadIO m => ServiceCaller api e diff --git a/hbs2-core/lib/HBS2/ScheduledAction.hs b/hbs2-core/lib/HBS2/ScheduledAction.hs new file mode 100644 index 00000000..66e275bb --- /dev/null +++ b/hbs2-core/lib/HBS2/ScheduledAction.hs @@ -0,0 +1,79 @@ +{-# Language TemplateHaskell #-} +module HBS2.ScheduledAction + ( Scheduled + , scheduleRunPeriod + , defScheduled + , runScheduled + , schedule + ) where + +import HBS2.Prelude.Plated +import HBS2.Clock + +import Prelude hiding (all) +import Data.Word +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Lens.Micro.Platform +import Control.Monad +import Data.List qualified as List + +import Control.Exception qualified as E + +import UnliftIO as U + +-- NOTE: scheduled-action +-- держит список действий (IO ()) +-- привязанных к временным "слотам" (секундам) с точностью до +-- секунды. +-- После наступления секунды --- выполняет список действий, +-- привязанных к слоту, удаляет действия, удаляет слот. +-- Полезно, что бы очищать данные, имеющие продолжительность +-- жизни -- всякие там кэши, хэшмапы и так далее. +-- +-- В отличие от Cache, не знает о сути действий ничего, +-- кроме того, что это IO (). +-- +-- Может быть (и должно, наверное) быть глобальным на +-- всё приложение + +type SlotNum = Word64 + +data Scheduled = + Scheduled + { _scheduleRunPeriod :: Timeout 'Seconds + , slots :: TVar (HashMap SlotNum [IO ()]) + } + +makeLenses 'Scheduled + +defScheduled :: MonadUnliftIO m => m Scheduled +defScheduled = Scheduled 10 <$> newTVarIO mempty + +runScheduled :: MonadUnliftIO m => Scheduled -> m () +runScheduled sch = forever do + pause (view scheduleRunPeriod sch) + + now <- getTimeCoarse <&> toNanoSecs <&> (/1e9) . realToFrac <&> round + + expired <- atomically do + all <- readTVar (slots sch) <&> HashMap.toList + let (rest, expired) = List.partition ( (>now) . fst) all + writeTVar (slots sch) (HashMap.fromList rest) + pure expired + + for_ expired $ \(_, all) -> do + for_ all $ \action -> do + -- TODO: error-logging-maybe + liftIO $ void $ action `E.catch` (\(_ :: E.ArithException) -> pure ()) + `E.catch` (\(_ :: E.IOException) -> pure ()) + `E.catch` (\(_ :: E.SomeException) -> pure ()) + +schedule :: forall a m . (MonadUnliftIO m, Integral a) => Scheduled -> a -> IO () -> m () +schedule s ttl what = do + now <- getTimeCoarse <&> toNanoSecs <&> (/1e9) . realToFrac <&> round + let slot = now + fromIntegral ttl + atomically $ modifyTVar (slots s) (HashMap.insertWith (<>) slot [what]) + + + diff --git a/hbs2-core/test/Main.hs b/hbs2-core/test/Main.hs index cb54f859..24365844 100644 --- a/hbs2-core/test/Main.hs +++ b/hbs2-core/test/Main.hs @@ -4,6 +4,7 @@ import TestFakeMessaging import TestActors import DialogSpec import TestFileLogger +import TestScheduled import Test.Tasty import Test.Tasty.HUnit @@ -16,7 +17,10 @@ main = testCase "testFakeMessaging1" testFakeMessaging1 , testCase "testActorsBasic" testActorsBasic , testCase "testFileLogger" testFileLogger - , testDialog + , testCase "testScheduledActions" testScheduled + + -- FIXME does-not-finish + -- , testDialog ] diff --git a/hbs2-core/test/TestScheduled.hs b/hbs2-core/test/TestScheduled.hs new file mode 100644 index 00000000..bbee96e1 --- /dev/null +++ b/hbs2-core/test/TestScheduled.hs @@ -0,0 +1,79 @@ +module TestScheduled where + +import HBS2.Prelude +import HBS2.Clock +import HBS2.ScheduledAction + +import Test.Tasty.HUnit + +import Control.Monad +import Lens.Micro.Platform +import System.IO (hPrint) +import UnliftIO +import Data.List qualified as List + +import Control.Monad.Cont + +testScheduled :: IO () +testScheduled = do + + tres <- newTVarIO mempty + + sch <- defScheduled <&> set scheduleRunPeriod 1.5 + + s <- async $ runScheduled sch + + let addAction = schedule sch + addValue values = atomically $ modifyTVar' tres (values ++) + + addAction 1 (addValue [1, 2, 3]) + addAction 2 (addValue [10, 20, 30]) + addAction 3 (addValue [100, 200, 300]) + addAction 2 do + throwIO $ userError "fail!" + + addAction 2 do + error "fail 2!" + + -- addAction 2 do + -- addValue [1 `div` 0] + + pause @'Seconds 3.5 + + cancel s + + let expected = [100,200,300,10,20,30,1,2,3] & List.sort + results <- readTVarIO tres <&> List.sort + + hPrint stderr results + + assertEqual "all-values-calculated" expected results + + +testAsync :: IO () +testAsync = do + + hSetBuffering stdout LineBuffering + hSetBuffering stderr LineBuffering + + flip runContT pure $ do + + a <- ContT $ withAsync do + forever do + pause @'Seconds 1 + print "1" + + b <- ContT $ withAsync do + forever do + pause @'Seconds 2 + print "2" + + c <- ContT $ withAsync do + pause @'Seconds 5 + print "leaving" + + pause @'Seconds 10 + + liftIO $ print "now what?" + + diff --git a/hbs2-git/reposync/ReposyncMain.hs b/hbs2-git/reposync/ReposyncMain.hs index 1f41a87b..35dc5069 100644 --- a/hbs2-git/reposync/ReposyncMain.hs +++ b/hbs2-git/reposync/ReposyncMain.hs @@ -3,11 +3,16 @@ module Main where import HBS2.Prelude.Plated import HBS2.OrDie +import HBS2.Base58 +import HBS2.Actors.Peer +import HBS2.Actors.Peer.Types +import HBS2.Net.Proto.Notify import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Net.Proto.Types import HBS2.Net.Proto.RefLog import HBS2.Peer.RPC.Client.Unix hiding (Cookie) import HBS2.Peer.RPC.API.RefLog +import HBS2.Peer.Notify import HBS2.Clock -- import HBS2Git.PrettyStuff @@ -21,6 +26,7 @@ import Data.Config.Suckless.KeyValue import Control.Monad.Catch (MonadThrow(..)) import Control.Monad.Except (runExceptT,throwError) +import Control.Monad.Cont import Control.Monad.Reader import Data.ByteString.Lazy.Char8 qualified as LBS import Data.Either @@ -37,6 +43,7 @@ import System.Directory import System.FilePath import System.Process.Typed import Text.InterpolatedString.Perl6 (qc) +import Control.Concurrent.STM (flushTQueue) import UnliftIO import Web.Scotty hiding (header,next) @@ -88,6 +95,7 @@ data ReposyncState = ReposyncState { _rpcSoname :: FilePath , _rpcRefLog :: ServiceCaller RefLogAPI UNIX + , _rpcNotifySink :: NotifySink (RefLogEvents L4Proto) UNIX , _reposyncBaseDir :: FilePath , _reposyncPort :: Int , _reposyncEntries :: TVar [RepoEntry] @@ -119,10 +127,11 @@ reposyncDefaultDir = unsafePerformIO do newState :: MonadUnliftIO m => FilePath -> ServiceCaller RefLogAPI UNIX + -> NotifySink (RefLogEvents L4Proto) UNIX -> m ReposyncState -newState so refLog = - ReposyncState so refLog reposyncDefaultDir 4017 <$> newTVarIO mempty +newState so refLog sink = + ReposyncState so refLog sink reposyncDefaultDir 4017 <$> newTVarIO mempty withConfig :: forall a m . (MonadUnliftIO m) => Maybe FilePath -> ReposyncM m a -> ReposyncM m () withConfig cfg m = do @@ -186,6 +195,7 @@ runSync = do so <- asks (view rpcSoname) refLogRPC <- asks (view rpcRefLog) + sink <- asks (view rpcNotifySink) root <- asks (view reposyncBaseDir) port <- asks (view reposyncPort) <&> fromIntegral @@ -196,40 +206,59 @@ runSync = do get "/" $ do text "This is hbs2-reposync" - r <- forM es $ \entry -> async $ void $ do - let rk = fromRefLogKey $ repoRef entry + r <- forM es $ \entry -> async $ void $ flip runContT pure do + let ref = repoRef entry + let rk = fromRefLogKey ref tv <- newTVarIO Nothing + upd <- newTQueueIO + debug $ "STARTED WITH" <+> pretty (repoPath entry) - initRepo entry + let notif = + liftIO $ async do + debug $ "Subscribed" <+> pretty ref + runNotifySink sink (RefLogNotifyKey ref) $ \(RefLogUpdateNotifyData _ h) -> do + debug $ "Got notification" <+> pretty ref <+> pretty h + atomically $ writeTQueue upd () + + void $ ContT $ bracket notif cancel + + lift $ initRepo entry + + lift $ syncRepo entry fix \next -> do - rr' <- race (pause @'Seconds 1) do + rr' <- liftIO $ race (pause @'Seconds 1) do callService @RpcRefLogGet refLogRPC rk <&> fromRight Nothing + void $ liftIO $ race (pause @'Seconds 60) (atomically (peekTQueue upd)) + pause @'Seconds 5 + liftIO $ atomically $ flushTQueue upd + rr <- either (const $ pause @'Seconds 10 >> warn "rpc call timeout" >> next) pure rr' debug $ "REFLOG VALUE:" <+> pretty rr r0 <- readTVarIO tv - if rr == r0 then do - pause @'Seconds 60 - else do - debug $ "Syncronize repoes!" <+> pretty (repoPath entry) - syncRepo entry >>= \case - Left{} -> pause @'Seconds 60 - Right{} -> do - atomically $ writeTVar tv rr - pause @'Seconds 10 + unless ( rr == r0 ) do + debug $ "Syncronize repo!" <+> pretty (repoPath entry) + fix \again -> do + lift (syncRepo entry) >>= \case + Left{} -> do + debug $ "Failed to update:" <+> pretty (repoPath entry) + pause @'Seconds 1 + again + + Right{} -> do + atomically $ writeTVar tv rr next - mapM_ waitCatch (http : r) - + void $ waitAnyCatchCancel (http : r) data SyncError = SyncError @@ -356,27 +385,46 @@ withApp cfg m = do -- lrpc = - soname <- detectRPC `orDie` "RPC not found" + forever $ handleAny cleanup $ do - client <- race ( pause @'Seconds 1) (newMessagingUnix False 1.0 soname) `orDie` "hbs2-peer rpc timeout!" + soname <- detectRPC `orDie` "RPC not found" - rpc <- makeServiceCaller (fromString soname) + let o = [MUWatchdog 20, MUDontRetry] - messaging <- async $ runMessagingUnix client - link messaging + client <- race ( pause @'Seconds 1) (newMessagingUnixOpts o False 1.0 soname) + `orDie` "hbs2-peer rpc timeout!" - let endpoints = [ Endpoint @UNIX rpc - ] + clientN <- newMessagingUnixOpts o False 1.0 soname - c1 <- async $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + rpc <- makeServiceCaller (fromString soname) - state <- newState soname rpc + messaging <- async $ runMessagingUnix client - r <- async $ void $ runReaderT (unReposyncM $ withConfig cfg m) state + mnotify <- async $ runMessagingUnix clientN - waitAnyCatchCancel [c1, messaging, r] + sink <- newNotifySink - notice "exiting" + wNotify <- liftIO $ async $ flip runReaderT clientN $ do + debug "notify restarted!" + runNotifyWorkerClient sink + + nProto <- liftIO $ async $ flip runReaderT clientN $ do + runProto @UNIX + [ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink) + ] + + let endpoints = [ Endpoint @UNIX rpc + ] + + c1 <- async $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + + state <- newState soname rpc sink + + r <- async $ void $ runReaderT (unReposyncM $ withConfig cfg m) state + + void $ waitAnyCatchCancel [c1, messaging, mnotify, nProto, wNotify, r] + + notice "exiting" setLoggingOff @DEBUG setLoggingOff @INFO @@ -385,6 +433,12 @@ withApp cfg m = do setLoggingOff @NOTICE + where + cleanup e = do + err (viaShow e) + warn "Something bad happened. Retrying..." + pause @'Seconds 2.5 + main :: IO () main = runMe . customExecParser (prefs showHelpOnError) $ info (helper <*> ((,) <$> opts <*> parser)) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 761df59d..849a95d8 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -68,6 +68,7 @@ import HBS2.Peer.RPC.API.Storage import HBS2.Peer.RPC.API.Peer import HBS2.Peer.RPC.API.RefLog import HBS2.Peer.RPC.API.RefChan +import HBS2.Peer.Notify import RPC2(RPC2Context(..)) @@ -547,6 +548,7 @@ instance ( Monad m response = lift . response + respawn :: PeerOpts -> IO () respawn opts = if not (view peerRespawn opts) then do @@ -616,6 +618,8 @@ runPeer opts = U.handle (\e -> myException e setLogging @TRACE1 tracePrefix + refChanNotifySource <- newSomeNotifySource @(RefChanEvents L4Proto) + refLogNotifySource <- newSomeNotifySource @(RefLogEvents L4Proto) let ps = mempty @@ -752,13 +756,21 @@ runPeer opts = U.handle (\e -> myException e rce <- refChanWorkerEnv conf penv denv - let refChanAdapter = RefChanAdapter - { refChanOnHead = refChanOnHeadFn rce - , refChanSubscribed = isPolledRef @e brains - , refChanWriteTran = refChanWriteTranFn rce - , refChanValidatePropose = refChanValidateTranFn @e rce - , refChanNotifyRely = refChanNotifyRelyFn @e rce - } + let refChanAdapter = + RefChanAdapter + { refChanOnHead = refChanOnHeadFn rce + , refChanSubscribed = isPolledRef @e brains + , refChanWriteTran = refChanWriteTranFn rce + , refChanValidatePropose = refChanValidateTranFn @e rce + + , refChanNotifyRely = \r u -> do + debug "refChanNotifyRely MOTHERFUCKER!" + refChanNotifyRelyFn @e rce r u + case u of + Notify rr s -> do + emitNotify refChanNotifySource (RefChanNotifyKey r, RefChanNotifyData rr s) + _ -> pure () + } rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter @@ -796,6 +808,11 @@ runPeer opts = U.handle (\e -> myException e let rwa = RefLog.RefLogWorkerAdapter { RefLog.reflogDownload = doDownload , RefLog.reflogFetch = doFetchRef + , RefLog.reflogUpdated = \(r,v) -> do + debug $ "EMIT REFLOG UPDATE" <+> pretty (AsBase58 r) + emitNotify refLogNotifySource ( RefLogNotifyKey @L4Proto r + , RefLogUpdateNotifyData @L4Proto r (HashRef v) + ) } let addNewRtt (p,rttNew) = withPeerM penv $ void $ runMaybeT do @@ -1080,12 +1097,19 @@ runPeer opts = U.handle (\e -> myException e m1 <- async $ runMessagingUnix rpcmsg rpcProto <- async $ flip runReaderT rpcctx do + env <- newNotifyEnvServer @(RefChanEvents L4Proto) refChanNotifySource + envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource + w1 <- asyncLinked $ runNotifyWorkerServer env + w2 <- asyncLinked $ runNotifyWorkerServer envrl runProto @UNIX [ makeResponse (makeServer @PeerAPI) , makeResponse (makeServer @RefLogAPI) , makeResponse (makeServer @RefChanAPI) , makeResponse (makeServer @StorageAPI) + , makeResponse (makeNotifyServer @(RefChanEvents L4Proto) env) + , makeResponse (makeNotifyServer @(RefLogEvents L4Proto) envrl) ] + mapM_ wait [w1,w2] void $ waitAnyCancel $ w <> [ loop , m1 diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index 7c50c718..0c992403 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -91,6 +91,7 @@ data RefLogWorkerAdapter e = RefLogWorkerAdapter { reflogDownload :: Hash HbSync -> IO () , reflogFetch :: PubKey 'Sign (Encryption e) -> IO () + , reflogUpdated :: (RefLogKey (Encryption e), Hash HbSync) -> IO () } reflogWorker :: forall e s m . ( e ~ L4Proto @@ -252,7 +253,7 @@ reflogWorker conf brains adapter = do pure nref -- TODO: old-root-to-delete - + reflogUpdated adapter (reflogkey, newRoot) trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty (hashObject @HbSync reflogkey) <+> pretty newRoot -- trace "I'm a reflog update worker" diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index ca12790e..234f3fb3 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -135,6 +135,7 @@ library exposed-modules: HBS2.Peer.Brains + HBS2.Peer.Notify HBS2.Peer.RPC.Class HBS2.Peer.RPC.API.Peer HBS2.Peer.RPC.API.RefLog diff --git a/hbs2-peer/lib/HBS2/Peer/Notify.hs b/hbs2-peer/lib/HBS2/Peer/Notify.hs new file mode 100644 index 00000000..6311d83b --- /dev/null +++ b/hbs2-peer/lib/HBS2/Peer/Notify.hs @@ -0,0 +1,94 @@ +{-# Language UndecidableInstances #-} +module HBS2.Peer.Notify + ( RefChanEvents(..) + , RefLogEvents(..) + , newNotifyEnvServer + , runNotifyWorkerServer + , runNotifyWorkerClient + , makeNotifyServer + , makeNotifyClient + , newSomeNotifySource + , newNotifySink + , emitNotify + , NotifyKey(..) + , NotifyData(..) + , HasProtocol(..) + ) where + +import HBS2.Prelude +import HBS2.Base58 +import HBS2.Data.Types.Refs +import HBS2.Actors.Peer.Types +import HBS2.Net.Proto.Types +import HBS2.Net.Proto.Notify +import HBS2.Net.Proto.RefChan +import HBS2.Net.Proto.RefLog +import HBS2.Net.Messaging.Unix (UNIX) +import HBS2.Data.Types.SignedBox +import HBS2.Net.Proto.Definition() + +import Codec.Serialise +import Data.ByteString.Lazy (ByteString) +import Data.ByteString qualified as BS + + +data RefChanEvents e = + RefChanOnNotify + +instance HasProtocol UNIX (NotifyProto (RefChanEvents L4Proto) UNIX) where + type instance ProtocolId (NotifyProto (RefChanEvents L4Proto) UNIX) = 0x20e14bfa0ca1db8e + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + requestPeriodLim = NoLimit + +-- FIXME: move-this-definitions-somewhere +newtype instance NotifyKey (RefChanEvents e) = + RefChanNotifyKey (RefChanId e) + deriving (Generic) + +deriving newtype instance ForRefChans e => Hashable (NotifyKey (RefChanEvents e)) +deriving newtype instance ForRefChans e => Eq (NotifyKey (RefChanEvents e)) + +data instance NotifyData (RefChanEvents e) = + RefChanNotifyData (RefChanId e) (SignedBox BS.ByteString e) + deriving Generic + +instance ForRefChans e => Serialise (NotifyKey (RefChanEvents e)) +instance ForRefChans e => Serialise (NotifyData (RefChanEvents e)) + +data RefLogEvents s = + RefLogUpdated + +newtype instance NotifyKey (RefLogEvents e) = + RefLogNotifyKey (RefLogKey (Encryption e)) + deriving (Generic) + +data instance NotifyData (RefLogEvents e) = + RefLogUpdateNotifyData (RefLogKey (Encryption e)) HashRef + deriving (Generic) + +type ForRefLogEvents e = ( Serialise (PubKey 'Sign (Encryption e)) + , Serialise (RefLogKey (Encryption e)) + , FromStringMaybe (PubKey 'Sign (Encryption e)) + , Hashable (PubKey 'Sign (Encryption e)) + , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) + ) + +deriving newtype instance ForRefLogEvents e => Hashable (NotifyKey (RefLogEvents e)) +deriving newtype instance ForRefLogEvents e => Eq (NotifyKey (RefLogEvents e)) + +instance ForRefLogEvents e => Serialise (NotifyKey (RefLogEvents e)) + +instance ForRefLogEvents e => Serialise (NotifyData (RefLogEvents e)) + +instance ForRefLogEvents L4Proto => HasProtocol UNIX (NotifyProto (RefLogEvents L4Proto) UNIX) where + type instance ProtocolId (NotifyProto (RefLogEvents L4Proto) UNIX) = 0x65A9ECE2A182216 + type instance Encoded UNIX = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + requestPeriodLim = NoLimit + + + +