This commit is contained in:
Dmitry Zuikov 2024-04-19 08:53:36 +03:00
parent aa269770fb
commit 4f22180ab6
12 changed files with 317 additions and 9 deletions

View File

@ -169,7 +169,7 @@ pShowRef = do
tx <- withState do
selectMaxAppliedTx >>= lift . toMPlus <&> fst
rh <- TX.readRepoHeadFromTx sto tx >>= toMPlus
(_,rh) <- TX.readRepoHeadFromTx sto tx >>= toMPlus
liftIO $ print $ vcat (fmap formatRef (_repoHeadRefs rh))
@ -229,8 +229,8 @@ pKeyShow = do
tx <- withState do
selectMaxAppliedTx >>= lift . toMPlus <&> fst
rh <- TX.readRepoHeadFromTx sto tx
>>= toMPlus
(_,rh) <- TX.readRepoHeadFromTx sto tx
>>= toMPlus
gkh <- toMPlus (_repoHeadGK0 rh)

View File

@ -177,7 +177,7 @@ main = do
r' <- runMaybeT $ withState do
tx <- selectMaxAppliedTx >>= lift . toMPlus <&> fst
rh <- TX.readRepoHeadFromTx sto tx >>= lift . toMPlus
(_,rh) <- TX.readRepoHeadFromTx sto tx >>= lift . toMPlus
pure (_repoHeadRefs rh)
let r = fromMaybe mempty r'

View File

@ -191,7 +191,9 @@ export key refs = do
tx0 <- getLastAppliedTx
rh0 <- runMaybeT ( toMPlus tx0 >>= readRepoHeadFromTx sto >>= toMPlus )
rh <- runMaybeT ( toMPlus tx0 >>= readRepoHeadFromTx sto >>= toMPlus )
let rh0 = snd <$> rh
(name,brief,mf) <- lift getManifest

View File

@ -197,10 +197,11 @@ readTx sto href = do
pure (n, rhh, rh, blkh)
readRepoHeadFromTx :: MonadIO m
=> AnyStorage
-> HashRef
-> m (Maybe RepoHead)
-> m (Maybe (HashRef, RepoHead))
readRepoHeadFromTx sto href = runMaybeT do
@ -214,6 +215,7 @@ readRepoHeadFromTx sto href = runMaybeT do
>>= toMPlus
<&> deserialiseOrFail @RepoHead
>>= toMPlus
<&> (rhh,)
data BundleMeta =

View File

@ -46,7 +46,9 @@ configParser = do
))
cmd <- subparser
( command "web" (O.info pRunWeb (progDesc "Run the web interface")) )
( command "web" (O.info pRunWeb (progDesc "Run the web interface"))
<> command "index" (O.info pRunIndex (progDesc "update index"))
)
pure $ cmd opts
@ -54,6 +56,9 @@ configParser = do
pRunWeb :: DashBoardPerks m => Parser (RunDashBoardOpts -> m ())
pRunWeb = pure $ \x -> runDashBoardM x runScotty
pRunIndex :: DashBoardPerks m => Parser (RunDashBoardOpts -> m ())
pRunIndex = pure $ \x -> runDashBoardM x do
updateIndex
{- HLINT ignore "Eta reduce" -}
{- HLINT ignore "Functor law" -}

View File

@ -14,13 +14,15 @@ module HBS2.Git.DashBoard.Prelude
, module Coerce
, module TransCont
, module TransMaybe
, module Lens.Micro.Platform
, module UnliftIO
, module Codec.Serialise
, qc, q
) where
import HBS2.Data.Types.Refs
import HBS2.Base58
import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Service hiding (encode,decode)
import HBS2.Prelude.Plated
import HBS2.Storage
import HBS2.Merkle
@ -34,6 +36,7 @@ import HBS2.Peer.RPC.API.RefLog as API
import HBS2.Peer.RPC.API.Peer as API
import HBS2.Peer.RPC.API.LWWRef as API
import HBS2.Peer.Proto.RefLog as API
import HBS2.Peer.Proto.LWWRef as API
import HBS2.Peer.Proto.RefChan.Types as API
import HBS2.Peer.Proto.RefChan.RefChanUpdate as API
@ -49,5 +52,9 @@ import Data.Coerce as Coerce
import Control.Monad.Trans.Cont as TransCont
import Control.Monad.Trans.Maybe as TransMaybe
import Lens.Micro.Platform hiding (at)
import UnliftIO
import Codec.Serialise

