This commit is contained in:
Dmitry Zuikov 2024-03-18 12:15:56 +03:00
parent 6b8cf74411
commit 752ce1d98a
2 changed files with 93 additions and 24 deletions

View File

@ -2,15 +2,18 @@
;; ;;
(watch 30 (lwwref "BTThPdHKF8XnEq4m6wzbKHKA6geLFK4ydYhBXAqBdHSP") (watch 30 (lwwref "BTThPdHKF8XnEq4m6wzbKHKA6geLFK4ydYhBXAqBdHSP")
(run "./on-my-ref.sh")
)
(watch 10 (lwwref "BTThPdHKF8XnEq4m6wzbKHKA6geLFK4ydYhBXAqBdHSP")
(run "./on-my-ref2.sh") (run "./on-my-ref2.sh")
) )
(watch 10 (reflog "BKtvRLispCM9UuQqHaNxu4SEUzpQNQ3PeRNknecKGPZ6") (watch 30 (reflog "BKtvRLispCM9UuQqHaNxu4SEUzpQNQ3PeRNknecKGPZ6")
(run "./on-my-ref3.sh") (run "./on-my-ref3.sh")
) )
(watch 30 (lwwref "DTmSb3Au7apDTMctQn6yqs9GJ8mFW7YQXzgVqZpmkTtf")
(run "./on-my-ref4.sh")
)
(watch 10 (lwwref "Byc3XUeSbJBXVFueumkNkVJMPHbGoUdxYEJBgzJPf8io")
(run "./on-my-ref4.sh")
)

View File

