This commit is contained in:
Dmitry Zuikov 2024-04-18 13:23:23 +03:00
parent 700777a8fd
commit 78a58039b2
6 changed files with 349 additions and 15 deletions

View File

@ -45,11 +45,11 @@ instance ForGitIndex s => Pretty (GitRepoAnnounce s) where
newtype NotifyCredentials s = NotifyCredentials (PeerCredentials s) newtype NotifyCredentials s = NotifyCredentials (PeerCredentials s)
newtype GitIndexRepoName = GitIndexRepoName Text newtype GitIndexRepoName = GitIndexRepoName Text
deriving stock (Generic,Show) deriving stock (Data,Generic,Show)
deriving newtype (Serialise) deriving newtype (Serialise)
newtype GitIndexRepoBrief = GitIndexRepoBrief Text newtype GitIndexRepoBrief = GitIndexRepoBrief Text
deriving stock (Generic,Show) deriving stock (Data,Generic,Show)
deriving newtype (Serialise) deriving newtype (Serialise)
newtype GitIndexRepoManifest = GitIndexRepoManifest (Maybe Text) newtype GitIndexRepoManifest = GitIndexRepoManifest (Maybe Text)
@ -61,13 +61,13 @@ data GitIndexRepoDefineData =
{ gitIndexRepoName :: GitIndexRepoName { gitIndexRepoName :: GitIndexRepoName
, gitIndexRepoBrief :: GitIndexRepoBrief , gitIndexRepoBrief :: GitIndexRepoBrief
} }
deriving stock (Generic,Show) deriving stock (Data,Generic,Show)
data GitIndexEntry = data GitIndexEntry =
GitIndexRepoDefine GitIndexRepoDefineData GitIndexRepoDefine GitIndexRepoDefineData
| GitIndexRepoTombEntry | GitIndexRepoTombEntry
| GitIndexRepoLikes Integer | GitIndexRepoLikes Integer
deriving stock (Generic) deriving stock (Data,Generic)
data GitIndexTx s = data GitIndexTx s =
GitIndexTx GitIndexTx
@ -105,6 +105,9 @@ makeNotificationTx ncred lww lwsk forkInfo = do
makeSignedBox @s (view peerSignPk creds) (view peerSignSk creds) (LBS.toStrict $ serialise repoAnn) makeSignedBox @s (view peerSignPk creds) (view peerSignSk creds) (LBS.toStrict $ serialise repoAnn)
unpackNotificationTx :: forall s m . (ForGitIndex s, MonadError OperationError m) unpackNotificationTx :: forall s m . (ForGitIndex s, MonadError OperationError m)
=> SignedBox ByteString s => SignedBox ByteString s
-> m (GitRepoAnnounce s) -> m (GitRepoAnnounce s)

View File

@ -114,8 +114,6 @@ runDashBoardM cli m = do
-- FIXME: unix-socket-from-config -- FIXME: unix-socket-from-config
soname <- detectRPC `orDie` "hbs2-peer rpc not found" soname <- detectRPC `orDie` "hbs2-peer rpc not found"
env <- newDashBoardEnv conf dbFile
let errorPrefix = toStderr . logPrefix "[error] " let errorPrefix = toStderr . logPrefix "[error] "
let warnPrefix = toStderr . logPrefix "[warn] " let warnPrefix = toStderr . logPrefix "[warn] "
let noticePrefix = toStderr . logPrefix "" let noticePrefix = toStderr . logPrefix ""
@ -129,10 +127,6 @@ runDashBoardM cli m = do
flip runContT pure do flip runContT pure do
void $ ContT $ withAsync do
q <- withDashBoardEnv env $ asks _pipeline
forever do
liftIO (atomically $ readTQueue q) & liftIO . join
client <- liftIO $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) client <- liftIO $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
>>= orThrowUser ("can't connect to" <+> pretty soname) >>= orThrowUser ("can't connect to" <+> pretty soname)
@ -141,6 +135,7 @@ runDashBoardM cli m = do
peerAPI <- makeServiceCaller @PeerAPI (fromString soname) peerAPI <- makeServiceCaller @PeerAPI (fromString soname)
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
refChanAPI <- makeServiceCaller @RefChanAPI (fromString soname)
storageAPI <- makeServiceCaller @StorageAPI (fromString soname) storageAPI <- makeServiceCaller @StorageAPI (fromString soname)
lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname)
@ -148,12 +143,26 @@ runDashBoardM cli m = do
let endpoints = [ Endpoint @UNIX peerAPI let endpoints = [ Endpoint @UNIX peerAPI
, Endpoint @UNIX refLogAPI , Endpoint @UNIX refLogAPI
, Endpoint @UNIX refChanAPI
, Endpoint @UNIX lwwAPI , Endpoint @UNIX lwwAPI
, Endpoint @UNIX storageAPI , Endpoint @UNIX storageAPI
] ]
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
env <- newDashBoardEnv
conf
dbFile
peerAPI
refLogAPI
refChanAPI
lwwAPI
sto
void $ ContT $ withAsync do
q <- withDashBoardEnv env $ asks _pipeline
forever do
liftIO (atomically $ readTQueue q) & liftIO . join
lift $ withDashBoardEnv env (withState evolveDB >> m) lift $ withDashBoardEnv env (withState evolveDB >> m)
`finally` do `finally` do

