mirror of https://github.com/voidlizard/hbs2
whole log segments compression.
NOTE: DO NOT USE THIS COMMIT FOR OLDER REFLOGS: IT WILL BREAK THEM FOR OLDER hbs2-git versions. Start a new reflog instead. Hopefully, this is the last (ha!) breaking change for a... while.
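Why this breaks old readers: before this commit only each entry's payload was gzipped, so a stored log segment still began with a serialised entry head; now the whole serialised segment is gzipped once, so the stored bytes begin with the gzip magic instead. A minimal standalone sketch (plain zlib, not hbs2 code) of what an old reader would now encounter:

-- Standalone sketch: with whole-log compression the stored segment starts
-- with the gzip magic bytes, which an old hbs2-git reader would try (and
-- fail) to deserialise as a log entry head.
import Codec.Compression.GZip (compressWith, defaultCompressParams, compressLevel, bestSpeed)
import qualified Data.ByteString.Lazy as LBS

main :: IO ()
main = do
  let plainLog = LBS.pack [1,2,3]  -- stand-in for a serialised log segment
      gzipped  = compressWith defaultCompressParams { compressLevel = bestSpeed } plainLog
  print $ LBS.unpack (LBS.take 2 gzipped)  -- [31,139], i.e. 0x1f 0x8b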
parent 22bb266f02
commit 0b5f98c004
@@ -227,11 +227,11 @@
       },
       "nixpkgs_2": {
         "locked": {
-          "lastModified": 1676209454,
-          "narHash": "sha256-alj9mBkV9U6tTPDK026671D2pesLSYZZc9j5dBZJ9f0=",
+          "lastModified": 1687946342,
+          "narHash": "sha256-vRxti8pOuXS0rJmqjbD8ueEEFXWSK22ISHoCWkhgzzg=",
           "owner": "nixos",
           "repo": "nixpkgs",
-          "rev": "8c619a1f3cedd16ea172146e30645e703d21bfc1",
+          "rev": "1c851e8c92b76a00ce84167984a7ec7ba2b1f29c",
           "type": "github"
         },
         "original": {
@@ -61,6 +61,10 @@ common shared-properties
         , bytestring
         , cache
         , containers
+        , streaming
+        , streaming-bytestring
+        , streaming-commons
+        , streaming-utils
         , cryptonite
         , directory
         , exceptions
@@ -48,6 +48,7 @@ import System.IO hiding (hClose,hPrint,hPutStrLn,hFlush)
 import System.IO.Temp
 import Control.Monad.Trans.Resource
 import Data.List.Split (chunksOf)
+import Codec.Compression.GZip

 class ExportRepoOps a where

@@ -86,6 +87,8 @@ exportRefDeleted _ repo ref = do
   dbPath <- makeDbPath repo
   db <- dbEnv dbPath

+  let opts = ()
+
   -- this is an "abnormal" log, i.e. a ref deletion in the current context
   -- we are deleting the ref "there", so we need "that" value of the ref
   -- locally we can delete it anyway, with plain git.
@@ -106,8 +109,8 @@ exportRefDeleted _ repo ref = do
   let ha = gitHashObject (GitObject Blob repoHeadStr)
   let headEntry = GitLogEntry GitLogEntryHead (Just ha) ( fromIntegral $ LBS.length repoHeadStr )

-  let content = gitRepoLogMakeEntry ctxHead ctxBs
-                  <> gitRepoLogMakeEntry headEntry repoHeadStr
+  let content = gitRepoLogMakeEntry opts ctxHead ctxBs
+                  <> gitRepoLogMakeEntry opts headEntry repoHeadStr

   -- FIXME: remove-code-dup
   let meta = fromString $ show
@@ -164,11 +167,21 @@ writeLogSegments onProgress val objs chunkSize trailing = do
   remote <- asks $ view exportRepo
   readGit <- asks $ view exportReadObject

+  let opts = CompressWholeLog
+
+  -- TODO: options-for-compression-level
+  -- remember that everything is immutable. once we post something,
+  -- it will live like that forever
+  let compressOpts = defaultCompressParams { compressLevel = bestSpeed }
+
   -- FIXME: fix-code-dup
   let meta = fromString $ show
-        $ "hbs2-git" <> line
+        $ "hbs2-git"
+        <> line
         <> "type:" <+> "hbs2-git-push-log"
         <> line
+        <> "flags:" <+> "gz:sgmt"
+        <> line

   let segments = chunksOf chunkSize objs
   let totalSegments = length segments
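Since posted logs are immutable, the segment metadata has to advertise the new format up front: the added `flags:` line carries `gz:sgmt`, and the importer splits it on `:`. A standalone sketch of that flag convention (assumes the metadata has already been parsed down to the `gz:sgmt` value; not hbs2's actual parser):

-- Sketch of the "flags:" convention introduced in this hunk.
import qualified Data.Text as Text

main :: IO ()
main = do
  let flagsValue = Text.pack "gz:sgmt"
      flags      = Text.splitOn (Text.pack ":") flagsValue  -- ["gz","sgmt"]
      gzipped    = Text.pack "gz" `elem` flags
  print (flags, gzipped)  -- (["gz","sgmt"],True)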
@@ -188,7 +201,7 @@ writeLogSegments onProgress val objs chunkSize trailing = do
       GitObject tp o <- liftIO $ readGit d `orDie` [qc|error reading object {pretty d}|]

       let entry = GitLogEntry ( gitLogEntryTypeOf tp ) (Just d) ( fromIntegral $ LBS.length o )
-      gitRepoLogWriteEntry fh entry o
+      gitRepoLogWriteEntry opts fh entry o
       liftIO $ atomically $ modifyTVar written (HashSet.insert d)

       -- gitRepoLogWriteEntry fh ctx ctxBs
@@ -197,13 +210,16 @@ writeLogSegments onProgress val objs chunkSize trailing = do

       when (segmentIndex == totalSegments) $ do
         for_ trailing $ \(e, bs) -> do
-          gitRepoLogWriteEntry fh e bs
+          gitRepoLogWriteEntry opts fh e bs

       -- finalize log section
       hClose fh

       content <- liftIO $ LBS.readFile fpath
-      logMerkle <- lift $ storeObject meta content `orDie` [qc|Can't store push log|]
+      let gzipped = compressWith compressOpts content
+
+      logMerkle <- lift $ storeObject meta gzipped `orDie` [qc|Can't store push log|]

       trace $ "PUSH LOG HASH: " <+> pretty logMerkle
       trace $ "POSTING REFERENCE UPDATE TRANSACTION" <+> pretty remote <+> pretty logMerkle
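The finalized segment file is now read back whole and gzipped once (at bestSpeed, per the TODO above) before being handed to storeObject. A standalone round-trip sketch of that step (zlib only; the real code stores `gzipped` rather than checking it):

-- Round-trip sketch of the whole-log compression step.
import Codec.Compression.GZip
import qualified Data.ByteString.Lazy.Char8 as LBS8

main :: IO ()
main = do
  let content      = LBS8.pack "entry1 ... entryN"  -- stand-in for the log file
      compressOpts = defaultCompressParams { compressLevel = bestSpeed }
      gzipped      = compressWith compressOpts content
  print (decompress gzipped == content)  -- True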
@@ -301,7 +317,7 @@ exportRefOnly _ remote rfrom ref val = do
   let (commits, others) = List.partition (\e -> fst (onEntryType e) == 2) notTrees

   -- FIXME: hbs2-git-size-hardcode-to-args
-  let batch = 10000
+  let batch = 20000
   let objects = blobs <> trees <> others <> commits
   mon <- newProgressMonitor "write objects" (length objects)

@@ -333,7 +349,13 @@ exportRefOnly _ remote rfrom ref val = do
   -- to keep compatibility
   pure $ lastMay logz

-runExport :: forall m . (MonadIO m, MonadUnliftIO m, MonadCatch m, HasProgress (App m), MonadMask (App m))
+runExport :: forall m . ( MonadIO m
+                        , MonadUnliftIO m
+                        , MonadCatch m
+                        , HasProgress (App m)
+                        , MonadMask (App m)
+                        )
+
           => Maybe FilePath -> RepoRef -> App m ()
 runExport fp repo = do

@@ -23,6 +23,22 @@ import Data.HashSet qualified as HashSet
 import Control.Concurrent.STM
 import Data.Maybe

+class HasGitLogOptions a where
+  compressEntries :: a -> Bool
+  compressWholeLog :: a -> Bool
+
+
+-- | default GitLogOptions
+instance HasGitLogOptions () where
+  compressEntries = const True
+  compressWholeLog = const False
+
+data CompressWholeLog = CompressWholeLog
+
+instance HasGitLogOptions CompressWholeLog where
+  compressEntries = const False
+  compressWholeLog = const True
+
 data GitLogEntryType = GitLogEntryCommit
                      | GitLogEntryBlob
                      | GitLogEntryTree
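The options type class lets the value's type select the compression strategy: `()` keeps the legacy per-entry gzip (so old-format call sites like exportRefDeleted stay byte-compatible), while `CompressWholeLog` turns per-entry compression off so the segment can be gzipped once as a whole. A self-contained usage sketch (the class and instances copied from the hunk above):

-- Usage sketch of the options dispatch.
class HasGitLogOptions a where
  compressEntries  :: a -> Bool
  compressWholeLog :: a -> Bool

instance HasGitLogOptions () where          -- legacy: gzip each entry
  compressEntries  = const True
  compressWholeLog = const False

data CompressWholeLog = CompressWholeLog    -- new: gzip the whole segment

instance HasGitLogOptions CompressWholeLog where
  compressEntries  = const False
  compressWholeLog = const True

main :: IO ()
main = do
  print (compressEntries (), compressWholeLog ())                              -- (True,False)
  print (compressEntries CompressWholeLog, compressWholeLog CompressWholeLog)  -- (False,True)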
@@ -128,7 +144,8 @@ gitRepoLogScan r fn cb = do
   where
     go _ 0 = pure ()
     go h size = do
-      es <- liftIO $ LBS.hGet h entryHeadSize <&> deserialise @GitLogEntry
+      ss <- liftIO $ LBS.hGet h entryHeadSize
+      let es = deserialise @GitLogEntry ss
       let esize = es ^. gitLogEntrySize
       let consumed = entryHeadSize + fromIntegral esize
       if r then do
@@ -139,30 +156,43 @@ gitRepoLogScan r fn cb = do
       cb es Nothing
       go h ( max 0 (size - consumed) )

-gitRepoLogWriteHead :: forall m . MonadIO m => Handle -> GitLogHeadEntry -> m ()
-gitRepoLogWriteHead fh e = do
+gitRepoLogWriteHead :: forall o m . (HasGitLogOptions o, MonadIO m)
+                    => o
+                    -> Handle
+                    -> GitLogHeadEntry
+                    -> m ()
+
+gitRepoLogWriteHead opt fh e = do
   let s = serialise e
   let entry = GitLogEntry GitLogHead Nothing (fromIntegral $ LBS.length s)
-  gitRepoLogWriteEntry fh entry s
+  gitRepoLogWriteEntry opt fh entry s

-gitRepoLogMakeEntry :: GitLogEntry -> ByteString -> ByteString
-gitRepoLogMakeEntry entry' o = bs <> ss
+gitRepoLogMakeEntry :: forall o . (HasGitLogOptions o)
+                    => o
+                    -> GitLogEntry
+                    -> ByteString
+                    -> ByteString
+
+gitRepoLogMakeEntry opts entry' o = bs <> ss
   where
-    bs = LBS.take entryHeadSize $ serialise entry <> LBS.replicate entryHeadSize 0
     ss = compressWith co o
     entry = entry' & set gitLogEntrySize (fromIntegral $ LBS.length ss)
-    co = defaultCompressParams { compressLevel = bestSpeed }
+    bs = LBS.take entryHeadSize $ serialise entry <> LBS.replicate entryHeadSize 0
+    co | compressEntries opts = defaultCompressParams { compressLevel = bestSpeed }
+       | otherwise = defaultCompressParams { compressLevel = noCompression }

--- TODO: use-gitRepoLogMakeEntry-in-body
-gitRepoLogWriteEntry :: forall m . MonadIO m => Handle -> GitLogEntry -> ByteString -> m ()
-gitRepoLogWriteEntry fh entry' o = do
-  let ss = compressWith co o
-  let entry = entry' & set gitLogEntrySize (fromIntegral $ LBS.length ss)
-  let bs = LBS.take entryHeadSize $ serialise entry <> LBS.replicate entryHeadSize 0
-  liftIO $ LBS.hPutStr fh bs
-  liftIO $ LBS.hPutStr fh ss
-  where
-    co = defaultCompressParams { compressLevel = bestSpeed }
+gitRepoLogWriteEntry :: forall o m . (MonadIO m, HasGitLogOptions o)
+                     => o
+                     -> Handle
+                     -> GitLogEntry
+                     -> ByteString
+                     -> m ()
+
+gitRepoLogWriteEntry opts fh entry' o = do
+  let entryWithSize = gitRepoLogMakeEntry opts entry' o
+  liftIO $ LBS.hPutStr fh entryWithSize

 gitRepoMakeIndex :: FilePath -> IO (HashSet GitHash)
 gitRepoMakeIndex fp = do
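The framing gitRepoLogMakeEntry produces is: a serialised entry head padded (or cut) with zeros to exactly entryHeadSize bytes, followed by the (possibly compressed) payload; gitRepoLogWriteEntry is now just that plus hPutStr. A standalone sketch of the padding trick, where `entryHeadSize = 64` is a hypothetical stand-in (hbs2 serialises a CBOR GitLogEntry into the head, not this toy prefix):

-- Framing sketch: fixed-size zero-padded head, then the payload.
import Data.Int (Int64)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8

entryHeadSize :: Int64
entryHeadSize = 64  -- hypothetical; the real size is fixed by the CBOR head

mkEntry :: LBS.ByteString -> LBS.ByteString -> LBS.ByteString
mkEntry hd payload = bs <> payload
  where
    -- as in gitRepoLogMakeEntry: LBS.take entryHeadSize (head <> zeros)
    bs = LBS.take entryHeadSize (hd <> LBS.replicate entryHeadSize 0)

main :: IO ()
main = do
  let framed = mkEntry (LBS8.pack "head") (LBS8.pack "payload")
  print (LBS.length framed)  -- 64 + 7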
@@ -35,6 +35,12 @@ import UnliftIO.IO
 import System.IO (openBinaryFile)
 import System.FilePath.Posix
 import Data.HashMap.Strict qualified as HashMap
+import Data.Text qualified as Text
+import Data.Config.Suckless
+import Data.Either
+
+import Streaming.ByteString qualified as SB
+import Streaming.Zip qualified as SZip

 data RunImportOpts =
   RunImportOpts
@@ -54,6 +60,18 @@ walkHashes q h = walkMerkle h (readBlock . HashRef) $ \(hr :: Either (Hash HbSync
     Right (hrr :: [HashRef]) -> do
       forM_ hrr $ liftIO . atomically . Q.writeTQueue q

+blockSource :: (MonadIO m, HasCatAPI m) => HashRef -> SB.ByteStream m Integer
+blockSource h = do
+  tsize <- liftIO $ newTVarIO 0
+  deepScan ScanDeep (const none) (fromHashRef h) (lift . readBlock . HashRef) $ \ha -> do
+    sec <- lift $ readBlock (HashRef ha) `orDie` [qc|missed block {pretty ha}|]
+    -- skip merkle tree head block, write only the data
+    liftIO $ atomically $ modifyTVar tsize (+ LBS.length sec)
+    when (h /= HashRef ha) do
+      SB.fromLazy sec
+
+  liftIO $ readTVarIO tsize <&> fromIntegral
+
 importRefLogNew :: ( MonadIO m
                    , MonadUnliftIO m
                    , MonadCatch m
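Note the shape of blockSource: it is a ByteStream whose *return value* is the total byte count, accumulated in a TVar while the blocks stream through. A standalone sketch of that pattern (a list of lazy chunks stands in for the merkle-tree walk; same SB module as the imports above):

-- Sketch of the "stream bytes, return the size" pattern.
import Control.Concurrent.STM
import Control.Monad (mapM_)
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.Char8 as LBS8
import qualified Streaming.ByteString as SB
import System.IO (stdout)

countedSource :: SB.ByteStream IO Integer
countedSource = do
  tsize <- liftIO $ newTVarIO (0 :: Integer)
  mapM_ (\chunk -> do
           liftIO $ atomically $ modifyTVar tsize (+ fromIntegral (LBS8.length chunk))
           SB.fromLazy chunk)
        [LBS8.pack "block-1 ", LBS8.pack "block-2"]
  liftIO $ readTVarIO tsize

main :: IO ()
main = SB.toHandle stdout countedSource >>= print  -- streams chunks, prints 15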
@@ -117,32 +135,49 @@ importRefLogNew force ref = runResourceT do
       let (SequentialRef _ (AnnotatedHashRef _ h)) = payload
       trace $ "PUSH LOG HASH" <+> pretty h

+      treeBs <- MaybeT $ lift $ readBlock h
+
+      let something = tryDetect (fromHashRef h) treeBs
+      let meta = mconcat $ rights [ parseTop (Text.unpack s) | ShortMetadata s <- universeBi something ]
+
+      -- TODO: check-if-it-is-hbs2-git-log
+
+      let flags = mconcat [ Text.splitOn ":" (Text.pack (show $ pretty s))
+                          | (ListVal (Key "flags:" [SymbolVal s]) ) <- meta
+                          ]
+
+      let gzipped = "gz" `elem` flags
+
+      debug $ "FOUND LOG METADATA " <+> pretty flags
+              <+> pretty "gzipped:" <+> pretty gzipped
+
       here <- withDB db $ stateGetLogImported h

       unless (here && not force) do

-        lift $ deepScan ScanDeep (const none) (fromHashRef h) (lift . readBlock . HashRef) $ \ha -> do
-          sec <- lift $ readBlock (HashRef ha) `orDie` [qc|missed block {pretty ha}|]
-          -- skip merkle tree head block, write only the data
-          when (h /= HashRef ha) do
-            liftIO $ LBS.hPutStr fh sec
+        sz <- if gzipped then do
+                SB.toHandle fh $ SZip.gunzip (blockSource h)
+              else
+                SB.toHandle fh (blockSource h)

         release keyFh

+        let fpathReal = fpath
+
         tnum <- liftIO $ newTVarIO 0
-        liftIO $ gitRepoLogScan True fpath $ \_ _ -> do
+        liftIO $ gitRepoLogScan True fpathReal $ \_ _ -> do
           liftIO $ atomically $ modifyTVar tnum succ

         num <- liftIO $ readTVarIO tnum
         trace $ "LOG ENTRY COUNT" <+> pretty num

         let pref = take 16 (show (pretty e))
-        sz <- liftIO $ getFileSize fpath <&> realToFrac
-        let name = [qc|import {pref}... {sz / (1024*1024) :: Fixed E3}|]
+        let name = [qc|import {pref}... {realToFrac sz / (1024*1024) :: Fixed E3}|]

         oMon <- newProgressMonitor name num

-        lift $ gitRepoLogScan True fpath $ \entry s -> do
+        lift $ gitRepoLogScan True fpathReal $ \entry s -> do

           updateProgress oMon 1

           lbs <- pure s `orDie` [qc|git object not read from log|]
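On import the `gz` flag selects between streaming the log blocks straight to the temp file and streaming them through gunzip first. A standalone sketch of that dispatch (assuming the same Streaming.ByteString / Streaming.Zip modules the import hunk brings in; a lazy ByteString stands in for hbs2's blockSource, and stdout for the temp-file handle):

-- Import-side sketch: dispatch on the "gz" flag, gunzip while streaming.
import Codec.Compression.GZip (compress)
import qualified Data.ByteString.Lazy.Char8 as LBS8
import qualified Streaming.ByteString as SB
import qualified Streaming.Zip as SZip
import System.IO (stdout)

main :: IO ()
main = do
  let gzipped  = True
      logBytes = compress (LBS8.pack "log segment bytes\n")
      source   = SB.fromLazy logBytes  -- stand-in for blockSource h
  if gzipped
    then SB.toHandle stdout (SZip.gunzip source)
    else SB.toHandle stdout source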