mirror of https://github.com/voidlizard/hbs2
wip, import as packs
This commit is contained in:
parent
5dc82c5a81
commit
339a7dce1d
|
@ -18,13 +18,10 @@ import HBS2.Peer.RPC.API.LWWRef
|
|||
import HBS2.Peer.RPC.API.Storage
|
||||
import HBS2.Peer.RPC.Client.StorageClient
|
||||
|
||||
import HBS2.CLI.Run.Internal.Merkle (getTreeContents)
|
||||
|
||||
-- move to Data.Config.Suckless.Script.File, a separate library
|
||||
import HBS2.Data.Log.Structured
|
||||
|
||||
|
||||
import HBS2.CLI.Run.Internal.Merkle (createTreeWithMetadata)
|
||||
import HBS2.CLI.Run.Internal.Merkle (getTreeContents)
|
||||
import HBS2.CLI.Run.RefLog (getCredentialsForReflog,mkRefLogUpdateFrom)
|
||||
|
||||
import HBS2.System.Dir
|
||||
|
@ -33,6 +30,8 @@ import HBS2.Git3.Types
|
|||
import HBS2.Git3.Config.Local
|
||||
import HBS2.Git3.Git
|
||||
import HBS2.Git3.Export
|
||||
import HBS2.Git3.Import
|
||||
import HBS2.Git3.State.RefLog
|
||||
|
||||
import Data.Config.Suckless.Script
|
||||
import Data.Config.Suckless.Script.File
|
||||
|
@ -929,50 +928,56 @@ theDict = do
|
|||
let cpOnly = or [ True | ListVal [StringLike "--checkpoints"] <- opts ]
|
||||
let sOnly = or [ True | ListVal [StringLike "--segments"] <- opts ]
|
||||
|
||||
sto <- getStorage
|
||||
hxs <- txListAll Nothing
|
||||
|
||||
refLogAPI <- getClientAPI @RefLogAPI @UNIX
|
||||
reflog <- getGitRemoteKey >>= orThrowUser "reflog not set"
|
||||
|
||||
rv <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) refLogAPI reflog
|
||||
>>= orThrowUser "rpc timeout"
|
||||
>>= orThrowUser "reflog is empty"
|
||||
<&> coerce
|
||||
|
||||
hxs <- S.toList_ $ walkMerkle @[HashRef] rv (getBlock sto) $ \case
|
||||
Left{} -> throwIO MissedBlockError
|
||||
Right hs -> S.each hs
|
||||
|
||||
liftIO $ forM_ hxs $ \h -> do
|
||||
|
||||
decoded <- readTxMay sto h
|
||||
<&> \case
|
||||
Just (TxSegment x) | not cpOnly ->
|
||||
liftIO $ forM_ hxs $ \(h,tx) -> do
|
||||
let decoded = case tx of
|
||||
TxSegment x | not cpOnly ->
|
||||
Just ("S" <+> fill 44 (pretty h) <+> fill 44 (pretty x))
|
||||
|
||||
Just (TxCheckpoint n x) | not sOnly ->
|
||||
TxCheckpoint n x | not sOnly ->
|
||||
Just ("C" <+> fill 44 (pretty h) <+> pretty x <+> fill 8 (pretty n))
|
||||
|
||||
_ -> Nothing
|
||||
|
||||
forM_ decoded print
|
||||
|
||||
entry $ bindMatch "test:git:import" $ nil_ $ \syn -> lift $ connectedDo do
|
||||
|
||||
entry $ bindMatch "reflog:import:pack" $ nil_ $ \syn -> lift $ connectedDo do
|
||||
updateReflogIndex
|
||||
|
||||
refLogAPI <- getClientAPI @RefLogAPI @UNIX
|
||||
reflog <- getGitRemoteKey >>= orThrowUser "reflog not set"
|
||||
packs <- findGitDir
|
||||
>>= orThrowUser "git directory not found"
|
||||
<&> (</> "objects/pack")
|
||||
|
||||
rv <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) refLogAPI reflog
|
||||
>>= orThrowUser "rpc timeout"
|
||||
>>= orThrowUser "reflog is empty"
|
||||
<&> coerce @_ @HashRef
|
||||
state <- getStatePathM
|
||||
|
||||
notice $ "test:git:import" <+> pretty (AsBase58 reflog) <+> pretty rv
|
||||
let imported = state </> "imported"
|
||||
|
||||
sto <- getStorage
|
||||
none
|
||||
prev <- runMaybeT do
|
||||
f <- liftIO (try @_ @IOError (readFile imported)) >>= toMPlus
|
||||
toMPlus (fromStringMay @HashRef f)
|
||||
|
||||
excl <- maybe1 prev (pure mempty) $ \p -> do
|
||||
txListAll (Just p) <&> HS.fromList . fmap fst
|
||||
|
||||
rv <- refLogRef
|
||||
|
||||
hxs <- txListAll rv <&> filter (not . flip HS.member excl . fst)
|
||||
|
||||
forConcurrently_ hxs $ \case
|
||||
(_, TxCheckpoint{}) -> none
|
||||
(h, TxSegment tree) -> do
|
||||
s <- writeAsGitPack packs tree
|
||||
|
||||
for_ s $ \file -> do
|
||||
gitRunCommand [qc|git index-pack {file}|]
|
||||
>>= orThrowPassIO
|
||||
|
||||
notice $ "imported" <+> pretty h
|
||||
|
||||
for_ rv $ \r -> do
|
||||
liftIO $ UIO.withBinaryFileAtomic imported WriteMode $ \fh -> do
|
||||
IO.hPutStr fh (show $ pretty r)
|
||||
|
||||
exportEntries "reflog:"
|
||||
|
||||
|
|
|
@ -124,8 +124,11 @@ library
|
|||
HBS2.Git3.Types
|
||||
HBS2.Git3.Prelude
|
||||
HBS2.Git3.Export
|
||||
HBS2.Git3.Import
|
||||
HBS2.Git3.State.Types
|
||||
HBS2.Git3.State.RefLog
|
||||
HBS2.Git3.State.Index
|
||||
HBS2.Git3.State.Segment
|
||||
HBS2.Git3.Config.Local
|
||||
HBS2.Git3.Git
|
||||
HBS2.Git3.Git.Pack
|
||||
|
|
|
@ -84,3 +84,5 @@ decodeObjectSize source = run $ flip fix (source,0,0,0) $ \next (bs, i, tp, num)
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
{-# Language UndecidableInstances #-}
|
||||
{-# Language AllowAmbiguousTypes #-}
|
||||
module HBS2.Git3.Import where
|
||||
|
||||
import HBS2.Git3.Prelude
|
||||
import HBS2.Git3.State.Index
|
||||
import HBS2.Git3.Git
|
||||
import HBS2.Git3.Git.Pack
|
||||
import HBS2.CLI.Run.Internal.Merkle (getTreeContents)
|
||||
import HBS2.Git3.State.Segment
|
||||
|
||||
import HBS2.Data.Log.Structured
|
||||
|
||||
import HBS2.System.Dir
|
||||
|
||||
import Codec.Compression.Zlib qualified as Zlib
|
||||
import Data.ByteString.Lazy.Char8 qualified as LBS8
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.HashSet (HashSet)
|
||||
import Data.HashSet qualified as HS
|
||||
import System.IO.Temp as Temp
|
||||
import UnliftIO.IO.File qualified as UIO
|
||||
import Network.ByteOrder qualified as N
|
||||
|
||||
-- | Exception type declared for the import code path.
--
-- 'ImportInvalidSegment' carries a 'HashRef'
-- (presumably the hash of the offending segment — TODO confirm at the throw site).
data ImportException
  = ImportInvalidSegment HashRef
  deriving stock (Show, Typeable)

instance Exception ImportException
|
||||
|
||||
-- | Write the contents of one stored segment as a git pack file inside @dir@.
--
-- The segment tree @href@ is fetched with 'getTreeContents', decompressed with
-- 'decompressSegmentLBS' and read record by record with 'readLogFileLBS'.
-- Each record whose hash was not seen before in this segment is deflated and
-- appended to the pack; the first byte of a record is taken as the object type
-- tag, falling back to 'Blob' when it does not parse.
--
-- Returns @Just file@ when at least one object was written. When nothing was
-- written (including the case where the segment fails to decompress) the file
-- is removed and @Nothing@ is returned.
--
-- Throws 'MissedBlockError' when 'getTreeContents' fails.
writeAsGitPack :: forall m . (HBS2GitPerks m, HasStorage m)
               => FilePath
               -> HashRef
               -> m (Maybe FilePath)
writeAsGitPack dir href = do

  sto <- getStorage

  file <- liftIO $ Temp.emptyTempFile dir (show (pretty href) <> ".pack")

  -- Number of objects actually written into the pack.
  no_ <- newTVarIO 0

  liftIO $ UIO.withBinaryFileAtomic file ReadWriteMode $ \fh -> flip runContT pure do

    -- Pack header: magic, version 2, and an object count of 0 that is
    -- patched below once the real count is known.
    let header = BS.concat [ "PACK", N.bytestring32 2, N.bytestring32 0 ]

    liftIO $ BS.hPutStr fh header

    seen_ <- newTVarIO (mempty :: HashSet GitHash)

    source <- liftIO (runExceptT (getTreeContents sto href))
                >>= orThrow MissedBlockError

    lbs' <- decompressSegmentLBS source

    -- Leave the whole continuation block early when the segment
    -- could not be decompressed.
    lbs <- ContT $ maybe1 lbs' none

    runConsumeLBS lbs $ readLogFileLBS () $ \h _ obs -> do
      seen <- readTVarIO seen_ <&> HS.member h
      unless seen do

        let (t, body) = LBS.splitAt 1 obs

        let tp = fromStringMay @(Short GitObjectType) (LBS8.unpack t)
                   & maybe Blob coerce

        let params = Zlib.defaultCompressParams { Zlib.compressMethod = Zlib.deflateMethod }

        let packed = Zlib.compressWith params body

        let preamble = encodeObjectSize (gitPackTypeOf tp) (fromIntegral $ LBS.length body)

        liftIO do
          atomically $ modifyTVar' seen_ (HS.insert h)
          BS.hPutStr fh preamble
          LBS.hPutStr fh packed
          -- Strict update: avoids piling up one `succ` thunk per object.
          atomically $ modifyTVar' no_ succ

    -- Patch the object count into the header (offset 8, after magic + version).
    count <- readTVarIO no_
    hSeek fh AbsoluteSeek 8
    liftIO $ BS.hPutStr fh (N.bytestring32 count)
    hFlush fh

    sz <- hFileSize fh
    hSeek fh AbsoluteSeek 0

    -- The trailer is the SHA-1 of everything written so far, so the whole file
    -- must be read back. 'LBS.hGet' reads exactly @sz@ bytes (or up to EOF),
    -- whereas 'LBS.hGetNonBlocking' only returns whatever is available.
    sha <- liftIO $ LBS.hGet fh (fromIntegral sz) <&> sha1lazy

    hSeek fh SeekFromEnd 0

    liftIO $ BS.hPutStr fh sha

  written <- readTVarIO no_

  if written > 0 then do
    pure $ Just file
  else do
    rm file
    pure Nothing
|
||||
|
|
@ -53,6 +53,11 @@ import System.IO.MMap as Exported
|
|||
import GHC.Natural as Exported
|
||||
import UnliftIO as Exported
|
||||
|
||||
-- | Raised when the git remote (reflog) key is not available,
-- e.g. when 'getGitRemoteKey' yields nothing.
data RefLogNotSetException
  = RefLogNotSetException
  deriving stock (Show, Typeable)

instance Exception RefLogNotSetException
|
||||
|
||||
-- | Default segment size: 50 MiB, expressed in bytes.
defSegmentSize :: Int
defSegmentSize = 50 * mebibyte
  where
    mebibyte = 1024 * 1024
|
||||
|
@ -210,3 +215,8 @@ runGit3 :: Git3Perks m => Git3Env -> Git3 m b -> m b
|
|||
runGit3 env action = withGit3Env env action
|
||||
|
||||
|
||||
-- | State path for the current git remote key.
--
-- Looks the key up with 'getGitRemoteKey' and passes it, wrapped in
-- 'AsBase58', to 'getStatePath'.
-- Throws 'RefLogNotSetException' when no key is set.
getStatePathM :: forall m . (HBS2GitPerks m, HasGitRemoteKey m) => m FilePath
getStatePathM =
  getGitRemoteKey
    >>= orThrow RefLogNotSetException
    >>= getStatePath . AsBase58
|
||||
|
||||
|
|
|
@ -4,6 +4,8 @@ import HBS2.Git3.Prelude
|
|||
import HBS2.System.Dir
|
||||
import HBS2.CLI.Run.Internal.Merkle (getTreeContents)
|
||||
import HBS2.Git3.State.Types
|
||||
import HBS2.Git3.State.Segment
|
||||
import HBS2.Git3.State.RefLog
|
||||
import HBS2.Git3.Git
|
||||
|
||||
import HBS2.Data.Log.Structured
|
||||
|
@ -268,33 +270,6 @@ bloomFilterSize n k p
|
|||
where
|
||||
rnd x = 2 ** realToFrac (ceiling (logBase 2 x)) & round
|
||||
|
||||
data GitTx =
|
||||
TxSegment HashRef
|
||||
| TxCheckpoint Natural HashRef
|
||||
|
||||
readTxMay :: forall m . ( MonadIO m
|
||||
)
|
||||
=> AnyStorage -> HashRef -> m (Maybe GitTx)
|
||||
|
||||
readTxMay sto href = runMaybeT do
|
||||
|
||||
tx <- getBlock sto (coerce href)
|
||||
>>= toMPlus
|
||||
|
||||
RefLogUpdate{..} <- deserialiseOrFail @(RefLogUpdate L4Proto) tx
|
||||
& toMPlus
|
||||
|
||||
toMPlus $
|
||||
( deserialiseOrFail (LBS.fromStrict _refLogUpdData) & either (const Nothing) fromAnn )
|
||||
<|>
|
||||
( deserialiseOrFail (LBS.fromStrict _refLogUpdData) & either (const Nothing) fromSeq )
|
||||
|
||||
where
|
||||
fromAnn = \case
|
||||
AnnotatedHashRef _ h -> Just (TxSegment h)
|
||||
|
||||
fromSeq = \case
|
||||
(SequentialRef n (AnnotatedHashRef _ h)) -> Just $ TxCheckpoint (fromIntegral n) h
|
||||
|
||||
updateReflogIndex :: forall m . ( Git3Perks m
|
||||
, MonadReader Git3Env m
|
||||
|
@ -344,34 +319,13 @@ updateReflogIndex = do
|
|||
Nothing -> mzero
|
||||
Just (TxCheckpoint{}) -> mzero
|
||||
Just (TxSegment href) -> do
|
||||
|
||||
-- FIXME: error logging
|
||||
chunks <- liftIO (runExceptT (getTreeContents sto href))
|
||||
source <- liftIO (runExceptT (getTreeContents sto href))
|
||||
>>= orThrow MissedBlockError
|
||||
<&> LBS.toChunks
|
||||
|
||||
what <- toMPlus =<< liftIO do
|
||||
init <- decompress
|
||||
flip fix (init, chunks, mempty :: LBS.ByteString) $ \next -> \case
|
||||
|
||||
(Consume work, [], o) -> do
|
||||
r1 <- work ""
|
||||
next (r1, [], o)
|
||||
|
||||
(Consume work, e:es, o) -> do
|
||||
r1 <- work e
|
||||
next (r1, es, o)
|
||||
|
||||
(Produce piece r, e, o) -> do
|
||||
r1 <- r
|
||||
next (r1, e, LBS.append o (LBS.fromStrict piece))
|
||||
|
||||
(ZStdS.Done bs, _, o) -> pure (Just (LBS.append o (LBS.fromStrict bs)))
|
||||
|
||||
(Error _ _, _, _) -> do
|
||||
debug $ "not a valid segment" <+> pretty h
|
||||
pure Nothing
|
||||
|
||||
guard (LBS.length what > 0)
|
||||
what <- decompressSegmentLBS source
|
||||
>>= toMPlus
|
||||
|
||||
pieces <- S.toList_ $ do
|
||||
void $ runConsumeLBS what $ readLogFileLBS () $ \o _ lbs -> do
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
module HBS2.Git3.State.RefLog where
|
||||
|
||||
import HBS2.Git3.Prelude
|
||||
|
||||
import Control.Applicative
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Maybe
|
||||
import Streaming.Prelude qualified as S
|
||||
|
||||
-- | A decoded reflog transaction, as produced by 'readTxMay'.
data GitTx
  = TxSegment HashRef
    -- ^ decoded from an 'AnnotatedHashRef' payload
  | TxCheckpoint Natural HashRef
    -- ^ decoded from a 'SequentialRef' payload: its number and the hash it wraps
|
||||
|
||||
-- | Exceptions of the reflog access layer.
--
-- 'RefLogRPCException' presumably signals a failed reflog RPC call
-- (inferred from the name — TODO confirm against the throw sites).
data RefLogException
  = RefLogRPCException
  deriving stock (Show, Typeable)

instance Exception RefLogException
|
||||
|
||||
-- | Load the block @href@ from @sto@ and try to decode it as a 'GitTx'.
--
-- Yields 'Nothing' when the block is missing, when it does not deserialise
-- as a @'RefLogUpdate' L4Proto@, or when its payload is neither of the two
-- known forms. The segment form is tried first, then the checkpoint form.
readTxMay :: forall m . ( MonadIO m
                        )
          => AnyStorage -> HashRef -> m (Maybe GitTx)
readTxMay sto href = runMaybeT do
  raw <- toMPlus =<< getBlock sto (coerce href)

  RefLogUpdate{..} <- toMPlus (deserialiseOrFail @(RefLogUpdate L4Proto) raw)

  let payload = LBS.fromStrict _refLogUpdData

  toMPlus (asSegment payload <|> asCheckpoint payload)

  where
    -- Payload is a bare annotated hash: a segment.
    asSegment bytes = case deserialiseOrFail bytes of
      Left _                       -> Nothing
      Right (AnnotatedHashRef _ h) -> Just (TxSegment h)

    -- Payload is a sequence number plus an annotated hash: a checkpoint.
    asCheckpoint bytes = case deserialiseOrFail bytes of
      Left _                                         -> Nothing
      Right (SequentialRef n (AnnotatedHashRef _ h)) -> Just (TxCheckpoint (fromIntegral n) h)
|
||||
|
||||
-- | Ask the peer, via the 'RpcRefLogGet' call, for the current value of the
-- reflog identified by the git remote key.
--
-- Returns whatever value the RPC reports for the reflog (a 'Maybe').
--
-- Throws 'RefLogNotSetException' when 'getGitRemoteKey' has no key, and
-- 'RefLogRPCException' when the RPC call yields no answer within its
-- one-second timeout. The two failures are reported with different
-- exceptions so that a missing key is not confused with an unreachable peer.
refLogRef :: forall m . ( HBS2GitPerks m
                        , HasStorage m
                        , HasClientAPI RefLogAPI UNIX m
                        , HasGitRemoteKey m
                        )
          => m (Maybe HashRef)
refLogRef = do
  refLogAPI <- getClientAPI @RefLogAPI @UNIX
  reflog <- getGitRemoteKey >>= orThrow RefLogNotSetException

  callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) refLogAPI reflog
    >>= orThrow RefLogRPCException
|
||||
|
||||
-- | List the decodable transactions reachable from a reflog value.
--
-- With @Just ref@ the merkle tree rooted at @ref@ is walked; with 'Nothing'
-- the root is obtained from 'refLogRef', and an empty list is returned when
-- that yields no value.
--
-- Each result pairs the transaction's hash with its decoded 'GitTx';
-- hashes that 'readTxMay' cannot decode are dropped.
-- Throws 'MissedBlockError' when the merkle walk reports a missing block.
txListAll :: forall m . ( HBS2GitPerks m
                        , HasStorage m
                        , HasClientAPI RefLogAPI UNIX m
                        , HasGitRemoteKey m
                        )
          => Maybe HashRef
          -> m [(HashRef, GitTx)]
txListAll mhref = do
  sto <- getStorage

  fromMaybe mempty <$> runMaybeT do

    root <- maybe (lift refLogRef >>= toMPlus) pure mhref

    -- Collect all transaction hashes first, then decode them one by one.
    hashes <- S.toList_ $ walkMerkle @[HashRef] (coerce root) (getBlock sto) $ \case
      Right hs -> S.each hs
      Left{}   -> throwIO MissedBlockError

    S.toList_ $ S.mapMaybeM (\h -> fmap (h,) <$> liftIO (readTxMay sto h)) (S.each hashes)
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
module HBS2.Git3.State.Segment where
|
||||
|
||||
import HBS2.Git3.Prelude
|
||||
import Data.ByteString.Lazy ( ByteString )
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
|
||||
import Codec.Compression.Zstd.Streaming as ZStdS
|
||||
|
||||
-- | Decompress a whole segment with the streaming zstd decoder.
--
-- The input is fed to the decoder chunk by chunk; once the input is
-- exhausted an empty chunk is supplied whenever the decoder asks for more.
-- Returns @Just@ the concatenated output when the decoder finishes with
-- 'ZStdS.Done', and 'Nothing' when it reports an 'Error'.
--
-- Output pieces are collected in reverse and assembled once at the end,
-- instead of appending to a growing lazy bytestring on every step.
decompressSegmentLBS :: MonadIO m => ByteString -> m (Maybe ByteString)
decompressSegmentLBS source = liftIO do
  start <- decompress
  go start (LBS.toChunks source) []
  where
    -- go <decoder state> <remaining input chunks> <output pieces, newest first>
    go step input acc = case step of
      Consume feed -> case input of
        []       -> feed "" >>= \next -> go next [] acc
        (c : cs) -> feed c  >>= \next -> go next cs acc

      Produce piece continue ->
        continue >>= \next -> go next input (piece : acc)

      ZStdS.Done rest ->
        pure (Just (LBS.fromChunks (reverse (rest : acc))))

      Error _ _ ->
        pure Nothing
|
||||
|
|
@ -26,3 +26,4 @@ getStatePath p = do
|
|||
d <- getConfigPath
|
||||
pure $ d </> show (pretty p)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue