From 78a58039b2496453a3c3e7d2e9739fc9f86b5283 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 18 Apr 2024 13:23:23 +0300 Subject: [PATCH] wip --- .../HBS2/Git/Data/Tx/Index.hs | 11 +- hbs2-git/hbs2-git-dashboard/GitDashBoard.hs | 21 +- .../src/HBS2/Git/DashBoard/State.hs | 184 +++++++++++++++++- .../src/HBS2/Git/DashBoard/Types.hs | 107 ++++++++++ .../HBS2/Peer/Proto/RefChan/RefChanUpdate.hs | 28 +++ hbs2-tests/test/playground/Main.hs | 13 ++ 6 files changed, 349 insertions(+), 15 deletions(-) create mode 100644 hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/Types.hs diff --git a/hbs2-git/hbs2-git-client-lib/HBS2/Git/Data/Tx/Index.hs b/hbs2-git/hbs2-git-client-lib/HBS2/Git/Data/Tx/Index.hs index c38dd666..432ec254 100644 --- a/hbs2-git/hbs2-git-client-lib/HBS2/Git/Data/Tx/Index.hs +++ b/hbs2-git/hbs2-git-client-lib/HBS2/Git/Data/Tx/Index.hs @@ -45,11 +45,11 @@ instance ForGitIndex s => Pretty (GitRepoAnnounce s) where newtype NotifyCredentials s = NotifyCredentials (PeerCredentials s) newtype GitIndexRepoName = GitIndexRepoName Text - deriving stock (Generic,Show) + deriving stock (Data,Generic,Show) deriving newtype (Serialise) newtype GitIndexRepoBrief = GitIndexRepoBrief Text - deriving stock (Generic,Show) + deriving stock (Data,Generic,Show) deriving newtype (Serialise) newtype GitIndexRepoManifest = GitIndexRepoManifest (Maybe Text) @@ -61,13 +61,13 @@ data GitIndexRepoDefineData = { gitIndexRepoName :: GitIndexRepoName , gitIndexRepoBrief :: GitIndexRepoBrief } - deriving stock (Generic,Show) + deriving stock (Data,Generic,Show) data GitIndexEntry = GitIndexRepoDefine GitIndexRepoDefineData | GitIndexRepoTombEntry | GitIndexRepoLikes Integer - deriving stock (Generic) + deriving stock (Data,Generic) data GitIndexTx s = GitIndexTx @@ -105,6 +105,9 @@ makeNotificationTx ncred lww lwsk forkInfo = do makeSignedBox @s (view peerSignPk creds) (view peerSignSk creds) (LBS.toStrict $ serialise repoAnn) + + + unpackNotificationTx :: forall s m . (ForGitIndex s, MonadError OperationError m) => SignedBox ByteString s -> m (GitRepoAnnounce s) diff --git a/hbs2-git/hbs2-git-dashboard/GitDashBoard.hs b/hbs2-git/hbs2-git-dashboard/GitDashBoard.hs index 75abbf3f..ff34b851 100644 --- a/hbs2-git/hbs2-git-dashboard/GitDashBoard.hs +++ b/hbs2-git/hbs2-git-dashboard/GitDashBoard.hs @@ -114,8 +114,6 @@ runDashBoardM cli m = do -- FIXME: unix-socket-from-config soname <- detectRPC `orDie` "hbs2-peer rpc not found" - env <- newDashBoardEnv conf dbFile - let errorPrefix = toStderr . logPrefix "[error] " let warnPrefix = toStderr . logPrefix "[warn] " let noticePrefix = toStderr . logPrefix "" @@ -129,10 +127,6 @@ runDashBoardM cli m = do flip runContT pure do - void $ ContT $ withAsync do - q <- withDashBoardEnv env $ asks _pipeline - forever do - liftIO (atomically $ readTQueue q) & liftIO . join client <- liftIO $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) >>= orThrowUser ("can't connect to" <+> pretty soname) @@ -141,6 +135,7 @@ runDashBoardM cli m = do peerAPI <- makeServiceCaller @PeerAPI (fromString soname) refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) + refChanAPI <- makeServiceCaller @RefChanAPI (fromString soname) storageAPI <- makeServiceCaller @StorageAPI (fromString soname) lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) @@ -148,12 +143,26 @@ runDashBoardM cli m = do let endpoints = [ Endpoint @UNIX peerAPI , Endpoint @UNIX refLogAPI + , Endpoint @UNIX refChanAPI , Endpoint @UNIX lwwAPI , Endpoint @UNIX storageAPI ] void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + env <- newDashBoardEnv + conf + dbFile + peerAPI + refLogAPI + refChanAPI + lwwAPI + sto + + void $ ContT $ withAsync do + q <- withDashBoardEnv env $ asks _pipeline + forever do + liftIO (atomically $ readTQueue q) & liftIO . join lift $ withDashBoardEnv env (withState evolveDB >> m) `finally` do diff --git a/hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/State.hs b/hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/State.hs index fd468398..013d4464 100644 --- a/hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/State.hs +++ b/hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/State.hs @@ -1,35 +1,209 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE ViewPatterns #-} module HBS2.Git.DashBoard.State where import HBS2.Prelude.Plated +import HBS2.Merkle +import HBS2.Data.Types.Refs +import HBS2.Base58 +import HBS2.Clock +import HBS2.Net.Auth.Schema +import HBS2.Misc.PrettyStuff +import HBS2.Net.Proto.Service +import HBS2.Storage + +import HBS2.Peer.Proto.LWWRef +import HBS2.Peer.Proto.RefChan.Types +import HBS2.Peer.Proto.RefChan.RefChanUpdate +import HBS2.Peer.RPC.API.RefChan import HBS2.Git.DashBoard.Types import HBS2.System.Logger.Simple.ANSI import Data.Config.Suckless -import DBPipe.SQLite -import DBPipe.SQLite.Generic +import DBPipe.SQLite hiding (insert) +import DBPipe.SQLite.Generic as G +import Data.Maybe +import Data.Text qualified as Text import Text.InterpolatedString.Perl6 (qc) import Control.Monad.Reader +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Data.Coerce +import Streaming.Prelude qualified as S + +type MyRefChan = RefChanId L4Proto evolveDB :: MonadIO m => DBPipeM m () evolveDB = do ddl [qc| - create table if not exists project + create table if not exists repo ( lww text not null , primary key (lww) ) |] + ddl [qc| + create table if not exists brief + ( lww text not null + , brief text not null + , primary key (lww) + ) + |] + + ddl [qc| + create table if not exists name + ( lww text not null + , name text not null + , primary key (lww) + ) + |] + + ddl [qc| + create table if not exists processed + ( hash text not null + , primary key (hash) + ) + |] + pure () -updateIndex :: (MonadIO m, HasConf m, MonadReader DashBoardEnv m) => m () +instance ToField HashRef where + toField x = toField $ show $ pretty x + +instance Pretty (AsBase58 (PubKey 'Sign s)) => ToField (LWWRefKey s) where + toField x = toField $ show $ pretty (AsBase58 x) + +newtype TxHash = TxHash HashRef + deriving newtype (ToField) + +newtype RepoName = RepoName Text + deriving newtype (ToField) + +newtype RepoBrief = RepoBrief Text + deriving newtype (ToField) + +newtype RepoLww = RepoLww (LWWRefKey 'HBS2Basic) + deriving newtype (ToField) + +data TxProcessedTable +data RepoTable +data RepoNameTable +data RepoBriefTable + +instance HasTableName RepoTable where + tableName = "repo" + +instance HasTableName RepoNameTable where + tableName = "name" + +instance HasTableName RepoBriefTable where + tableName = "brief" + +instance HasTableName TxProcessedTable where + tableName = "processed" + +instance HasColumnName TxHash where + columnName = "hash" + +instance HasColumnName RepoLww where + columnName = "lww" + +instance HasColumnName RepoName where + columnName = "name" + +instance HasColumnName RepoBrief where + columnName = "brief" + +instance HasPrimaryKey TxProcessedTable where + primaryKey = [G.columnName @TxHash] + +instance HasPrimaryKey RepoTable where + primaryKey = [G.columnName @RepoLww] + +instance HasPrimaryKey RepoNameTable where + primaryKey = [G.columnName @RepoLww] + +instance HasPrimaryKey RepoBriefTable where + primaryKey = [G.columnName @RepoLww] + + +pattern PRefChan :: MyRefChan -> Syntax C +pattern PRefChan s <- ListVal [ SymbolVal "refchan" , asRefChan -> Just s ] + +asRefChan :: Syntax C -> Maybe MyRefChan +asRefChan = \case + LitStrVal s -> fromStringMay @MyRefChan (Text.unpack s) + _ -> Nothing + +getIndexEntries :: (DashBoardPerks m, HasConf m, MonadReader DashBoardEnv m) => m [MyRefChan] +getIndexEntries = do + conf <- getConf + + pure [ s | ListVal [ SymbolVal "index", PRefChan s] <- conf ] + + +updateIndex :: (DashBoardPerks m, HasConf m, MonadReader DashBoardEnv m) => m () updateIndex = do debug "updateIndex" - pure () + rchanAPI <- asks _refChanAPI + sto <- asks _sto + + flip runContT pure do + + es <- lift getIndexEntries + + for_ es $ \rc -> do + callCC \next -> do + debug $ red (pretty (AsBase58 rc)) + + h <- lift (callRpcWaitMay @RpcRefChanGet (1 :: Timeout 'Seconds) rchanAPI rc) + <&> join + >>= maybe (next ()) pure + + debug $ "rechan val" <+> red (pretty h) + + txs <- S.toList_ do + walkMerkle @[HashRef] (coerce h) (getBlock sto) $ \case + Left{} -> pure () + Right hs -> mapM_ S.yield hs + + for_ txs $ \txh -> void $ runMaybeT do + + done <- lift $ lift $ withState do + select @(Only Int) + [qc|select 1 from processed where hash = ? limit 1|] + (Only (TxHash txh)) <&> isJust . listToMaybe + + guard (not done) + + tx@GitIndexTx{..} <- getBlock sto (coerce txh) + >>= toMPlus + >>= readProposeTranMay @(GitIndexTx 'HBS2Basic) @L4Proto + >>= toMPlus + + lift $ lift $ withState $ transactional do + let nm = [ RepoName n | GitIndexRepoName n <- universeBi gitIndexTxPayload ] & headMay + let bri = [ RepoBrief n | GitIndexRepoBrief n <- universeBi gitIndexTxPayload ] & headMay + + insert @RepoTable $ onConflictIgnore @RepoTable (Only (RepoLww gitIndexTxRef)) + + -- FIXME: on-conflict-update! + for_ nm $ \n -> do + insert @RepoNameTable $ onConflictIgnore @RepoNameTable (RepoLww gitIndexTxRef, n) + + for_ bri $ \n -> do + insert @RepoBriefTable $ onConflictIgnore @RepoBriefTable (RepoLww gitIndexTxRef, n) + + lift $ withState $ transactional do + for_ txs $ \t -> do + insert @TxProcessedTable $ onConflictIgnore @TxProcessedTable (Only (TxHash t)) diff --git a/hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/Types.hs b/hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/Types.hs new file mode 100644 index 00000000..9f48b7d4 --- /dev/null +++ b/hbs2-git/hbs2-git-dashboard/src/HBS2/Git/DashBoard/Types.hs @@ -0,0 +1,107 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +module HBS2.Git.DashBoard.Types + ( module HBS2.Git.DashBoard.Types + , module HBS2.Git.Data.Tx.Index + ) where + +import HBS2.Prelude.Plated + +import HBS2.Git.Data.Tx.Index + +import HBS2.Net.Proto.Service +import HBS2.Storage +import HBS2.Peer.RPC.API.Peer +import HBS2.Peer.RPC.API.RefLog +import HBS2.Peer.RPC.API.RefChan +import HBS2.Peer.RPC.API.LWWRef +import HBS2.Peer.RPC.API.Storage +import HBS2.Peer.RPC.Client.StorageClient +import HBS2.Net.Messaging.Unix + +import Data.Config.Suckless + +import DBPipe.SQLite +import Control.Monad.Reader + +import UnliftIO + +data HttpPortOpt + +data DevelopAssetsOpt + +instance HasConf m => HasCfgKey HttpPortOpt a m where + key = "port" + + +instance HasConf m => HasCfgKey DevelopAssetsOpt a m where + key = "develop-assets" + +data RunDashBoardOpts = RunDashBoardOpts + { configPath :: Maybe FilePath } + +instance Monoid RunDashBoardOpts where + mempty = RunDashBoardOpts Nothing + +instance Semigroup RunDashBoardOpts where + (<>) _ b = RunDashBoardOpts { configPath = configPath b } + + +data DashBoardEnv = + DashBoardEnv + { _peerAPI :: ServiceCaller PeerAPI UNIX + , _refLogAPI :: ServiceCaller RefLogAPI UNIX + , _refChanAPI :: ServiceCaller RefChanAPI UNIX + , _lwwRefAPI :: ServiceCaller LWWRefAPI UNIX + , _sto :: AnyStorage + , _dashBoardConf :: TVar [Syntax C] + , _db :: DBPipeEnv + , _pipeline :: TQueue (IO ()) + } + +type DashBoardPerks m = MonadUnliftIO m + +newtype DashBoardM m a = DashBoardM { fromDashBoardM :: ReaderT DashBoardEnv m a } + deriving newtype + ( Applicative + , Functor + , Monad + , MonadIO + , MonadUnliftIO + , MonadTrans + , MonadReader DashBoardEnv + ) + +instance (MonadIO m, Monad m, MonadReader DashBoardEnv m) => HasConf m where + getConf = do + asks _dashBoardConf >>= readTVarIO + +newDashBoardEnv :: MonadIO m + => [Syntax C] + -> FilePath + -> ServiceCaller PeerAPI UNIX + -> ServiceCaller RefLogAPI UNIX + -> ServiceCaller RefChanAPI UNIX + -> ServiceCaller LWWRefAPI UNIX + -> AnyStorage + -> m DashBoardEnv +newDashBoardEnv cfg dbFile peer rlog rchan lww sto = do + DashBoardEnv peer rlog rchan lww sto + <$> newTVarIO cfg + <*> newDBPipeEnv dbPipeOptsDef dbFile + <*> newTQueueIO + +withDashBoardEnv :: Monad m => DashBoardEnv -> DashBoardM m a -> m a +withDashBoardEnv env m = runReaderT (fromDashBoardM m) env + +withState :: (MonadIO m, MonadReader DashBoardEnv m) => DBPipeM m a -> m a +withState f = do + asks _db >>= flip withDB f + + +addJob :: (DashBoardPerks m, MonadReader DashBoardEnv m) => IO () -> m () +addJob f = do + q <- asks _pipeline + atomically $ writeTQueue q f + diff --git a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs index e4666cf8..7fe938c7 100644 --- a/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs +++ b/hbs2-peer/lib/HBS2/Peer/Proto/RefChan/RefChanUpdate.hs @@ -29,6 +29,7 @@ import Codec.Serialise import Control.Monad.Identity import Control.Monad.Trans.Maybe import Data.ByteString (ByteString) +import Data.ByteString.Lazy qualified as LBS import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.HashSet (HashSet) @@ -563,6 +564,33 @@ refChanRequestProto self adapter msg = do lift $ emit RefChanRequestEventKey (RefChanRequestEvent @e chan val) debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val + -- case s of + -- Accept{} -> pure () + -- Propose _ box -> do + -- (_, ProposeTran _ pbox :: ProposeTran L4Proto) <- toMPlus $ unboxSignedBox0 box + -- (_, bs2) <- toMPlus $ unboxSignedBox0 pbox + -- liftIO $ BS.putStr bs2 + +readProposeTranMay :: forall p e s m . ( Monad m + , ForRefChans e + , Signatures (Encryption e) + , s ~ Encryption e + , Serialise p + ) + => LBS.ByteString + -> m (Maybe p) +readProposeTranMay lbs = runMaybeT do + + updTx <- deserialiseOrFail @(RefChanUpdate e) lbs & toMPlus + + box <- case updTx of + Accept{} -> mzero + Propose _ box -> pure box + + (_, ProposeTran _ pbox :: ProposeTran e) <- toMPlus $ unboxSignedBox0 @_ @s box + (_, bs2) <- toMPlus $ unboxSignedBox0 pbox + + deserialiseOrFail @p (LBS.fromStrict bs2) & toMPlus makeProposeTran :: forall e s m . ( MonadIO m , ForRefChans e diff --git a/hbs2-tests/test/playground/Main.hs b/hbs2-tests/test/playground/Main.hs index 1c1525d7..426307b8 100644 --- a/hbs2-tests/test/playground/Main.hs +++ b/hbs2-tests/test/playground/Main.hs @@ -92,6 +92,19 @@ runWithAsync = do void $ waitAnyCatchCancel [t1,q,pysh] +testCont :: IO () +testCont = do + + flip runContT pure do + for_ [1..10] $ \i -> do + callCC \next -> do + + when (even i) do + next () + + liftIO $ print i + + main :: IO () main = do print "1"