View File

@ -1,35 +1,209 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ViewPatterns #-}
module HBS2.Git.DashBoard.State where module HBS2.Git.DashBoard.State where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Merkle
import HBS2.Data.Types.Refs
import HBS2.Base58
import HBS2.Clock
import HBS2.Net.Auth.Schema
import HBS2.Misc.PrettyStuff
import HBS2.Net.Proto.Service
import HBS2.Storage
import HBS2.Peer.Proto.LWWRef
import HBS2.Peer.Proto.RefChan.Types
import HBS2.Peer.Proto.RefChan.RefChanUpdate
import HBS2.Peer.RPC.API.RefChan
import HBS2.Git.DashBoard.Types import HBS2.Git.DashBoard.Types
import HBS2.System.Logger.Simple.ANSI import HBS2.System.Logger.Simple.ANSI
import Data.Config.Suckless import Data.Config.Suckless
import DBPipe.SQLite import DBPipe.SQLite hiding (insert)
import DBPipe.SQLite.Generic import DBPipe.SQLite.Generic as G
import Data.Maybe
import Data.Text qualified as Text
import Text.InterpolatedString.Perl6 (qc) import Text.InterpolatedString.Perl6 (qc)
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Data.Coerce
import Streaming.Prelude qualified as S
type MyRefChan = RefChanId L4Proto
evolveDB :: MonadIO m => DBPipeM m () evolveDB :: MonadIO m => DBPipeM m ()
evolveDB = do evolveDB = do
ddl [qc| ddl [qc|
create table if not exists project create table if not exists repo
( lww text not null ( lww text not null
, primary key (lww) , primary key (lww)
) )
|] |]
ddl [qc|
create table if not exists brief
( lww text not null
, brief text not null
, primary key (lww)
)
|]
ddl [qc|
create table if not exists name
( lww text not null
, name text not null
, primary key (lww)
)
|]
ddl [qc|
create table if not exists processed
( hash text not null
, primary key (hash)
)
|]
pure () pure ()
updateIndex :: (MonadIO m, HasConf m, MonadReader DashBoardEnv m) => m () instance ToField HashRef where
toField x = toField $ show $ pretty x
instance Pretty (AsBase58 (PubKey 'Sign s)) => ToField (LWWRefKey s) where
toField x = toField $ show $ pretty (AsBase58 x)
newtype TxHash = TxHash HashRef
deriving newtype (ToField)
newtype RepoName = RepoName Text
deriving newtype (ToField)
newtype RepoBrief = RepoBrief Text
deriving newtype (ToField)
newtype RepoLww = RepoLww (LWWRefKey 'HBS2Basic)
deriving newtype (ToField)
data TxProcessedTable
data RepoTable
data RepoNameTable
data RepoBriefTable
instance HasTableName RepoTable where
tableName = "repo"
instance HasTableName RepoNameTable where
tableName = "name"
instance HasTableName RepoBriefTable where
tableName = "brief"
instance HasTableName TxProcessedTable where
tableName = "processed"
instance HasColumnName TxHash where
columnName = "hash"
instance HasColumnName RepoLww where
columnName = "lww"
instance HasColumnName RepoName where
columnName = "name"
instance HasColumnName RepoBrief where
columnName = "brief"
instance HasPrimaryKey TxProcessedTable where
primaryKey = [G.columnName @TxHash]
instance HasPrimaryKey RepoTable where
primaryKey = [G.columnName @RepoLww]
instance HasPrimaryKey RepoNameTable where
primaryKey = [G.columnName @RepoLww]
instance HasPrimaryKey RepoBriefTable where
primaryKey = [G.columnName @RepoLww]
pattern PRefChan :: MyRefChan -> Syntax C
pattern PRefChan s <- ListVal [ SymbolVal "refchan" , asRefChan -> Just s ]
asRefChan :: Syntax C -> Maybe MyRefChan
asRefChan = \case
LitStrVal s -> fromStringMay @MyRefChan (Text.unpack s)
_ -> Nothing
getIndexEntries :: (DashBoardPerks m, HasConf m, MonadReader DashBoardEnv m) => m [MyRefChan]
getIndexEntries = do
conf <- getConf
pure [ s | ListVal [ SymbolVal "index", PRefChan s] <- conf ]
updateIndex :: (DashBoardPerks m, HasConf m, MonadReader DashBoardEnv m) => m ()
updateIndex = do updateIndex = do
debug "updateIndex" debug "updateIndex"
pure ()
rchanAPI <- asks _refChanAPI
sto <- asks _sto
flip runContT pure do
es <- lift getIndexEntries
for_ es $ \rc -> do
callCC \next -> do
debug $ red (pretty (AsBase58 rc))
h <- lift (callRpcWaitMay @RpcRefChanGet (1 :: Timeout 'Seconds) rchanAPI rc)
<&> join
>>= maybe (next ()) pure
debug $ "rechan val" <+> red (pretty h)
txs <- S.toList_ do
walkMerkle @[HashRef] (coerce h) (getBlock sto) $ \case
Left{} -> pure ()
Right hs -> mapM_ S.yield hs
for_ txs $ \txh -> void $ runMaybeT do
done <- lift $ lift $ withState do
select @(Only Int)
[qc|select 1 from processed where hash = ? limit 1|]
(Only (TxHash txh)) <&> isJust . listToMaybe
guard (not done)
tx@GitIndexTx{..} <- getBlock sto (coerce txh)
>>= toMPlus
>>= readProposeTranMay @(GitIndexTx 'HBS2Basic) @L4Proto
>>= toMPlus
lift $ lift $ withState $ transactional do
let nm = [ RepoName n | GitIndexRepoName n <- universeBi gitIndexTxPayload ] & headMay
let bri = [ RepoBrief n | GitIndexRepoBrief n <- universeBi gitIndexTxPayload ] & headMay
insert @RepoTable $ onConflictIgnore @RepoTable (Only (RepoLww gitIndexTxRef))
-- FIXME: on-conflict-update!
for_ nm $ \n -> do
insert @RepoNameTable $ onConflictIgnore @RepoNameTable (RepoLww gitIndexTxRef, n)
for_ bri $ \n -> do
insert @RepoBriefTable $ onConflictIgnore @RepoBriefTable (RepoLww gitIndexTxRef, n)
lift $ withState $ transactional do
for_ txs $ \t -> do
insert @TxProcessedTable $ onConflictIgnore @TxProcessedTable (Only (TxHash t))

View File

@ -0,0 +1,107 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
module HBS2.Git.DashBoard.Types
( module HBS2.Git.DashBoard.Types
, module HBS2.Git.Data.Tx.Index
) where
import HBS2.Prelude.Plated
import HBS2.Git.Data.Tx.Index
import HBS2.Net.Proto.Service
import HBS2.Storage
import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.API.RefLog
import HBS2.Peer.RPC.API.RefChan
import HBS2.Peer.RPC.API.LWWRef
import HBS2.Peer.RPC.API.Storage
import HBS2.Peer.RPC.Client.StorageClient
import HBS2.Net.Messaging.Unix
import Data.Config.Suckless
import DBPipe.SQLite
import Control.Monad.Reader
import UnliftIO
data HttpPortOpt
data DevelopAssetsOpt
instance HasConf m => HasCfgKey HttpPortOpt a m where
key = "port"
instance HasConf m => HasCfgKey DevelopAssetsOpt a m where
key = "develop-assets"
data RunDashBoardOpts = RunDashBoardOpts
{ configPath :: Maybe FilePath }
instance Monoid RunDashBoardOpts where
mempty = RunDashBoardOpts Nothing
instance Semigroup RunDashBoardOpts where
(<>) _ b = RunDashBoardOpts { configPath = configPath b }
data DashBoardEnv =
DashBoardEnv
{ _peerAPI :: ServiceCaller PeerAPI UNIX
, _refLogAPI :: ServiceCaller RefLogAPI UNIX
, _refChanAPI :: ServiceCaller RefChanAPI UNIX
, _lwwRefAPI :: ServiceCaller LWWRefAPI UNIX
, _sto :: AnyStorage
, _dashBoardConf :: TVar [Syntax C]
, _db :: DBPipeEnv
, _pipeline :: TQueue (IO ())
}
type DashBoardPerks m = MonadUnliftIO m
newtype DashBoardM m a = DashBoardM { fromDashBoardM :: ReaderT DashBoardEnv m a }
deriving newtype
( Applicative
, Functor
, Monad
, MonadIO
, MonadUnliftIO
, MonadTrans
, MonadReader DashBoardEnv
)
instance (MonadIO m, Monad m, MonadReader DashBoardEnv m) => HasConf m where
getConf = do
asks _dashBoardConf >>= readTVarIO
newDashBoardEnv :: MonadIO m
=> [Syntax C]
-> FilePath
-> ServiceCaller PeerAPI UNIX
-> ServiceCaller RefLogAPI UNIX
-> ServiceCaller RefChanAPI UNIX
-> ServiceCaller LWWRefAPI UNIX
-> AnyStorage
-> m DashBoardEnv
newDashBoardEnv cfg dbFile peer rlog rchan lww sto = do
DashBoardEnv peer rlog rchan lww sto
<$> newTVarIO cfg
<*> newDBPipeEnv dbPipeOptsDef dbFile
<*> newTQueueIO
withDashBoardEnv :: Monad m => DashBoardEnv -> DashBoardM m a -> m a
withDashBoardEnv env m = runReaderT (fromDashBoardM m) env
withState :: (MonadIO m, MonadReader DashBoardEnv m) => DBPipeM m a -> m a
withState f = do
asks _db >>= flip withDB f
addJob :: (DashBoardPerks m, MonadReader DashBoardEnv m) => IO () -> m ()
addJob f = do
q <- asks _pipeline
atomically $ writeTQueue q f

View File

@ -29,6 +29,7 @@ import Codec.Serialise
import Control.Monad.Identity import Control.Monad.Identity
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet) import Data.HashSet (HashSet)
@ -563,6 +564,33 @@ refChanRequestProto self adapter msg = do
lift $ emit RefChanRequestEventKey (RefChanRequestEvent @e chan val) lift $ emit RefChanRequestEventKey (RefChanRequestEvent @e chan val)
debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val
-- case s of
-- Accept{} -> pure ()
-- Propose _ box -> do
-- (_, ProposeTran _ pbox :: ProposeTran L4Proto) <- toMPlus $ unboxSignedBox0 box
-- (_, bs2) <- toMPlus $ unboxSignedBox0 pbox
-- liftIO $ BS.putStr bs2
readProposeTranMay :: forall p e s m . ( Monad m
, ForRefChans e
, Signatures (Encryption e)
, s ~ Encryption e
, Serialise p
)
=> LBS.ByteString
-> m (Maybe p)
readProposeTranMay lbs = runMaybeT do
updTx <- deserialiseOrFail @(RefChanUpdate e) lbs & toMPlus
box <- case updTx of
Accept{} -> mzero
Propose _ box -> pure box
(_, ProposeTran _ pbox :: ProposeTran e) <- toMPlus $ unboxSignedBox0 @_ @s box
(_, bs2) <- toMPlus $ unboxSignedBox0 pbox
deserialiseOrFail @p (LBS.fromStrict bs2) & toMPlus
makeProposeTran :: forall e s m . ( MonadIO m makeProposeTran :: forall e s m . ( MonadIO m
, ForRefChans e , ForRefChans e

View File

@ -92,6 +92,19 @@ runWithAsync = do
void $ waitAnyCatchCancel [t1,q,pysh] void $ waitAnyCatchCancel [t1,q,pysh]
testCont :: IO ()
testCont = do
flip runContT pure do
for_ [1..10] $ \i -> do
callCC \next -> do
when (even i) do
next ()
liftIO $ print i
main :: IO () main :: IO ()
main = do main = do
print "1" print "1"