@ -11,8 +11,18 @@ import HBS2.Polling
import HBS2.Misc.PrettyStuff import HBS2.Misc.PrettyStuff
import HBS2.System.Dir import HBS2.System.Dir
import HBS2.System.Logger.Simple.ANSI hiding (info) import HBS2.System.Logger.Simple.ANSI hiding (info)
import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto.Service
import HBS2.Peer.Proto.LWWRef import HBS2.Peer.Proto.LWWRef
import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.API.RefLog
import HBS2.Peer.RPC.API.LWWRef
import HBS2.Peer.RPC.API.Storage
import HBS2.Peer.RPC.Client.StorageClient
import HBS2.Peer.CLI.Detect
import HBS2.Peer.Proto.RefLog import HBS2.Peer.Proto.RefLog
import Data.Config.Suckless import Data.Config.Suckless
@ -44,18 +54,23 @@ type RLWW = LWWRefKey HBS2Basic
type RRefLog = RefLogKey HBS2Basic type RRefLog = RefLogKey HBS2Basic
newtype instance Watcher RRefLog = newtype instance Watcher RRefLog =
WatchRefLog ( RRefLog -> [Syntax C] -> IO () ) WatchRefLog [Syntax C]
deriving newtype (Semigroup,Monoid)
newtype instance (Watcher (LWWRefKey HBS2Basic)) = newtype instance (Watcher (LWWRefKey HBS2Basic)) =
WatchLWWRef ( LWWRefKey HBS2Basic -> [Syntax C] -> IO () ) WatchLWWRef [Syntax C]
deriving newtype (Semigroup,Monoid)
data FixerEnv = FixerEnv data FixerEnv = FixerEnv
{ _configFile :: Maybe FilePath { _configFile :: Maybe FilePath
, _lwwAPI :: ServiceCaller LWWRefAPI UNIX
, _refLogAPI :: ServiceCaller RefLogAPI UNIX
, _peerAPI :: ServiceCaller PeerAPI UNIX
, _config :: TVar Config , _config :: TVar Config
, _onRefLog :: TVar ( HashMap RRefLog (NominalDiffTime, [Watcher RRefLog]) ) , _onRefLog :: TVar ( HashMap RRefLog (NominalDiffTime, Watcher RRefLog) )
, _onLww :: TVar ( HashMap RLWW (NominalDiffTime, [Watcher RLWW])) , _onLww :: TVar ( HashMap RLWW (NominalDiffTime, Watcher RLWW))
, _refLogLast :: TVar ( HashMap RRefLog HashRef ) , _refLogLast :: TVar ( HashMap RRefLog HashRef )
, _lwwLast :: TVar ( HashMap RRefLog HashRef ) , _lwwLast :: TVar ( HashMap RLWW HashRef )
} }
makeLenses ''FixerEnv makeLenses ''FixerEnv
@ -83,6 +98,7 @@ withConfig cfgPath m = do
local (set config tsyn . set configFile (Just configPath)) (void m) local (set config tsyn . set configFile (Just configPath)) (void m)
withApp :: Maybe FilePath -> FixerM IO () -> IO () withApp :: Maybe FilePath -> FixerM IO () -> IO ()
withApp cfgPath action = do withApp cfgPath action = do
setLogging @DEBUG debugPrefix setLogging @DEBUG debugPrefix
@ -91,20 +107,47 @@ withApp cfgPath action = do
setLogging @WARN warnPrefix setLogging @WARN warnPrefix
setLogging @NOTICE noticePrefix setLogging @NOTICE noticePrefix
env <- FixerEnv Nothing soname <- detectRPC
<$> newTVarIO mempty `orDie` "can't detect RPC"
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
runReaderT (runFixerM $ withConfig cfgPath action) env flip runContT pure do
`finally` do
setLoggingOff @DEBUG client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
setLoggingOff @INFO >>= orThrowUser ("can't connect to" <+> pretty soname)
setLoggingOff @ERROR
setLoggingOff @WARN void $ ContT $ withAsync $ runMessagingUnix client
setLoggingOff @NOTICE
peerAPI <- makeServiceCaller @PeerAPI (fromString soname)
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
-- storageAPI <- makeServiceCaller @StorageAPI (fromString soname)
lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname)
let endpoints = [ Endpoint @UNIX peerAPI
, Endpoint @UNIX refLogAPI
, Endpoint @UNIX lwwAPI
-- , Endpoint @UNIX storageAPI
]
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
env <- FixerEnv Nothing
lwwAPI
refLogAPI
peerAPI
<$> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
lift $ runReaderT (runFixerM $ withConfig cfgPath action) env
`finally` do
setLoggingOff @DEBUG
setLoggingOff @INFO
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
where where
debugPrefix = toStdout . logPrefix "[debug] " debugPrefix = toStdout . logPrefix "[debug] "
@ -159,6 +202,8 @@ mainLoop = forever $ do
-- poll reflogs -- poll reflogs
void $ ContT $ withAsync do void $ ContT $ withAsync do
api <- asks _refLogAPI
rlo <- pure $ asks _onRefLog rlo <- pure $ asks _onRefLog
>>= readTVarIO >>= readTVarIO
<&> HM.toList <&> HM.toList
@ -166,6 +211,7 @@ mainLoop = forever $ do
polling (Polling 1 1) rlo $ \ref -> do polling (Polling 1 1) rlo $ \ref -> do
debug $ red "POLL REFLOG" <+> pretty ref debug $ red "POLL REFLOG" <+> pretty ref
liftIO $ oneSec $ void $ callService @RpcRefLogFetch api (fromRefLogKey ref)
pure () pure ()
pure () pure ()
@ -173,6 +219,9 @@ mainLoop = forever $ do
-- poll lww -- poll lww
void $ ContT $ withAsync do void $ ContT $ withAsync do
api <- asks _lwwAPI
olds <- asks _lwwLast
lww <- pure $ asks _onLww lww <- pure $ asks _onLww
>>= readTVarIO >>= readTVarIO
<&> HM.toList <&> HM.toList
@ -180,10 +229,22 @@ mainLoop = forever $ do
polling (Polling 1 1) lww $ \ref -> do polling (Polling 1 1) lww $ \ref -> do
debug $ red "POLL LWWREF" <+> pretty ref debug $ red "POLL LWWREF" <+> pretty ref
pure () liftIO $ oneSec $ void $ callService @RpcLWWRefFetch api ref
liftIO (oneSec $ callService @RpcLWWRefGet api ref) >>= \case
Right (Right (Just LWWRef{..})) -> do
old <- readTVarIO olds <&> HM.lookup ref
unless (old == Just lwwValue) do
debug $ green "CHANGED" <+> pretty ref <+> pretty lwwValue
atomically $ modifyTVar olds (HM.insert ref lwwValue)
_ -> pure ()
forever $ pause @'Seconds 60 forever $ pause @'Seconds 60
oneSec :: MonadUnliftIO m => m b -> m (Either () b)
oneSec = race (pause @'Seconds 1)
updateFromConfig :: MonadIO m => Config -> FixerM m () updateFromConfig :: MonadIO m => Config -> FixerM m ()
updateFromConfig conf = do updateFromConfig conf = do
@ -202,6 +263,9 @@ updateFromConfig conf = do
lww <- asks _onLww lww <- asks _onLww
lwLast <- asks _lwwLast lwLast <- asks _lwwLast
peerAPI <- asks _peerAPI
updates <- S.toList_ $ for_ w $ \(who,sec,what) -> do updates <- S.toList_ $ for_ w $ \(who,sec,what) -> do
case who of case who of
@ -211,11 +275,13 @@ updateFromConfig conf = do
let k' = fromStringMay @RLWW (Text.unpack r) let k' = fromStringMay @RLWW (Text.unpack r)
debug $ red $ "SET LWWREF WATCHER" <+> pretty sec <+> pretty k' <+> pretty what debug $ red $ "SET LWWREF WATCHER" <+> pretty sec <+> pretty k' <+> pretty what
for_ k' $ \k -> do for_ k' $ \k -> do
liftIO $ void $ oneSec $ callService @RpcPollAdd peerAPI (fromLwwRefKey k, "lwwref", 60 * fromIntegral sec)
S.yield $ modifyTVar lww (HM.insert k (fromIntegral sec, mempty)) S.yield $ modifyTVar lww (HM.insert k (fromIntegral sec, mempty))
"reflog" -> do "reflog" -> do
let k' = fromStringMay @RRefLog (Text.unpack r) let k' = fromStringMay @RRefLog (Text.unpack r)
debug $ red $ "SET LWWREF WATCHER" <+> pretty sec <+> pretty k' <+> pretty what debug $ red $ "SET LWWREF WATCHER" <+> pretty sec <+> pretty k' <+> pretty what
for_ k' $ \k -> do for_ k' $ \k -> do
liftIO $ void $ oneSec $ callService @RpcPollAdd peerAPI (fromRefLogKey k, "reflog", 60 * fromIntegral sec)
S.yield $ modifyTVar rlo (HM.insert k (fromIntegral sec, mempty)) S.yield $ modifyTVar rlo (HM.insert k (fromIntegral sec, mempty))
_ -> pure () _ -> pure ()