diff --git a/fixer-config-example.scm b/fixer-config-example.scm index abdb5cc8..96d17053 100644 --- a/fixer-config-example.scm +++ b/fixer-config-example.scm @@ -2,15 +2,18 @@ ;; (watch 30 (lwwref "BTThPdHKF8XnEq4m6wzbKHKA6geLFK4ydYhBXAqBdHSP") - (run "./on-my-ref.sh") -) - -(watch 10 (lwwref "BTThPdHKF8XnEq4m6wzbKHKA6geLFK4ydYhBXAqBdHSP") (run "./on-my-ref2.sh") ) -(watch 10 (reflog "BKtvRLispCM9UuQqHaNxu4SEUzpQNQ3PeRNknecKGPZ6") +(watch 30 (reflog "BKtvRLispCM9UuQqHaNxu4SEUzpQNQ3PeRNknecKGPZ6") (run "./on-my-ref3.sh") ) +(watch 30 (lwwref "DTmSb3Au7apDTMctQn6yqs9GJ8mFW7YQXzgVqZpmkTtf") + (run "./on-my-ref4.sh") +) + +(watch 10 (lwwref "Byc3XUeSbJBXVFueumkNkVJMPHbGoUdxYEJBgzJPf8io") + (run "./on-my-ref4.sh") +) diff --git a/hbs2-fixer/app/Main.hs b/hbs2-fixer/app/Main.hs index a133ebf1..5088601f 100644 --- a/hbs2-fixer/app/Main.hs +++ b/hbs2-fixer/app/Main.hs @@ -11,8 +11,18 @@ import HBS2.Polling import HBS2.Misc.PrettyStuff import HBS2.System.Dir import HBS2.System.Logger.Simple.ANSI hiding (info) +import HBS2.Net.Messaging.Unix + +import HBS2.Net.Proto.Service import HBS2.Peer.Proto.LWWRef +import HBS2.Peer.RPC.API.Peer +import HBS2.Peer.RPC.API.RefLog +import HBS2.Peer.RPC.API.LWWRef +import HBS2.Peer.RPC.API.Storage +import HBS2.Peer.RPC.Client.StorageClient + +import HBS2.Peer.CLI.Detect import HBS2.Peer.Proto.RefLog import Data.Config.Suckless @@ -44,18 +54,23 @@ type RLWW = LWWRefKey HBS2Basic type RRefLog = RefLogKey HBS2Basic newtype instance Watcher RRefLog = - WatchRefLog ( RRefLog -> [Syntax C] -> IO () ) + WatchRefLog [Syntax C] + deriving newtype (Semigroup,Monoid) newtype instance (Watcher (LWWRefKey HBS2Basic)) = - WatchLWWRef ( LWWRefKey HBS2Basic -> [Syntax C] -> IO () ) + WatchLWWRef [Syntax C] + deriving newtype (Semigroup,Monoid) data FixerEnv = FixerEnv { _configFile :: Maybe FilePath + , _lwwAPI :: ServiceCaller LWWRefAPI UNIX + , _refLogAPI :: ServiceCaller RefLogAPI UNIX + , _peerAPI :: ServiceCaller PeerAPI UNIX , _config :: TVar Config - , _onRefLog :: TVar ( HashMap RRefLog (NominalDiffTime, [Watcher RRefLog]) ) - , _onLww :: TVar ( HashMap RLWW (NominalDiffTime, [Watcher RLWW])) + , _onRefLog :: TVar ( HashMap RRefLog (NominalDiffTime, Watcher RRefLog) ) + , _onLww :: TVar ( HashMap RLWW (NominalDiffTime, Watcher RLWW)) , _refLogLast :: TVar ( HashMap RRefLog HashRef ) - , _lwwLast :: TVar ( HashMap RRefLog HashRef ) + , _lwwLast :: TVar ( HashMap RLWW HashRef ) } makeLenses ''FixerEnv @@ -83,6 +98,7 @@ withConfig cfgPath m = do local (set config tsyn . set configFile (Just configPath)) (void m) + withApp :: Maybe FilePath -> FixerM IO () -> IO () withApp cfgPath action = do setLogging @DEBUG debugPrefix @@ -91,20 +107,47 @@ withApp cfgPath action = do setLogging @WARN warnPrefix setLogging @NOTICE noticePrefix - env <- FixerEnv Nothing - <$> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty + soname <- detectRPC + `orDie` "can't detect RPC" - runReaderT (runFixerM $ withConfig cfgPath action) env - `finally` do - setLoggingOff @DEBUG - setLoggingOff @INFO - setLoggingOff @ERROR - setLoggingOff @WARN - setLoggingOff @NOTICE + flip runContT pure do + + client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) + >>= orThrowUser ("can't connect to" <+> pretty soname) + + void $ ContT $ withAsync $ runMessagingUnix client + + peerAPI <- makeServiceCaller @PeerAPI (fromString soname) + refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) + -- storageAPI <- makeServiceCaller @StorageAPI (fromString soname) + lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) + + let endpoints = [ Endpoint @UNIX peerAPI + , Endpoint @UNIX refLogAPI + , Endpoint @UNIX lwwAPI + -- , Endpoint @UNIX storageAPI + ] + + void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + + + env <- FixerEnv Nothing + lwwAPI + refLogAPI + peerAPI + <$> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + <*> newTVarIO mempty + + lift $ runReaderT (runFixerM $ withConfig cfgPath action) env + `finally` do + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE where debugPrefix = toStdout . logPrefix "[debug] " @@ -159,6 +202,8 @@ mainLoop = forever $ do -- poll reflogs void $ ContT $ withAsync do + api <- asks _refLogAPI + rlo <- pure $ asks _onRefLog >>= readTVarIO <&> HM.toList @@ -166,6 +211,7 @@ mainLoop = forever $ do polling (Polling 1 1) rlo $ \ref -> do debug $ red "POLL REFLOG" <+> pretty ref + liftIO $ oneSec $ void $ callService @RpcRefLogFetch api (fromRefLogKey ref) pure () pure () @@ -173,6 +219,9 @@ mainLoop = forever $ do -- poll lww void $ ContT $ withAsync do + api <- asks _lwwAPI + olds <- asks _lwwLast + lww <- pure $ asks _onLww >>= readTVarIO <&> HM.toList @@ -180,10 +229,22 @@ mainLoop = forever $ do polling (Polling 1 1) lww $ \ref -> do debug $ red "POLL LWWREF" <+> pretty ref - pure () + liftIO $ oneSec $ void $ callService @RpcLWWRefFetch api ref + + liftIO (oneSec $ callService @RpcLWWRefGet api ref) >>= \case + Right (Right (Just LWWRef{..})) -> do + old <- readTVarIO olds <&> HM.lookup ref + unless (old == Just lwwValue) do + debug $ green "CHANGED" <+> pretty ref <+> pretty lwwValue + atomically $ modifyTVar olds (HM.insert ref lwwValue) + + _ -> pure () + forever $ pause @'Seconds 60 +oneSec :: MonadUnliftIO m => m b -> m (Either () b) +oneSec = race (pause @'Seconds 1) updateFromConfig :: MonadIO m => Config -> FixerM m () updateFromConfig conf = do @@ -202,6 +263,9 @@ updateFromConfig conf = do lww <- asks _onLww lwLast <- asks _lwwLast + peerAPI <- asks _peerAPI + + updates <- S.toList_ $ for_ w $ \(who,sec,what) -> do case who of @@ -211,11 +275,13 @@ updateFromConfig conf = do let k' = fromStringMay @RLWW (Text.unpack r) debug $ red $ "SET LWWREF WATCHER" <+> pretty sec <+> pretty k' <+> pretty what for_ k' $ \k -> do + liftIO $ void $ oneSec $ callService @RpcPollAdd peerAPI (fromLwwRefKey k, "lwwref", 60 * fromIntegral sec) S.yield $ modifyTVar lww (HM.insert k (fromIntegral sec, mempty)) "reflog" -> do let k' = fromStringMay @RRefLog (Text.unpack r) debug $ red $ "SET LWWREF WATCHER" <+> pretty sec <+> pretty k' <+> pretty what for_ k' $ \k -> do + liftIO $ void $ oneSec $ callService @RpcPollAdd peerAPI (fromRefLogKey k, "reflog", 60 * fromIntegral sec) S.yield $ modifyTVar rlo (HM.insert k (fromIntegral sec, mempty)) _ -> pure ()