This commit is contained in:
Dmitry Zuikov 2024-09-10 13:34:12 +03:00
parent 7ae26ef108
commit 44c0da9d03
3 changed files with 101 additions and 14 deletions

View File

@ -423,6 +423,9 @@ runTop forms = do
entry $ bindMatch "fixme:refchan:export" $ nil_ $ const do entry $ bindMatch "fixme:refchan:export" $ nil_ $ const do
void $ lift $ refchanExport void $ lift $ refchanExport
entry $ bindMatch "fixme:refchan:import" $ nil_ $ const do
void $ lift $ refchanImport
entry $ bindMatch "git:blobs" $ \_ -> do entry $ bindMatch "git:blobs" $ \_ -> do
blobs <- lift (listBlobs Nothing) blobs <- lift (listBlobs Nothing)

View File

@ -19,6 +19,8 @@ import HBS2.Merkle
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Storage import HBS2.Storage
import HBS2.Storage.Compact import HBS2.Storage.Compact
import HBS2.Peer.Proto.RefChan
import HBS2.Peer.RPC.Client.RefChan
import HBS2.Storage.Operations.ByteString import HBS2.Storage.Operations.ByteString
import HBS2.System.Dir import HBS2.System.Dir
import HBS2.Net.Auth.Credentials import HBS2.Net.Auth.Credentials
@ -57,8 +59,8 @@ import Lens.Micro.Platform
import System.Process.Typed import System.Process.Typed
import Control.Monad.Trans.Cont import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import System.IO.Temp as Temp import Control.Monad.Except
import System.IO qualified as IO import Control.Concurrent.STM (flushTQueue)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import System.Directory (getModificationTime) import System.Directory (getModificationTime)
@ -335,18 +337,6 @@ cat_ hash = do
liftIO $ action dict fallback liftIO $ action dict fallback
data FixmeExported =
FixmeExported
{ exportedKey :: FixmeKey
, exportedWeight :: Word64
, exportedName :: FixmeAttrName
, exportedValue :: FixmeAttrVal
}
deriving stock Generic
instance FromRow FixmeExported
instance ToRow FixmeExported
instance Serialise FixmeExported
refchanExport :: FixmePerks m => FixmeM m () refchanExport :: FixmePerks m => FixmeM m ()
refchanExport = do refchanExport = do
@ -382,6 +372,8 @@ refchanExport = do
let chu = chunksOf 10000 what let chu = chunksOf 10000 what
for_ chu $ \x -> do for_ chu $ \x -> do
-- FIXME: encrypt-tree
h <- writeAsMerkle sto (serialise x) h <- writeAsMerkle sto (serialise x)
let tx = AnnotatedHashRef Nothing (HashRef h) let tx = AnnotatedHashRef Nothing (HashRef h)
@ -399,3 +391,57 @@ refchanExport = do
when (isNothing r) do when (isNothing r) do
err $ red "hbs2-peer rpc calling timeout" err $ red "hbs2-peer rpc calling timeout"
refchanImport :: FixmePerks m => FixmeM m ()
refchanImport = do
sto <- getStorage
rchanAPI <- getClientAPI @RefChanAPI @UNIX
chan <- asks fixmeEnvRefChan
>>= readTVarIO
>>= orThrowUser "refchan not set"
ttsmap <- newTVarIO HM.empty
tq <- newTQueueIO
walkRefChanTx @UNIX (const $ pure True) chan $ \txh u -> do
case u of
A (AcceptTran (Just ts) _ what) -> do
debug $ red "ACCEPT" <+> pretty ts <+> pretty what
atomically $ modifyTVar ttsmap (HM.insertWith max what (coerce @_ @Word64 ts))
A _ -> none
P orig (ProposeTran _ box) -> void $ runMaybeT do
(_, bs) <- unboxSignedBox0 box & toMPlus
AnnotatedHashRef _ href <- deserialiseOrFail @AnnotatedHashRef (LBS.fromStrict bs)
& toMPlus . either (const Nothing) Just
-- FIXME: decrypt-tree
what <- runExceptT (readFromMerkle sto (SimpleKey (coerce href)))
<&> either (const Nothing) Just
>>= toMPlus
exported <- deserialiseOrFail @[FixmeExported] what
& toMPlus
for_ exported $ \exported -> do
atomically $ writeTQueue tq (orig, exported)
imported <- atomically $ flushTQueue tq
withState $ transactional do
for_ imported $ \(h, i) -> do
w <- readTVarIO ttsmap <&> fromMaybe (exportedWeight i) . HM.lookup h
let item = i { exportedWeight = w }
notice $ "import" <+> pretty (exportedKey item) <+> pretty (exportedWeight item)
insertFixmeExported item

View File

@ -6,11 +6,13 @@ module Fixme.State
, cleanupDatabase , cleanupDatabase
, listFixme , listFixme
, insertFixme , insertFixme
, insertFixmeExported
, modifyFixme , modifyFixme
, insertScanned , insertScanned
, selectIsAlreadyScanned , selectIsAlreadyScanned
, selectFixmeKey , selectFixmeKey
, getFixme , getFixme
, FixmeExported(..)
, HasPredicate(..) , HasPredicate(..)
, SelectPredicate(..) , SelectPredicate(..)
) where ) where
@ -373,3 +375,39 @@ insertFixme fme = do
lift $ insert sql (o,w,"fixme-text",txt) lift $ insert sql (o,w,"fixme-text",txt)
data FixmeExported =
FixmeExported
{ exportedKey :: FixmeKey
, exportedWeight :: Word64
, exportedName :: FixmeAttrName
, exportedValue :: FixmeAttrVal
}
deriving stock Generic
instance FromRow FixmeExported
instance ToRow FixmeExported
instance Serialise FixmeExported
insertFixmeExported :: FixmePerks m => FixmeExported -> DBPipeM m ()
insertFixmeExported item = do
let sql = [qc|
insert into object (o, w, k, v)
values (?, ?, ?, ?)
on conflict (o, k)
do update set
v = case
when excluded.w > object.w and (excluded.v <> object.v) then excluded.v
else object.v
end,
w = case
when excluded.w > object.w and (excluded.v <> object.v) then excluded.w
else object.w
end
|]
insert sql item