Notify proto + wiping resource-t in Messaging/Unix

This commit is contained in:
Dmitry Zuikov 2023-11-03 16:49:29 +03:00
parent 88df87a1d5
commit 35905b94bd
13 changed files with 525 additions and 157 deletions

View File

@ -79,6 +79,7 @@ library
, HBS2.Base58 , HBS2.Base58
, HBS2.Clock , HBS2.Clock
, HBS2.Crypto , HBS2.Crypto
, HBS2.ScheduledAction
, HBS2.Data.Detect , HBS2.Data.Detect
, HBS2.Data.Types , HBS2.Data.Types
, HBS2.Data.Types.Crypto , HBS2.Data.Types.Crypto
@ -161,6 +162,7 @@ library
, fast-logger , fast-logger
, filelock , filelock
, filepath , filepath
, exceptions
, generic-lens , generic-lens
, hashable , hashable
, interpolatedstring-perl6 , interpolatedstring-perl6
@ -180,6 +182,7 @@ library
, random-shuffle , random-shuffle
, resourcet , resourcet
, safe , safe
, safe-exceptions
, saltine ^>=0.2.0.1 , saltine ^>=0.2.0.1
, serialise , serialise
, sockaddr , sockaddr
@ -216,6 +219,7 @@ test-suite test
, FakeMessaging , FakeMessaging
, HasProtocol , HasProtocol
, DialogSpec , DialogSpec
, TestScheduled
-- other-extensions: -- other-extensions:

View File