View File

@ -6,21 +6,25 @@
module HBS2.Git.DashBoard.State
( module HBS2.Git.DashBoard.State
, Only(..)
, transactional
) where
import HBS2.Git.DashBoard.Prelude
import HBS2.Git.DashBoard.Types
import HBS2.Git.Data.Tx.Git
import DBPipe.SQLite hiding (insert)
import DBPipe.SQLite.Generic as G
import Lucid.Base
import Data.Text qualified as Text
import Data.Word
type MyRefChan = RefChanId L4Proto
evolveDB :: MonadIO m => DBPipeM m ()
evolveDB :: DashBoardPerks m => DBPipeM m ()
evolveDB = do
ddl [qc|
@ -55,6 +59,10 @@ evolveDB = do
)
|]
createRepoHeadTable
createRepoListView
ddl [qc|
create table if not exists processed
( hash text not null
@ -93,6 +101,21 @@ newtype RepoLww = RepoLww (LWWRefKey 'HBS2Basic)
newtype RepoChannel = RepoChannel MyRefChan
newtype RepoHeadRef = RepoHeadRef HashRef
deriving stock (Generic)
deriving newtype (ToField)
newtype RepoHeadSeq = RepoHeadSeq Word64
deriving stock (Generic)
deriving newtype (ToField)
newtype RepoHeadGK0 = RepoHeadGK0 (Maybe HashRef)
deriving stock (Generic)
deriving newtype (ToField)
instance ToField RepoChannel where
toField (RepoChannel x) = toField $ show $ pretty (AsBase58 x)
@ -185,6 +208,95 @@ selectRepoList = withState do
createRepoListView :: DashBoardPerks m => DBPipeM m ()
createRepoListView = do
ddl [qc|
drop view if exists repolistview
|]
ddl [qc|
create view repolistview as
with repolist as (
select
r.lww,
0 as seq,
coalesce(n.name, r.lww) as name,
coalesce(b.brief, '') as brief
from repo r
left join name n on r.lww = n.lww
left join brief b on r.lww = b.lww
union
select
lww,
seq,
name,
brief
from repohead
),
ranked_repos as (
select
lww,
seq,
name,
brief,
row_number() over (partition by lww order by seq desc) as rn
from repolist
)
select lww, seq, name, brief
from ranked_repos
where rn = 1;
|]
createRepoHeadTable :: DashBoardPerks m => DBPipeM m ()
createRepoHeadTable = do
ddl [qc|
create table if not exists repohead
( lww text not null
, repohead text not null
, seq integer not null
, gk0 text null
, name text
, brief text
, primary key (lww,repohead)
)
|]
data RepoHeadTable
instance HasTableName RepoHeadTable where
tableName = "repohead"
instance HasPrimaryKey RepoHeadTable where
primaryKey = ["lww", "repohead"]
instance HasColumnName RepoHeadRef where
columnName = "repohead"
instance HasColumnName RepoHeadSeq where
columnName = "seq"
instance HasColumnName RepoHeadGK0 where
columnName = "gk0"
insertRepoHead :: (DashBoardPerks m, MonadReader DashBoardEnv m)
=> LWWRefKey 'HBS2Basic
-> HashRef
-> RepoHead
-> DBPipeM m ()
insertRepoHead lww href rh = do
insert @RepoHeadTable $ onConflictIgnore @RepoHeadTable
( RepoLww lww
, RepoHeadRef href
, RepoHeadSeq (_repoHeadTime rh)
, RepoHeadGK0 (_repoHeadGK0 rh)
, RepoName (_repoHeadName rh)
, RepoBrief (_repoHeadBrief rh)
)
pure ()

View File

@ -2,7 +2,152 @@ module HBS2.Git.DashBoard.State.Index.Peer where
import HBS2.Git.DashBoard.Prelude
import HBS2.Git.DashBoard.Types
import HBS2.Git.DashBoard.State
import HBS2.Git.Data.LWWBlock
import HBS2.Git.Data.Tx.Git
import Streaming.Prelude qualified as S
{- HLINT ignore "Functor law" -}
updateIndexFromPeer :: (DashBoardPerks m, HasConf m, MonadReader DashBoardEnv m) => m ()
updateIndexFromPeer = do
debug "updateIndexFromPeer"
peer <- asks _peerAPI
reflog <- asks _refLogAPI
sto <- asks _sto
polls <- callRpcWaitMay @RpcPollList2 (TimeoutSec 1) peer (Just "lwwref", Nothing)
<&> join . maybeToList
<&> fmap (LWWRefKey @HBS2Basic . view _1)
repos <- S.toList_ $ forM_ polls $ \r -> void $ runMaybeT do
(lw,blk) <- readLWWBlock sto r >>= toMPlus
let rk = lwwRefLogPubKey blk
lift $ S.yield (r,RefLogKey @'HBS2Basic rk,blk)
for_ repos $ \(lw,rk,LWWBlockData{..}) -> do
mhead <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflog (coerce rk)
<&> join
for_ mhead $ \mh -> do
txs <- S.toList_ $ do
walkMerkle @[HashRef] (fromHashRef mh) (getBlock sto) $ \case
Left{} -> do
pure ()
Right hxs -> do
for_ hxs $ \htx -> void $ runMaybeT do
-- done <- liftIO $ withDB db (isTxProcessed (HashVal htx))
-- done1 <- liftIO $ withDB db (isTxProcessed (processedRepoTx (gitLwwRef,htx)))
-- guard (not done && not done1)
getBlock sto (fromHashRef htx) >>= toMPlus
<&> deserialiseOrFail @(RefLogUpdate L4Proto)
>>= toMPlus
>>= unpackTx
>>= \(n,h,blk) -> lift (S.yield (n,htx,blk))
headz <- S.toList_ do
for_ txs $ \(n,tx,blk) -> void $ runMaybeT do
(rhh, rhead) <- readRepoHeadFromTx sto tx >>= toMPlus
debug $ yellow "found repo head" <+> pretty rhh <+> pretty "for" <+> pretty lw
lift $ S.yield (lw, rhh, rhead)
withState $ transactional do
for_ headz $ \(l, rh, rhead) -> do
insertRepoHead l rh rhead
-- db <- asks _db
-- facts <- S.toList_ do
-- for_ repos $ \(lw,rk,LWWBlockData{..}) -> do
-- mhead <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflog (coerce rk)
-- <&> join
-- for_ mhead $ \mh -> do
-- txs <- S.toList_ $ do
-- walkMerkle @[HashRef] (fromHashRef mh) (getBlock sto) $ \case
-- Left{} -> do
-- pure ()
-- Right hxs -> do
-- for_ hxs $ \htx -> void $ runMaybeT do
-- -- done <- liftIO $ withDB db (isTxProcessed (HashVal htx))
-- -- done1 <- liftIO $ withDB db (isTxProcessed (processedRepoTx (gitLwwRef,htx)))
-- -- guard (not done && not done1)
-- getBlock sto (fromHashRef htx) >>= toMPlus
-- <&> deserialiseOrFail @(RefLogUpdate L4Proto)
-- >>= toMPlus
-- >>= unpackTx
-- >>= \(n,h,blk) -> lift (S.yield (n,htx,blk))
-- relAlready <- lift $ withDB db do
-- -- FIXME: uncomment-for-speedup
-- done <- isGitRepoBundleProcessed mh >> pure False
-- unless done do
-- transactional do
-- for_ txs $ \(n,_,bu) -> do
-- refs <- fromRight mempty <$> readBundleRefs sto bu
-- for_ refs $ \r -> do
-- debug $ red "bundle-fact" <+> pretty lw <+> pretty r
-- insertRepoBundleFact (GitRepoBundle (GitLwwRef lw) (GitBundle r))
-- insertGitRepoBundleProcessed mh
-- pure done
-- -- let tx' = maximumByMay (comparing (view _1)) txs
-- for_ txs $ \(n,tx,blk) -> void $ runMaybeT do
-- liftIO $ withDB db do
-- transactional do
-- for_ [ t | (i,t,_) <- txs, i < n ] $ \tran -> do
-- insertTxProcessed (HashVal tran)
-- (rhh,RepoHeadSimple{..}) <- readRepoHeadFromTx sto tx
-- >>= toMPlus
-- let name = Text.take 256 $ _repoHeadName
-- let brief = Text.take 1024 $ _repoHeadBrief
-- let manifest = _repoManifest
-- lift $ S.yield $ GitRepoFacts
-- (GitLwwRef lw)
-- (GitLwwSeq lwwRefSeed)
-- (GitRefLog rk)
-- (GitTx tx)
-- (GitRepoHeadRef rhh)
-- (GitRepoHeadSeq (fromIntegral n))
-- (GitName (Just name))
-- (GitBrief (Just brief))
-- (GitEncrypted _repoHeadGK0)
-- [GitRepoExtendedManifest (GitManifest manifest)]
-- -- yield repo relation facts by common bundles
-- unless relAlready do
-- what <- withDB db do
-- select_ @_ @(GitLwwRef, GitLwwRef) [qc|
-- select distinct
-- b1.lwwref
-- , b2.lwwref
-- from gitrepobundle b1 join gitrepobundle b2 on b1.bundle = b2.bundle
-- where b1.lwwref <> b2.lwwref
-- |]
-- let r = HM.fromListWith (<>) [ (a, HS.singleton b) | (a,b) <- what ]
-- & HM.toList
-- for_ r $ \(lww, rel) -> do
-- lift $ S.yield $ GitRepoRelatedFact lww rel
-- liftIO $ withDB db (insertTxProcessed (HashVal tx))

View File

@ -256,6 +256,22 @@ instance ( Hashable (Peer e)
where
postprocess = mapMaybe (\(r,t,i) -> (,t,i) <$> fromStringMay r )
listPolledRefsFiltered brains (t, p) = liftIO do
debug $ red "brains: listPolledRefsFiltered" <+> pretty (t,p)
let conn = view brainsDb brains
let sql = [qc|
select ref, type, interval
from {poll_table}
where coalesce(type = ?, true)
limit ?
offset ?
|]
query conn sql (t, lim, off ) <&> postprocess
where
postprocess = mapMaybe (\(r,t1,i) -> (,t1,i) <$> fromStringMay r )
off = maybe 0 fst p
lim = maybe 1000 snd p
isPolledRef brains tp ref = do
cached <- liftIO $ readTVarIO (_brainsPolled brains) <&> HashSet.member (ref,tp)

View File

@ -18,6 +18,12 @@ instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcP
debug $ "rpc.pollList"
listPolledRefs @L4Proto brains Nothing
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollList2 where
handleMethod filt = do
brains <- getRpcContext @PeerAPI <&> rpcBrains
debug $ "rpc.pollList2" <+> pretty filt
listPolledRefsFiltered @L4Proto brains filt
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcPollAdd where

View File

@ -18,6 +18,13 @@ class HasBrains e a where
listPolledRefs :: MonadIO m => a -> Maybe String -> m [(PubKey 'Sign (Encryption e), String, Int)]
listPolledRefs _ _ = pure mempty
listPolledRefsFiltered :: MonadIO m
=> a
-> (Maybe String, Maybe (Int, Int))
-> m [(PubKey 'Sign (Encryption e), String, Int)]
listPolledRefsFiltered _ _ = pure mempty
isPolledRef :: MonadIO m => a -> String -> PubKey 'Sign (Encryption e) -> m Bool
isPolledRef _ _ _ = pure False
@ -159,6 +166,7 @@ data SomeBrains e = forall a . HasBrains e a => SomeBrains a
instance HasBrains e (SomeBrains e) where
listPolledRefs (SomeBrains a) = listPolledRefs @e a
listPolledRefsFiltered (SomeBrains a) = listPolledRefsFiltered @e a
isPolledRef (SomeBrains a) = isPolledRef @e a
delPolledRef (SomeBrains a) = delPolledRef @e a
addPolledRef (SomeBrains a) = addPolledRef @e a

View File

@ -25,6 +25,7 @@ data RpcLogLevel
data RpcDie
data RpcPollList
data RpcPollList2
data RpcPollAdd
data RpcPollDel
@ -50,6 +51,7 @@ type PeerAPI = '[ RpcPoke
, RpcDownloadDel
, RpcByPassInfo
, RpcPerformGC
, RpcPollList2
]
instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where
@ -87,6 +89,9 @@ type instance Output RpcFetch = ()
type instance Input RpcPollList= ()
type instance Output RpcPollList = [(PubKey 'Sign 'HBS2Basic, String, Int)]
type instance Input RpcPollList2 = (Maybe String, Maybe (Int,Int))
type instance Output RpcPollList2 = [(PubKey 'Sign 'HBS2Basic, String, Int)]
type instance Input RpcDownloadList = ()
type instance Output RpcDownloadList = [(HashRef, Integer)]