From 0b5f98c004b6bd9f45b5c25f29759bb5fa4e3621 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 29 Jun 2023 18:59:12 +0300 Subject: [PATCH] whole log segments compression. NOTE: DO NOT USE THIS COMMIT FOR OLDER REFLOGS. BECAUSE THEY IT WILL BREAK FOR THE OLDER hbs2-git versions. Start a new reflog instead. Hopefully, it's a last (ha!) breaking change for a... while. --- flake.lock | 6 +-- hbs2-git/hbs2-git.cabal | 4 ++ hbs2-git/lib/HBS2Git/Export.hs | 40 ++++++++++++++---- hbs2-git/lib/HBS2Git/GitRepoLog.hs | 66 ++++++++++++++++++++++-------- hbs2-git/lib/HBS2Git/Import.hs | 53 ++++++++++++++++++++---- 5 files changed, 130 insertions(+), 39 deletions(-) diff --git a/flake.lock b/flake.lock index c41c492b..d13cdda3 100644 --- a/flake.lock +++ b/flake.lock @@ -227,11 +227,11 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1676209454, - "narHash": "sha256-alj9mBkV9U6tTPDK026671D2pesLSYZZc9j5dBZJ9f0=", + "lastModified": 1687946342, + "narHash": "sha256-vRxti8pOuXS0rJmqjbD8ueEEFXWSK22ISHoCWkhgzzg=", "owner": "nixos", "repo": "nixpkgs", - "rev": "8c619a1f3cedd16ea172146e30645e703d21bfc1", + "rev": "1c851e8c92b76a00ce84167984a7ec7ba2b1f29c", "type": "github" }, "original": { diff --git a/hbs2-git/hbs2-git.cabal b/hbs2-git/hbs2-git.cabal index 603d9a21..9722a39a 100644 --- a/hbs2-git/hbs2-git.cabal +++ b/hbs2-git/hbs2-git.cabal @@ -61,6 +61,10 @@ common shared-properties , bytestring , cache , containers + , streaming + , streaming-bytestring + , streaming-commons + , streaming-utils , cryptonite , directory , exceptions diff --git a/hbs2-git/lib/HBS2Git/Export.hs b/hbs2-git/lib/HBS2Git/Export.hs index b20cb68e..2a0b8328 100644 --- a/hbs2-git/lib/HBS2Git/Export.hs +++ b/hbs2-git/lib/HBS2Git/Export.hs @@ -48,6 +48,7 @@ import System.IO hiding (hClose,hPrint,hPutStrLn,hFlush) import System.IO.Temp import Control.Monad.Trans.Resource import Data.List.Split (chunksOf) +import Codec.Compression.GZip class ExportRepoOps a where @@ -86,6 +87,8 @@ exportRefDeleted _ repo ref = do dbPath <- makeDbPath repo db <- dbEnv dbPath + let opts = () + -- это "ненормальный" лог, т.е удаление ссылки в текущем контексте -- мы удаляем ссылку "там", то есть нам нужно "то" значение ссылки -- удалить её локально мы можем и так, просто гитом. @@ -106,8 +109,8 @@ exportRefDeleted _ repo ref = do let ha = gitHashObject (GitObject Blob repoHeadStr) let headEntry = GitLogEntry GitLogEntryHead (Just ha) ( fromIntegral $ LBS.length repoHeadStr ) - let content = gitRepoLogMakeEntry ctxHead ctxBs - <> gitRepoLogMakeEntry headEntry repoHeadStr + let content = gitRepoLogMakeEntry opts ctxHead ctxBs + <> gitRepoLogMakeEntry opts headEntry repoHeadStr -- FIXME: remove-code-dup let meta = fromString $ show @@ -164,10 +167,20 @@ writeLogSegments onProgress val objs chunkSize trailing = do remote <- asks $ view exportRepo readGit <- asks $ view exportReadObject + let opts = CompressWholeLog + + -- TODO: options-for-compression-level + -- помним, что всё иммутабельное. как один раз запостим, + -- такое и будет жить всегда + let compressOpts = defaultCompressParams { compressLevel = bestSpeed } + -- FIXME: fix-code-dup let meta = fromString $ show - $ "hbs2-git" <> line - <> "type:" <+> "hbs2-git-push-log" + $ "hbs2-git" + <> line + <> "type:" <+> "hbs2-git-push-log" + <> line + <> "flags:" <+> "gz:sgmt" <> line let segments = chunksOf chunkSize objs @@ -188,7 +201,7 @@ writeLogSegments onProgress val objs chunkSize trailing = do GitObject tp o <- liftIO $ readGit d `orDie` [qc|error reading object {pretty d}|] let entry = GitLogEntry ( gitLogEntryTypeOf tp ) (Just d) ( fromIntegral $ LBS.length o ) - gitRepoLogWriteEntry fh entry o + gitRepoLogWriteEntry opts fh entry o liftIO $ atomically $ modifyTVar written (HashSet.insert d) -- gitRepoLogWriteEntry fh ctx ctxBs @@ -197,13 +210,16 @@ writeLogSegments onProgress val objs chunkSize trailing = do when (segmentIndex == totalSegments) $ do for_ trailing $ \(e, bs) -> do - gitRepoLogWriteEntry fh e bs + gitRepoLogWriteEntry opts fh e bs -- finalize log section hClose fh content <- liftIO $ LBS.readFile fpath - logMerkle <- lift $ storeObject meta content `orDie` [qc|Can't store push log|] + + let gzipped = compressWith compressOpts content + + logMerkle <- lift $ storeObject meta gzipped `orDie` [qc|Can't store push log|] trace $ "PUSH LOG HASH: " <+> pretty logMerkle trace $ "POSTING REFERENCE UPDATE TRANSACTION" <+> pretty remote <+> pretty logMerkle @@ -301,7 +317,7 @@ exportRefOnly _ remote rfrom ref val = do let (commits, others) = List.partition (\e -> fst (onEntryType e) == 2) notTrees -- FIXME: hbs2-git-size-hardcode-to-args - let batch = 10000 + let batch = 20000 let objects = blobs <> trees <> others <> commits mon <- newProgressMonitor "write objects" (length objects) @@ -333,7 +349,13 @@ exportRefOnly _ remote rfrom ref val = do -- что бы оставить совместимость pure $ lastMay logz -runExport :: forall m . (MonadIO m, MonadUnliftIO m, MonadCatch m, HasProgress (App m), MonadMask (App m)) +runExport :: forall m . ( MonadIO m + , MonadUnliftIO m + , MonadCatch m + , HasProgress (App m) + , MonadMask (App m) + ) + => Maybe FilePath -> RepoRef -> App m () runExport fp repo = do diff --git a/hbs2-git/lib/HBS2Git/GitRepoLog.hs b/hbs2-git/lib/HBS2Git/GitRepoLog.hs index 0df8dee8..1630281b 100644 --- a/hbs2-git/lib/HBS2Git/GitRepoLog.hs +++ b/hbs2-git/lib/HBS2Git/GitRepoLog.hs @@ -23,6 +23,22 @@ import Data.HashSet qualified as HashSet import Control.Concurrent.STM import Data.Maybe +class HasGitLogOptions a where + compressEntries :: a -> Bool + compressWholeLog :: a -> Bool + + +-- | default GitLogOptions +instance HasGitLogOptions () where + compressEntries = const True + compressWholeLog = const False + +data CompressWholeLog = CompressWholeLog + +instance HasGitLogOptions CompressWholeLog where + compressEntries = const False + compressWholeLog = const True + data GitLogEntryType = GitLogEntryCommit | GitLogEntryBlob | GitLogEntryTree @@ -128,7 +144,8 @@ gitRepoLogScan r fn cb = do where go _ 0 = pure () go h size = do - es <- liftIO $ LBS.hGet h entryHeadSize <&> deserialise @GitLogEntry + ss <- liftIO $ LBS.hGet h entryHeadSize + let es = deserialise @GitLogEntry ss let esize = es ^. gitLogEntrySize let consumed = entryHeadSize + fromIntegral esize if r then do @@ -139,30 +156,43 @@ gitRepoLogScan r fn cb = do cb es Nothing go h ( max 0 (size - consumed) ) -gitRepoLogWriteHead :: forall m . MonadIO m => Handle -> GitLogHeadEntry -> m () -gitRepoLogWriteHead fh e = do +gitRepoLogWriteHead :: forall o m . (HasGitLogOptions o, MonadIO m) + => o + -> Handle + -> GitLogHeadEntry + -> m () + +gitRepoLogWriteHead opt fh e = do let s = serialise e let entry = GitLogEntry GitLogHead Nothing (fromIntegral $ LBS.length s) - gitRepoLogWriteEntry fh entry s + gitRepoLogWriteEntry opt fh entry s -gitRepoLogMakeEntry :: GitLogEntry -> ByteString -> ByteString -gitRepoLogMakeEntry entry' o = bs <> ss + + +gitRepoLogMakeEntry :: forall o . (HasGitLogOptions o) + => o + -> GitLogEntry + -> ByteString + -> ByteString + +gitRepoLogMakeEntry opts entry' o = bs <> ss where - bs = LBS.take entryHeadSize $ serialise entry <> LBS.replicate entryHeadSize 0 ss = compressWith co o entry = entry' & set gitLogEntrySize (fromIntegral $ LBS.length ss) - co = defaultCompressParams { compressLevel = bestSpeed } + bs = LBS.take entryHeadSize $ serialise entry <> LBS.replicate entryHeadSize 0 + co | compressEntries opts = defaultCompressParams { compressLevel = bestSpeed } + | otherwise = defaultCompressParams { compressLevel = noCompression } --- TODO: use-gitRepoLogMakeEntry-in-body -gitRepoLogWriteEntry :: forall m . MonadIO m => Handle -> GitLogEntry -> ByteString -> m () -gitRepoLogWriteEntry fh entry' o = do - let ss = compressWith co o - let entry = entry' & set gitLogEntrySize (fromIntegral $ LBS.length ss) - let bs = LBS.take entryHeadSize $ serialise entry <> LBS.replicate entryHeadSize 0 - liftIO $ LBS.hPutStr fh bs - liftIO $ LBS.hPutStr fh ss - where - co = defaultCompressParams { compressLevel = bestSpeed } +gitRepoLogWriteEntry :: forall o m . (MonadIO m, HasGitLogOptions o) + => o + -> Handle + -> GitLogEntry + -> ByteString + -> m () + +gitRepoLogWriteEntry opts fh entry' o = do + let entryWithSize = gitRepoLogMakeEntry opts entry' o + liftIO $ LBS.hPutStr fh entryWithSize gitRepoMakeIndex :: FilePath -> IO (HashSet GitHash) gitRepoMakeIndex fp = do diff --git a/hbs2-git/lib/HBS2Git/Import.hs b/hbs2-git/lib/HBS2Git/Import.hs index 4da77b84..673b4c88 100644 --- a/hbs2-git/lib/HBS2Git/Import.hs +++ b/hbs2-git/lib/HBS2Git/Import.hs @@ -35,6 +35,12 @@ import UnliftIO.IO import System.IO (openBinaryFile) import System.FilePath.Posix import Data.HashMap.Strict qualified as HashMap +import Data.Text qualified as Text +import Data.Config.Suckless +import Data.Either + +import Streaming.ByteString qualified as SB +import Streaming.Zip qualified as SZip data RunImportOpts = RunImportOpts @@ -54,6 +60,18 @@ walkHashes q h = walkMerkle h (readBlock . HashRef) $ \(hr :: Either (Hash HbSyn Right (hrr :: [HashRef]) -> do forM_ hrr $ liftIO . atomically . Q.writeTQueue q +blockSource :: (MonadIO m, HasCatAPI m) => HashRef -> SB.ByteStream m Integer +blockSource h = do + tsize <- liftIO $ newTVarIO 0 + deepScan ScanDeep (const none) (fromHashRef h) (lift . readBlock . HashRef) $ \ha -> do + sec <- lift $ readBlock (HashRef ha) `orDie` [qc|missed block {pretty ha}|] + -- skip merkle tree head block, write only the data + liftIO $ atomically $ modifyTVar tsize (+ LBS.length sec) + when (h /= HashRef ha) do + SB.fromLazy sec + + liftIO $ readTVarIO tsize <&> fromIntegral + importRefLogNew :: ( MonadIO m , MonadUnliftIO m , MonadCatch m @@ -117,32 +135,49 @@ importRefLogNew force ref = runResourceT do let (SequentialRef _ (AnnotatedHashRef _ h)) = payload trace $ "PUSH LOG HASH" <+> pretty h + treeBs <- MaybeT $ lift $ readBlock h + + let something = tryDetect (fromHashRef h) treeBs + let meta = mconcat $ rights [ parseTop (Text.unpack s) | ShortMetadata s <- universeBi something ] + + -- TODO: check-if-it-is-hbs2-git-log + + let flags = mconcat [ Text.splitOn ":" (Text.pack (show $ pretty s)) + | (ListVal (Key "flags:" [SymbolVal s]) ) <- meta + ] + + let gzipped = "gz" `elem` flags + + debug $ "FOUND LOG METADATA " <+> pretty flags + <+> pretty "gzipped:" <+> pretty gzipped + here <- withDB db $ stateGetLogImported h unless (here && not force) do - lift $ deepScan ScanDeep (const none) (fromHashRef h) (lift . readBlock . HashRef) $ \ha -> do - sec <- lift $ readBlock (HashRef ha) `orDie` [qc|missed block {pretty ha}|] - -- skip merkle tree head block, write only the data - when (h /= HashRef ha) do - liftIO $ LBS.hPutStr fh sec + sz <- if gzipped then do + SB.toHandle fh $ SZip.gunzip (blockSource h) + else + SB.toHandle fh (blockSource h) release keyFh + let fpathReal = fpath + tnum <- liftIO $ newTVarIO 0 - liftIO $ gitRepoLogScan True fpath $ \_ _ -> do + liftIO $ gitRepoLogScan True fpathReal $ \_ _ -> do liftIO $ atomically $ modifyTVar tnum succ num <- liftIO $ readTVarIO tnum trace $ "LOG ENTRY COUNT" <+> pretty num let pref = take 16 (show (pretty e)) - sz <- liftIO $ getFileSize fpath <&> realToFrac - let name = [qc|import {pref}... {sz / (1024*1024) :: Fixed E3}|] + let name = [qc|import {pref}... {realToFrac sz / (1024*1024) :: Fixed E3}|] oMon <- newProgressMonitor name num - lift $ gitRepoLogScan True fpath $ \entry s -> do + lift $ gitRepoLogScan True fpathReal $ \entry s -> do + updateProgress oMon 1 lbs <- pure s `orDie` [qc|git object not read from log|]