@ -15,25 +15,22 @@ import HBS2.Clock
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import Control.Monad.Trans.Resource
import Control.Monad import Control.Monad
import Control.Monad.Reader import Control.Monad.Reader hiding (reader)
import Data.ByteString qualified as BS
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.Function
import Data.Functor
import Data.Hashable import Data.Hashable
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Network.ByteOrder hiding (ByteString) import Network.ByteOrder hiding (ByteString)
import Network.Socket import Network.Socket
import Network.Socket.ByteString import Network.Socket.ByteString hiding (sendTo)
import Network.Socket.ByteString.Lazy qualified as SL import Network.Socket.ByteString.Lazy qualified as SL
import Control.Concurrent.STM.TQueue (flushTQueue) import Control.Concurrent.STM.TQueue (flushTQueue)
import Data.Set (Set) import Data.Set (Set)
import Data.Set qualified as Set import Data.Set qualified as Set
import Lens.Micro.Platform import Lens.Micro.Platform
import Control.Monad.Trans.Cont
import UnliftIO import UnliftIO
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
@ -61,6 +58,8 @@ instance Hashable (Peer UNIX) where
data MessagingUnixOpts = data MessagingUnixOpts =
MUWatchdog Int MUWatchdog Int
| MUNoFork | MUNoFork
| MUDontRetry
| MUKeepAlive Int
deriving (Eq,Ord,Show,Generic,Data) deriving (Eq,Ord,Show,Generic,Data)
-- FIXME: use-bounded-queues -- FIXME: use-bounded-queues
@ -114,177 +113,201 @@ data ReadTimeoutException = ReadTimeoutException deriving (Show, Typeable)
instance Exception ReadTimeoutException instance Exception ReadTimeoutException
data UnixMessagingStopped = UnixMessagingStopped deriving (Show,Typeable)
instance Exception UnixMessagingStopped
runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m () runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m ()
runMessagingUnix env = do runMessagingUnix env = do
if msgUnixServer env then if msgUnixServer env then
runServer liftIO runServer
else else
runClient runClient
where where
runServer = forever $ handleAny cleanupAndRetry $ runResourceT do runServer = forever $ handleAny cleanupAndRetry $ flip runContT pure $ do
t0 <- getTimeCoarse t0 <- getTimeCoarse
atomically $ writeTVar (msgUnixLast env) t0 atomically $ writeTVar (msgUnixLast env) t0
sock <- liftIO $ socket AF_UNIX Stream defaultProtocol forked <- newTVarIO (mempty :: [Async ()])
void $ allocate (pure sock) (`shutdown` ShutdownBoth) let fork w = do
l <- async w
atomically $ modifyTVar forked (l :)
let doFork = not $ Set.member MUNoFork (msgUnixOpts env)
let withSession | doFork = void . liftIO . fork
| otherwise = void . liftIO
-- watchdog <- liftIO $ async runWatchDog
let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol
let closeSock = liftIO . close
sock <- ContT $ bracket openSock closeSock
_ <- ContT $ bracket (pure forked) $ \clients -> do
readTVarIO clients >>= mapM_ cancel
liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env) liftIO $ bind sock $ SockAddrUnix (msgUnixSockPath env)
liftIO $ listen sock 5 liftIO $ listen sock 5
-- let withSession = void . async . runResourceT forever do
(so, sa) <- liftIO $ accept sock
let doFork = not $ Set.member MUNoFork (msgUnixOpts env) peerNum <- atomically $ do
n <- readTVar (msgUnixAccepts env)
modifyTVar (msgUnixAccepts env) succ
pure n
let withSession | doFork = void . async . runResourceT withSession $ flip runContT void do
| otherwise = void . runResourceT
watchdog <- async $ do seen <- getTimeCoarse >>= newTVarIO
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ] let that = if doFork then
msgUnixSelf env & over fromPeerUnix (<> "#" <> show peerNum)
else
msgUnixSelf env
maybe1 mwd (forever (pause @'Seconds 60)) $ \wd -> do let writer = liftIO $ async $ forever do
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
forever do maybe1 mq none $ \q -> do
msg <- liftIO . atomically $ readTQueue q
pause $ TimeoutSec $ realToFrac $ min (wd `div` 2) 1
now <- getTimeCoarse
seen <- readTVarIO (msgUnixLast env)
acc <- readTVarIO (msgUnixAccepts env)
trace $ "watchdog" <+> pretty now <+> pretty seen <+> pretty acc
let diff = toNanoSeconds $ TimeoutTS (now - seen)
when ( acc > 0 && diff >= toNanoSeconds (TimeoutSec $ realToFrac wd) ) do
throwIO ReadTimeoutException
run <- async $ forever $ runResourceT do
(so, sa) <- liftIO $ accept sock
-- FIXME: fixing-unix-sockets
-- Вот тут: нумеруем клиентов, в PeerAddr ставим
-- строку или номер.
peerNum <- atomically $ do
n <- readTVar (msgUnixAccepts env)
modifyTVar (msgUnixAccepts env) succ
pure n
withSession do
let that = if doFork then
msgUnixSelf env & over fromPeerUnix (<> "#" <> show peerNum)
else
msgUnixSelf env
void $ allocate ( createQueues env that ) dropQueuesFor
void $ allocate (pure so) close
writer <- async $ forever do
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
maybe1 mq none $ \q -> do
msg <- liftIO . atomically $ readTQueue q
let len = fromIntegral $ LBS.length msg :: Int let len = fromIntegral $ LBS.length msg :: Int
let bs = bytestring32 (fromIntegral len) let bs = bytestring32 (fromIntegral len)
liftIO $ sendAll so $ bytestring32 (fromIntegral len) liftIO $ sendAll so $ bytestring32 (fromIntegral len)
-- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs -- debug $ "sendAll" <+> pretty len <+> pretty (LBS.length msg) <+> viaShow bs
liftIO $ SL.sendAll so msg liftIO $ SL.sendAll so msg
void $ allocate (pure writer) cancel void $ ContT $ bracket (createQueues env that) dropQueuesFor
link writer void $ ContT $ bracket writer cancel
fix \next -> do void $ ContT $ bracket ( pure so ) closeSock
me <- liftIO myThreadId
let mq = Just (msgUnixRecv env) void $ ContT $ bracket ( debug $ "Client thread started" <+> pretty that )
( \_ -> debug $ "Client thread finished" <+> pretty that )
-- frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral fix \next -> do
frameLen <- liftIO $ readFromSocket so 4 <&> LBS.toStrict <&> word32 <&> fromIntegral
-- debug $ "frameLen" <+> pretty frameLen let mq = Just (msgUnixRecv env)
frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict -- frameLen <- liftIO $ recv so 4 <&> word32 <&> fromIntegral
frameLen <- liftIO $ readFromSocket so 4 <&> LBS.toStrict <&> word32 <&> fromIntegral
maybe1 mq none $ \q -> do -- debug $ "frameLen" <+> pretty frameLen
atomically $ writeTQueue q (From that, frame)
now <- getTimeCoarse if frameLen == 0 then do
atomically $ writeTVar (msgUnixLast env) now -- answer to empty message
next liftIO $ sendAll so $ bytestring32 0
else do
frame <- liftIO $ readFromSocket so frameLen -- <&> LBS.toStrict
(_, r) <- waitAnyCatchCancel [run, watchdog] maybe1 mq none $ \q -> do
atomically $ writeTQueue q (From that, frame)
case r of now <- getTimeCoarse
Left e -> throwIO e -- TODO: to-remove-global-watchdog
Right{} -> pure () atomically $ writeTVar (msgUnixLast env) now
atomically $ writeTVar seen now
next
handleClient | MUDontRetry `elem` msgUnixOpts env = \_ w -> handleAny throwStopped w
| otherwise = handleAny
runClient = liftIO $ forever $ handleAny logAndRetry $ runResourceT do throwStopped _ = throwIO UnixMessagingStopped
runClient = liftIO $ forever $ handleClient logAndRetry $ flip runContT pure $ do
let sa = SockAddrUnix (msgUnixSockPath env) let sa = SockAddrUnix (msgUnixSockPath env)
let p = msgUnixSockPath env let p = msgUnixSockPath env
let who = PeerUNIX p let who = PeerUNIX p
tseen <- getTimeCoarse >>= newTVarIO
createQueues env who void $ ContT $ bracket (createQueues env who) dropQueuesFor
sock <- liftIO $ socket AF_UNIX Stream defaultProtocol let openSock = liftIO $ socket AF_UNIX Stream defaultProtocol
let closeSock = close
void $ allocate (pure sock) close sock <- ContT $ bracket openSock closeSock
let attemptConnect = do let attemptConnect = do
result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env) result <- liftIO $ try $ connect sock $ SockAddrUnix (msgUnixSockPath env)
case result of case result of
Right _ -> return () Right _ -> none
Left (e :: SomeException) -> do Left (e :: SomeException) -> do
pause (msgUnixRetryTime env) pause (msgUnixRetryTime env)
warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e warn $ "MessagingUnix. failed to connect" <+> pretty sa <+> viaShow e
pause @'Seconds 2.5
attemptConnect attemptConnect
attemptConnect attemptConnect
-- TODO: create-queues! reader <- ContT $ liftIO . withAsync do
forever do
let q = msgUnixRecv env
reader <- async $ do -- Read response from server
frameLen <- liftIO $ readFromSocket sock 4 <&> LBS.toStrict <&> word32 <&> fromIntegral
getTimeCoarse >>= (atomically . writeTVar tseen)
when (frameLen > 0) do
frame <- liftIO $ readFromSocket sock frameLen
-- сообщения кому? **МНЕ**
-- сообщения от кого? от **КОГО-ТО**
atomically $ writeTQueue q (From who, frame)
watchdog <- ContT $ liftIO . withAsync do
let mwd = headMay [ n | MUWatchdog n <- Set.toList (msgUnixOpts env) ]
case mwd of
Nothing -> forever (pause @'Seconds 600)
Just n -> forever do
sendTo env (To who) (From who) (mempty :: ByteString)
now <- getTimeCoarse
seen <- readTVarIO tseen
let diff = toNanoSeconds $ TimeoutTS (now - seen)
trace $ "I'm a watchdog!" <+> pretty diff
when ( diff > toNanoSeconds (TimeoutSec $ realToFrac n) ) do
trace "watchdog fired!"
throwIO ReadTimeoutException
pause (TimeoutSec (max 1 (realToFrac n / 2)))
writer <- ContT $ liftIO . withAsync do
forever do forever do
let q = msgUnixRecv env
-- Read response from server -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
frameLen <- liftIO $ readFromSocket sock 4 <&> LBS.toStrict <&> word32 <&> fromIntegral -- У нас один контрагент, имя сокета (файла) == адрес пира.
frame <- liftIO $ readFromSocket sock frameLen -- Как в TCP порт сервиса (а отвечает тот с другого порта)
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
-- сообщения кому? **МНЕ** maybe1 mq none $ \q -> do
-- сообщения от кого? от **КОГО-ТО** msg <- liftIO . atomically $ readTQueue q
atomically $ writeTQueue q (From who, frame) let len = fromIntegral $ LBS.length msg :: Int
liftIO $ sendAll sock $ bytestring32 (fromIntegral len)
liftIO $ SL.sendAll sock msg
forever do r <- waitAnyCatchCancel [reader, writer, watchdog]
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. case snd r of
-- У нас один контрагент, имя сокета (файла) == адрес пира. Right{} -> pure ()
-- Как в TCP порт сервиса (а отвечает тот с другого порта) Left e -> throwIO e
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
maybe1 mq none $ \q -> do
msg <- liftIO . atomically $ readTQueue q
let len = fromIntegral $ LBS.length msg :: Int
liftIO $ sendAll sock $ bytestring32 (fromIntegral len)
liftIO $ SL.sendAll sock msg
void $ waitAnyCatchCancel [reader]
cleanupAndRetry e = liftIO do cleanupAndRetry e = liftIO do
warn $ "MessagingUnix. client seems gone. restaring server" <+> pretty (msgUnixSelf env) warn $ "MessagingUnix. client seems gone. restaring server" <+> pretty (msgUnixSelf env)

View File

@ -20,6 +20,7 @@ import Data.Kind
import Data.List qualified as List import Data.List qualified as List
import Data.Word import Data.Word
import Control.Concurrent.STM (flushTQueue) import Control.Concurrent.STM (flushTQueue)
import Data.Maybe
import UnliftIO import UnliftIO
@ -352,7 +353,7 @@ data SomeCallback ev =
data SomeNotifySource ev = data SomeNotifySource ev =
SomeNotifySource SomeNotifySource
{ handleCount :: TVar NotifyHandle { handleCount :: TVar NotifyHandle
, listeners :: TVar (HashMap NotifyHandle (SomeCallback ev)) , listeners :: TVar (HashMap (NotifyKey ev) [(NotifyHandle, SomeCallback ev)])
} }
newSomeNotifySource :: forall ev m . (MonadIO m, ForNotify ev) newSomeNotifySource :: forall ev m . (MonadIO m, ForNotify ev)
@ -365,18 +366,20 @@ instance ForNotify ev => NotifySource ev (SomeNotifySource ev) where
startNotify src key fn = do startNotify src key fn = do
ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s) ha <- atomically $ stateTVar (handleCount src) $ \s -> (s, succ s)
atomically $ modifyTVar (listeners src) (HashMap.insert ha (SomeCallback @ev fn)) atomically $ modifyTVar (listeners src) (HashMap.insertWith (<>) key [(ha, SomeCallback @ev fn)])
pure ha pure ha
stopNotify src ha = do stopNotify src ha = do
atomically $ modifyTVar (listeners src) (HashMap.delete ha) atomically do
modifyTVar (listeners src) (HashMap.map (filter ((/= ha) . fst )))
modifyTVar (listeners src) (HashMap.filter (not . null))
emitNotify :: forall ev m . MonadIO m emitNotify :: forall ev m . (ForNotify ev, MonadIO m)
=> SomeNotifySource ev => SomeNotifySource ev
-> (NotifyKey ev, NotifyData ev) -> (NotifyKey ev, NotifyData ev)
-> m () -> m ()
emitNotify src (_,d) = do emitNotify src (k,d) = do
who <- readTVarIO (listeners src) <&> HashMap.toList who <- readTVarIO (listeners src) <&> HashMap.lookup k <&> fromMaybe mempty
for_ who $ \(h, SomeCallback cb) -> cb h d for_ who $ \(h, SomeCallback cb) -> cb h d

View File

@ -25,6 +25,9 @@ import Type.Reflection (someTypeRep)
import Lens.Micro.Platform import Lens.Micro.Platform
newtype RefLogKey s = RefLogKey { fromRefLogKey :: PubKey 'Sign s } newtype RefLogKey s = RefLogKey { fromRefLogKey :: PubKey 'Sign s }
deriving stock Generic
instance Serialise (PubKey 'Sign s) => Serialise (RefLogKey s)
deriving stock instance IsRefPubKey s => Eq (RefLogKey s) deriving stock instance IsRefPubKey s => Eq (RefLogKey s)

View File

@ -201,16 +201,15 @@ runServiceClientMulti :: forall e m . ( MonadIO m
runServiceClientMulti endpoints = do runServiceClientMulti endpoints = do
proto <- async $ runProto @e [ makeResponse @e (makeClient x) | (Endpoint x) <- endpoints ] proto <- async $ runProto @e [ makeResponse @e (makeClient x) | (Endpoint x) <- endpoints ]
link proto
waiters <- forM endpoints $ \(Endpoint caller) -> async $ forever do waiters <- forM endpoints $ \(Endpoint caller) -> async $ forever do
req <- getRequest caller req <- getRequest caller
request @e (callPeer caller) req request @e (callPeer caller) req
mapM_ link waiters
void $ UIO.waitAnyCatchCancel $ proto : waiters r <- UIO.waitAnyCatchCancel $ proto : waiters
either UIO.throwIO (const $ pure ()) (snd r)
notifyServiceCaller :: forall api e m . MonadIO m notifyServiceCaller :: forall api e m . MonadIO m
=> ServiceCaller api e => ServiceCaller api e

View File

@ -0,0 +1,79 @@
{-# Language TemplateHaskell #-}
module HBS2.ScheduledAction
( Scheduled
, scheduleRunPeriod
, defScheduled
, runScheduled
, schedule
) where
import HBS2.Prelude.Plated
import HBS2.Clock
import Prelude hiding (all)
import Data.Word
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Lens.Micro.Platform
import Control.Monad
import Data.List qualified as List
import Control.Exception qualified as E
import UnliftIO as U
-- NOTE: scheduled-action
-- держит список действий (IO ())
-- привязанных к временным "слотам" (секундам) с точностью до
-- секунды.
-- После наступления секунды --- выполняет список действий,
-- привязанных к слоту, удаляет действия, удаляет слот.
-- Полезно, что бы очищать данные, имеющие продолжительность
-- жизни -- всякие там кэши, хэшмапы и так далее.
--
-- В отличие от Cache, не знает о сути действий ничего,
-- кроме того, что это IO ().
--
-- Может быть (и должно, наверное) быть глобальным на
-- всё приложение
type SlotNum = Word64
data Scheduled =
Scheduled
{ _scheduleRunPeriod :: Timeout 'Seconds
, slots :: TVar (HashMap SlotNum [IO ()])
}
makeLenses 'Scheduled
defScheduled :: MonadUnliftIO m => m Scheduled
defScheduled = Scheduled 10 <$> newTVarIO mempty
runScheduled :: MonadUnliftIO m => Scheduled -> m ()
runScheduled sch = forever do
pause (view scheduleRunPeriod sch)
now <- getTimeCoarse <&> toNanoSecs <&> (/1e9) . realToFrac <&> round
expired <- atomically do
all <- readTVar (slots sch) <&> HashMap.toList
let (rest, expired) = List.partition ( (>now) . fst) all
writeTVar (slots sch) (HashMap.fromList rest)
pure expired
for_ expired $ \(_, all) -> do
for_ all $ \action -> do
-- TODO: error-logging-maybe
liftIO $ void $ action `E.catch` (\(_ :: E.ArithException) -> pure ())
`E.catch` (\(_ :: E.IOException) -> pure ())
`E.catch` (\(_ :: E.SomeException) -> pure ())
schedule :: forall a m . (MonadUnliftIO m, Integral a) => Scheduled -> a -> IO () -> m ()
schedule s ttl what = do
now <- getTimeCoarse <&> toNanoSecs <&> (/1e9) . realToFrac <&> round
let slot = now + fromIntegral ttl
atomically $ modifyTVar (slots s) (HashMap.insertWith (<>) slot [what])

View File

@ -4,6 +4,7 @@ import TestFakeMessaging
import TestActors import TestActors
import DialogSpec import DialogSpec
import TestFileLogger import TestFileLogger
import TestScheduled
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit import Test.Tasty.HUnit
@ -16,7 +17,10 @@ main =
testCase "testFakeMessaging1" testFakeMessaging1 testCase "testFakeMessaging1" testFakeMessaging1
, testCase "testActorsBasic" testActorsBasic , testCase "testActorsBasic" testActorsBasic
, testCase "testFileLogger" testFileLogger , testCase "testFileLogger" testFileLogger
, testDialog , testCase "testScheduledActions" testScheduled
-- FIXME does-not-finish
-- , testDialog
] ]

View File

@ -0,0 +1,79 @@
module TestScheduled where
import HBS2.Prelude
import HBS2.Clock
import HBS2.ScheduledAction
import Test.Tasty.HUnit
import Control.Monad
import Lens.Micro.Platform
import System.IO (hPrint)
import UnliftIO
import Data.List qualified as List
import Control.Monad.Cont
testScheduled :: IO ()
testScheduled = do
tres <- newTVarIO mempty
sch <- defScheduled <&> set scheduleRunPeriod 1.5
s <- async $ runScheduled sch
let addAction = schedule sch
addValue values = atomically $ modifyTVar' tres (values ++)
addAction 1 (addValue [1, 2, 3])
addAction 2 (addValue [10, 20, 30])
addAction 3 (addValue [100, 200, 300])
addAction 2 do
throwIO $ userError "fail!"
addAction 2 do
error "fail 2!"
-- addAction 2 do
-- addValue [1 `div` 0]
pause @'Seconds 3.5
cancel s
let expected = [100,200,300,10,20,30,1,2,3] & List.sort
results <- readTVarIO tres <&> List.sort
hPrint stderr results
assertEqual "all-values-calculated" expected results
testAsync :: IO ()
testAsync = do
hSetBuffering stdout LineBuffering
hSetBuffering stderr LineBuffering
flip runContT pure $ do
a <- ContT $ withAsync do
forever do
pause @'Seconds 1
print "1"
b <- ContT $ withAsync do
forever do
pause @'Seconds 2
print "2"
c <- ContT $ withAsync do
pause @'Seconds 5
print "leaving"
pause @'Seconds 10
liftIO $ print "now what?"

View File

@ -3,11 +3,16 @@ module Main where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.OrDie import HBS2.OrDie
import HBS2.Base58
import HBS2.Actors.Peer
import HBS2.Actors.Peer.Types
import HBS2.Net.Proto.Notify
import HBS2.Data.Types.Refs (HashRef(..)) import HBS2.Data.Types.Refs (HashRef(..))
import HBS2.Net.Proto.Types import HBS2.Net.Proto.Types
import HBS2.Net.Proto.RefLog import HBS2.Net.Proto.RefLog
import HBS2.Peer.RPC.Client.Unix hiding (Cookie) import HBS2.Peer.RPC.Client.Unix hiding (Cookie)
import HBS2.Peer.RPC.API.RefLog import HBS2.Peer.RPC.API.RefLog
import HBS2.Peer.Notify
import HBS2.Clock import HBS2.Clock
-- import HBS2Git.PrettyStuff -- import HBS2Git.PrettyStuff
@ -21,6 +26,7 @@ import Data.Config.Suckless.KeyValue
import Control.Monad.Catch (MonadThrow(..)) import Control.Monad.Catch (MonadThrow(..))
import Control.Monad.Except (runExceptT,throwError) import Control.Monad.Except (runExceptT,throwError)
import Control.Monad.Cont
import Control.Monad.Reader import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as LBS import Data.ByteString.Lazy.Char8 qualified as LBS
import Data.Either import Data.Either
@ -37,6 +43,7 @@ import System.Directory
import System.FilePath import System.FilePath
import System.Process.Typed import System.Process.Typed
import Text.InterpolatedString.Perl6 (qc) import Text.InterpolatedString.Perl6 (qc)
import Control.Concurrent.STM (flushTQueue)
import UnliftIO import UnliftIO
import Web.Scotty hiding (header,next) import Web.Scotty hiding (header,next)
@ -88,6 +95,7 @@ data ReposyncState =
ReposyncState ReposyncState
{ _rpcSoname :: FilePath { _rpcSoname :: FilePath
, _rpcRefLog :: ServiceCaller RefLogAPI UNIX , _rpcRefLog :: ServiceCaller RefLogAPI UNIX
, _rpcNotifySink :: NotifySink (RefLogEvents L4Proto) UNIX
, _reposyncBaseDir :: FilePath , _reposyncBaseDir :: FilePath
, _reposyncPort :: Int , _reposyncPort :: Int
, _reposyncEntries :: TVar [RepoEntry] , _reposyncEntries :: TVar [RepoEntry]
@ -119,10 +127,11 @@ reposyncDefaultDir = unsafePerformIO do
newState :: MonadUnliftIO m newState :: MonadUnliftIO m
=> FilePath => FilePath
-> ServiceCaller RefLogAPI UNIX -> ServiceCaller RefLogAPI UNIX
-> NotifySink (RefLogEvents L4Proto) UNIX
-> m ReposyncState -> m ReposyncState
newState so refLog = newState so refLog sink =
ReposyncState so refLog reposyncDefaultDir 4017 <$> newTVarIO mempty ReposyncState so refLog sink reposyncDefaultDir 4017 <$> newTVarIO mempty
withConfig :: forall a m . (MonadUnliftIO m) => Maybe FilePath -> ReposyncM m a -> ReposyncM m () withConfig :: forall a m . (MonadUnliftIO m) => Maybe FilePath -> ReposyncM m a -> ReposyncM m ()
withConfig cfg m = do withConfig cfg m = do
@ -186,6 +195,7 @@ runSync = do
so <- asks (view rpcSoname) so <- asks (view rpcSoname)
refLogRPC <- asks (view rpcRefLog) refLogRPC <- asks (view rpcRefLog)
sink <- asks (view rpcNotifySink)
root <- asks (view reposyncBaseDir) root <- asks (view reposyncBaseDir)
port <- asks (view reposyncPort) <&> fromIntegral port <- asks (view reposyncPort) <&> fromIntegral
@ -196,40 +206,59 @@ runSync = do
get "/" $ do get "/" $ do
text "This is hbs2-reposync" text "This is hbs2-reposync"
r <- forM es $ \entry -> async $ void $ do r <- forM es $ \entry -> async $ void $ flip runContT pure do
let rk = fromRefLogKey $ repoRef entry let ref = repoRef entry
let rk = fromRefLogKey ref
tv <- newTVarIO Nothing tv <- newTVarIO Nothing
upd <- newTQueueIO
debug $ "STARTED WITH" <+> pretty (repoPath entry) debug $ "STARTED WITH" <+> pretty (repoPath entry)
initRepo entry let notif =
liftIO $ async do
debug $ "Subscribed" <+> pretty ref
runNotifySink sink (RefLogNotifyKey ref) $ \(RefLogUpdateNotifyData _ h) -> do
debug $ "Got notification" <+> pretty ref <+> pretty h
atomically $ writeTQueue upd ()
void $ ContT $ bracket notif cancel
lift $ initRepo entry
lift $ syncRepo entry
fix \next -> do fix \next -> do
rr' <- race (pause @'Seconds 1) do rr' <- liftIO $ race (pause @'Seconds 1) do
callService @RpcRefLogGet refLogRPC rk callService @RpcRefLogGet refLogRPC rk
<&> fromRight Nothing <&> fromRight Nothing
void $ liftIO $ race (pause @'Seconds 60) (atomically (peekTQueue upd))
pause @'Seconds 5
liftIO $ atomically $ flushTQueue upd
rr <- either (const $ pause @'Seconds 10 >> warn "rpc call timeout" >> next) pure rr' rr <- either (const $ pause @'Seconds 10 >> warn "rpc call timeout" >> next) pure rr'
debug $ "REFLOG VALUE:" <+> pretty rr debug $ "REFLOG VALUE:" <+> pretty rr
r0 <- readTVarIO tv r0 <- readTVarIO tv
if rr == r0 then do unless ( rr == r0 ) do
pause @'Seconds 60 debug $ "Syncronize repo!" <+> pretty (repoPath entry)
else do fix \again -> do
debug $ "Syncronize repoes!" <+> pretty (repoPath entry) lift (syncRepo entry) >>= \case
syncRepo entry >>= \case Left{} -> do
Left{} -> pause @'Seconds 60 debug $ "Failed to update:" <+> pretty (repoPath entry)
Right{} -> do pause @'Seconds 1
atomically $ writeTVar tv rr again
pause @'Seconds 10
Right{} -> do
atomically $ writeTVar tv rr
next next
mapM_ waitCatch (http : r) void $ waitAnyCatchCancel (http : r)
data SyncError = SyncError data SyncError = SyncError
@ -356,27 +385,46 @@ withApp cfg m = do
-- lrpc = -- lrpc =
soname <- detectRPC `orDie` "RPC not found" forever $ handleAny cleanup $ do
client <- race ( pause @'Seconds 1) (newMessagingUnix False 1.0 soname) `orDie` "hbs2-peer rpc timeout!" soname <- detectRPC `orDie` "RPC not found"
rpc <- makeServiceCaller (fromString soname) let o = [MUWatchdog 20, MUDontRetry]
messaging <- async $ runMessagingUnix client client <- race ( pause @'Seconds 1) (newMessagingUnixOpts o False 1.0 soname)
link messaging `orDie` "hbs2-peer rpc timeout!"
let endpoints = [ Endpoint @UNIX rpc clientN <- newMessagingUnixOpts o False 1.0 soname
]
c1 <- async $ liftIO $ runReaderT (runServiceClientMulti endpoints) client rpc <- makeServiceCaller (fromString soname)
state <- newState soname rpc messaging <- async $ runMessagingUnix client
r <- async $ void $ runReaderT (unReposyncM $ withConfig cfg m) state mnotify <- async $ runMessagingUnix clientN
waitAnyCatchCancel [c1, messaging, r] sink <- newNotifySink
notice "exiting" wNotify <- liftIO $ async $ flip runReaderT clientN $ do
debug "notify restarted!"
runNotifyWorkerClient sink
nProto <- liftIO $ async $ flip runReaderT clientN $ do
runProto @UNIX
[ makeResponse (makeNotifyClient @(RefLogEvents L4Proto) sink)
]
let endpoints = [ Endpoint @UNIX rpc
]
c1 <- async $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
state <- newState soname rpc sink
r <- async $ void $ runReaderT (unReposyncM $ withConfig cfg m) state
void $ waitAnyCatchCancel [c1, messaging, mnotify, nProto, wNotify, r]
notice "exiting"
setLoggingOff @DEBUG setLoggingOff @DEBUG
setLoggingOff @INFO setLoggingOff @INFO
@ -385,6 +433,12 @@ withApp cfg m = do
setLoggingOff @NOTICE setLoggingOff @NOTICE
where
cleanup e = do
err (viaShow e)
warn "Something bad happened. Retrying..."
pause @'Seconds 2.5
main :: IO () main :: IO ()
main = runMe . customExecParser (prefs showHelpOnError) $ main = runMe . customExecParser (prefs showHelpOnError) $
info (helper <*> ((,) <$> opts <*> parser)) info (helper <*> ((,) <$> opts <*> parser))

View File

@ -68,6 +68,7 @@ import HBS2.Peer.RPC.API.Storage
import HBS2.Peer.RPC.API.Peer import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.API.RefLog import HBS2.Peer.RPC.API.RefLog
import HBS2.Peer.RPC.API.RefChan import HBS2.Peer.RPC.API.RefChan
import HBS2.Peer.Notify
import RPC2(RPC2Context(..)) import RPC2(RPC2Context(..))
@ -547,6 +548,7 @@ instance ( Monad m
response = lift . response response = lift . response
respawn :: PeerOpts -> IO () respawn :: PeerOpts -> IO ()
respawn opts = respawn opts =
if not (view peerRespawn opts) then do if not (view peerRespawn opts) then do
@ -616,6 +618,8 @@ runPeer opts = U.handle (\e -> myException e
setLogging @TRACE1 tracePrefix setLogging @TRACE1 tracePrefix
refChanNotifySource <- newSomeNotifySource @(RefChanEvents L4Proto)
refLogNotifySource <- newSomeNotifySource @(RefLogEvents L4Proto)
let ps = mempty let ps = mempty
@ -752,13 +756,21 @@ runPeer opts = U.handle (\e -> myException e
rce <- refChanWorkerEnv conf penv denv rce <- refChanWorkerEnv conf penv denv
let refChanAdapter = RefChanAdapter let refChanAdapter =
{ refChanOnHead = refChanOnHeadFn rce RefChanAdapter
, refChanSubscribed = isPolledRef @e brains { refChanOnHead = refChanOnHeadFn rce
, refChanWriteTran = refChanWriteTranFn rce , refChanSubscribed = isPolledRef @e brains
, refChanValidatePropose = refChanValidateTranFn @e rce , refChanWriteTran = refChanWriteTranFn rce
, refChanNotifyRely = refChanNotifyRelyFn @e rce , refChanValidatePropose = refChanValidateTranFn @e rce
}
, refChanNotifyRely = \r u -> do
debug "refChanNotifyRely MOTHERFUCKER!"
refChanNotifyRelyFn @e rce r u
case u of
Notify rr s -> do
emitNotify refChanNotifySource (RefChanNotifyKey r, RefChanNotifyData rr s)
_ -> pure ()
}
rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter
@ -796,6 +808,11 @@ runPeer opts = U.handle (\e -> myException e
let rwa = RefLog.RefLogWorkerAdapter let rwa = RefLog.RefLogWorkerAdapter
{ RefLog.reflogDownload = doDownload { RefLog.reflogDownload = doDownload
, RefLog.reflogFetch = doFetchRef , RefLog.reflogFetch = doFetchRef
, RefLog.reflogUpdated = \(r,v) -> do
debug $ "EMIT REFLOG UPDATE" <+> pretty (AsBase58 r)
emitNotify refLogNotifySource ( RefLogNotifyKey @L4Proto r
, RefLogUpdateNotifyData @L4Proto r (HashRef v)
)
} }
let addNewRtt (p,rttNew) = withPeerM penv $ void $ runMaybeT do let addNewRtt (p,rttNew) = withPeerM penv $ void $ runMaybeT do
@ -1080,12 +1097,19 @@ runPeer opts = U.handle (\e -> myException e
m1 <- async $ runMessagingUnix rpcmsg m1 <- async $ runMessagingUnix rpcmsg
rpcProto <- async $ flip runReaderT rpcctx do rpcProto <- async $ flip runReaderT rpcctx do
env <- newNotifyEnvServer @(RefChanEvents L4Proto) refChanNotifySource
envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource
w1 <- asyncLinked $ runNotifyWorkerServer env
w2 <- asyncLinked $ runNotifyWorkerServer envrl
runProto @UNIX runProto @UNIX
[ makeResponse (makeServer @PeerAPI) [ makeResponse (makeServer @PeerAPI)
, makeResponse (makeServer @RefLogAPI) , makeResponse (makeServer @RefLogAPI)
, makeResponse (makeServer @RefChanAPI) , makeResponse (makeServer @RefChanAPI)
, makeResponse (makeServer @StorageAPI) , makeResponse (makeServer @StorageAPI)
, makeResponse (makeNotifyServer @(RefChanEvents L4Proto) env)
, makeResponse (makeNotifyServer @(RefLogEvents L4Proto) envrl)
] ]
mapM_ wait [w1,w2]
void $ waitAnyCancel $ w <> [ loop void $ waitAnyCancel $ w <> [ loop
, m1 , m1

View File

@ -91,6 +91,7 @@ data RefLogWorkerAdapter e =
RefLogWorkerAdapter RefLogWorkerAdapter
{ reflogDownload :: Hash HbSync -> IO () { reflogDownload :: Hash HbSync -> IO ()
, reflogFetch :: PubKey 'Sign (Encryption e) -> IO () , reflogFetch :: PubKey 'Sign (Encryption e) -> IO ()
, reflogUpdated :: (RefLogKey (Encryption e), Hash HbSync) -> IO ()
} }
reflogWorker :: forall e s m . ( e ~ L4Proto reflogWorker :: forall e s m . ( e ~ L4Proto
@ -252,7 +253,7 @@ reflogWorker conf brains adapter = do
pure nref pure nref
-- TODO: old-root-to-delete -- TODO: old-root-to-delete
reflogUpdated adapter (reflogkey, newRoot)
trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty (hashObject @HbSync reflogkey) <+> pretty newRoot trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty (hashObject @HbSync reflogkey) <+> pretty newRoot
-- trace "I'm a reflog update worker" -- trace "I'm a reflog update worker"

View File

@ -135,6 +135,7 @@ library
exposed-modules: exposed-modules:
HBS2.Peer.Brains HBS2.Peer.Brains
HBS2.Peer.Notify
HBS2.Peer.RPC.Class HBS2.Peer.RPC.Class
HBS2.Peer.RPC.API.Peer HBS2.Peer.RPC.API.Peer
HBS2.Peer.RPC.API.RefLog HBS2.Peer.RPC.API.RefLog

View File

@ -0,0 +1,94 @@
{-# Language UndecidableInstances #-}
module HBS2.Peer.Notify
( RefChanEvents(..)
, RefLogEvents(..)
, newNotifyEnvServer
, runNotifyWorkerServer
, runNotifyWorkerClient
, makeNotifyServer
, makeNotifyClient
, newSomeNotifySource
, newNotifySink
, emitNotify
, NotifyKey(..)
, NotifyData(..)
, HasProtocol(..)
) where
import HBS2.Prelude
import HBS2.Base58
import HBS2.Data.Types.Refs
import HBS2.Actors.Peer.Types
import HBS2.Net.Proto.Types
import HBS2.Net.Proto.Notify
import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.RefLog
import HBS2.Net.Messaging.Unix (UNIX)
import HBS2.Data.Types.SignedBox
import HBS2.Net.Proto.Definition()
import Codec.Serialise
import Data.ByteString.Lazy (ByteString)
import Data.ByteString qualified as BS
data RefChanEvents e =
RefChanOnNotify
instance HasProtocol UNIX (NotifyProto (RefChanEvents L4Proto) UNIX) where
type instance ProtocolId (NotifyProto (RefChanEvents L4Proto) UNIX) = 0x20e14bfa0ca1db8e
type instance Encoded UNIX = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
requestPeriodLim = NoLimit
-- FIXME: move-this-definitions-somewhere
newtype instance NotifyKey (RefChanEvents e) =
RefChanNotifyKey (RefChanId e)
deriving (Generic)
deriving newtype instance ForRefChans e => Hashable (NotifyKey (RefChanEvents e))
deriving newtype instance ForRefChans e => Eq (NotifyKey (RefChanEvents e))
data instance NotifyData (RefChanEvents e) =
RefChanNotifyData (RefChanId e) (SignedBox BS.ByteString e)
deriving Generic
instance ForRefChans e => Serialise (NotifyKey (RefChanEvents e))
instance ForRefChans e => Serialise (NotifyData (RefChanEvents e))
data RefLogEvents s =
RefLogUpdated
newtype instance NotifyKey (RefLogEvents e) =
RefLogNotifyKey (RefLogKey (Encryption e))
deriving (Generic)
data instance NotifyData (RefLogEvents e) =
RefLogUpdateNotifyData (RefLogKey (Encryption e)) HashRef
deriving (Generic)
type ForRefLogEvents e = ( Serialise (PubKey 'Sign (Encryption e))
, Serialise (RefLogKey (Encryption e))
, FromStringMaybe (PubKey 'Sign (Encryption e))
, Hashable (PubKey 'Sign (Encryption e))
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
)
deriving newtype instance ForRefLogEvents e => Hashable (NotifyKey (RefLogEvents e))
deriving newtype instance ForRefLogEvents e => Eq (NotifyKey (RefLogEvents e))
instance ForRefLogEvents e => Serialise (NotifyKey (RefLogEvents e))
instance ForRefLogEvents e => Serialise (NotifyData (RefLogEvents e))
instance ForRefLogEvents L4Proto => HasProtocol UNIX (NotifyProto (RefLogEvents L4Proto) UNIX) where
type instance ProtocolId (NotifyProto (RefLogEvents L4Proto) UNIX) = 0x65A9ECE2A182216
type instance Encoded UNIX = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
requestPeriodLim = NoLimit