mirror of https://github.com/voidlizard/hbs2
hbs2-git-log-segmentation
This commit is contained in:
parent
e116268c4a
commit
3681fd7bee
|
@ -41,3 +41,120 @@ TODO: log-object-reorder-for-better-dedup
|
|||
что префикс лога будет более константным, что ли. т.е при
|
||||
последующем разбиении на сегменты есть шансы, что сегменты в
|
||||
начале лога будут лучше дедупиться.
|
||||
|
||||
|
||||
FIXME: segmented-log-import-issue
|
||||
заметил, что не сразу срабатывает fetch,
|
||||
а только после скана.
|
||||
|
||||
Выяснить, почему.
|
||||
|
||||
Появилось только после сегментации лога,
|
||||
возможно, связано с асинхронностью прибытия секций,
|
||||
а может, сломан импорт. Проверить.
|
||||
|
||||
|
||||
FIXME: hbs2-segmented-log-remove-temp-files-ASAP
|
||||
Удалять временные файлы при импорте (и экспорте)
|
||||
максимально быстро.
|
||||
Наличие огромного количества временных файлов в каталоге
|
||||
убивает произодительность, и сильно жрёт дисковый ресурс
|
||||
и память, если это tmpfs, драматически замедляя процесс
|
||||
импорта/экспорта
|
||||
|
||||
FIXME: check-merkle-meta
|
||||
Проверять метаданные аннотированного дерева.
|
||||
Писать версию лога в эти данные.
|
||||
|
||||
FIXME: mandatory-compress-whole-log
|
||||
Эксперимент показал разницу в размере репозитория git и
|
||||
соответствующего ему стейта hbs2-git приблизительно в 10 раз (!!).
|
||||
|
||||
Очевидно, что git сжимает даже лучше, чем мы бы просто сжали его
|
||||
объекты архиватором, он проделывает еще некоторые дополнительные
|
||||
манипуляции, к которым шёл годами.
|
||||
|
||||
Всё, что мы пока можем сделать --- это хотя бы сжимать секции лога
|
||||
целиком.
|
||||
|
||||
Вопрос, ломать или не ломать при этом совместимость, т.е оставить ли
|
||||
возможность несжатых логов.
|
||||
|
||||
Указывать ли информацию в аннотации дерева, либо же завести секцию
|
||||
лога, указывающую, что за ней следуют сжатые данные.
|
||||
|
||||
Сжатие на уровне лога целиком нужно обязательно, т.к. сейчас мы
|
||||
имеем стейт размера 36G вместо 3.5G на тестовом примере репозитория
|
||||
nixpkgs.
|
||||
|
||||
Сжатие лога внутри секций неэффективно и, кажется, бесполезно.
|
||||
|
||||
Если его дропнуть, то поломается совместимость.
|
||||
|
||||
Если оставить, но поставить нулевую компрессию --- то будет
|
||||
бесполезное замедление работы.
|
||||
|
||||
Сжимать на уровне лога целиком --- правильно.
|
||||
|
||||
Но если иметь несегментированный лог, или очень большой размер
|
||||
сегмента, то это будет очень медленно, и чем больше файл, тем хуже
|
||||
будет ситуация.
|
||||
|
||||
На сжатии в памяти, кроме того, мы можем исчерпать память.
|
||||
|
||||
TODO: reflog-purge-function-ASAP
|
||||
Рекурсивное удаление всех данных ссылки при удалении
|
||||
ссылки.
|
||||
|
||||
Сейчас есть рекурсивное удаление дерева, но для ссылок
|
||||
будет удалён только журнал транзакций, а сами данные
|
||||
останутся.
|
||||
|
||||
Можно парсить ссылку, и если там обнаруживаем там дерево,
|
||||
то сносить его рекурсивно.
|
||||
|
||||
Это надо сделать ASAP, т.к. растёт стейт на тестовых данных,
|
||||
надо сносить.
|
||||
|
||||
|
||||
|
||||
NOTE: wtf?
|
||||
|
||||
FIXME: hbs2-git-storeObject-read-chunked
|
||||
возможно, очень большой mem footprint при экспорте
|
||||
больших репо объясняется тем, что мы читаем файл
|
||||
целиком (хоть в LBS). Попробовать читать частями.
|
||||
|
||||
TODO: reflog-monitoring-features
|
||||
сделать возможность теста рефлога и перезапроса
|
||||
его секций. почему-то сейчас рефлог подъехал не сразу,
|
||||
это довольно тревожно.
|
||||
|
||||
|
||||
TODO: hbs2-peer-dynamic-reflog-subscription
|
||||
сделать динамическую персистентную подписку
|
||||
на рефлоги.
|
||||
hbs2-peer reflog add REFLOG <poll-time-min>
|
||||
hbs2-peer reflog list
|
||||
hbs2-peer reflog del
|
||||
сделать соответствующие API
|
||||
|
||||
git clone hbs2://XXX --- добавляет подписку динамически (?)
|
||||
или по крайней мере может, если есть такая опция в конфиге
|
||||
|
||||
TODO: hbs2-git-remove-REFLOG-from-export
|
||||
Можно же использовать ключ, если он передан.
|
||||
Таким образом, параметр можно сделать необязательным,
|
||||
и если он не задан, то брать из ключа.
|
||||
|
||||
TODO: hbs2-git-export-segment-params-to-export-ASAP
|
||||
Отдельно обрабатывать блобы, деревья и коммиты
|
||||
при экспорте. Ввести умолчания -- для блобов
|
||||
сегмент меньше, для блобов --- больше.
|
||||
Нужно, так как блобы обычно больше, tree -- в зависимости
|
||||
от проекта, коммиты -- более менее стабильного размера и
|
||||
маленькие.
|
||||
|
||||
NOTE: test-1
|
||||
NOTE: test-2
|
||||
|
||||
|
|
|
@ -81,6 +81,7 @@ guessHead = \case
|
|||
loop :: forall m . ( MonadIO m
|
||||
, MonadCatch m
|
||||
, MonadUnliftIO m
|
||||
, MonadMask m
|
||||
, HasProgress (RunWithConfig (GitRemoteApp m))
|
||||
) => [String] -> GitRemoteApp m ()
|
||||
loop args = do
|
||||
|
@ -164,16 +165,19 @@ loop args = do
|
|||
next
|
||||
|
||||
["list"] -> do
|
||||
importRefLogNew False ref
|
||||
for_ (LBS.lines hd) (sendLn . LBS.toStrict)
|
||||
sendEol
|
||||
next
|
||||
|
||||
["list","for-push"] -> do
|
||||
importRefLogNew False ref
|
||||
for_ (LBS.lines hd) (sendLn . LBS.toStrict)
|
||||
sendEol
|
||||
next
|
||||
|
||||
["fetch", sha1, x] -> do
|
||||
importRefLogNew False ref
|
||||
trace $ "fetch" <+> pretty (BS.unpack sha1) <+> pretty (BS.unpack x)
|
||||
liftIO $ atomically $ writeTVar batch True
|
||||
-- sendEol
|
||||
|
|
|
@ -36,7 +36,7 @@ newtype RunWithConfig m a =
|
|||
, MonadTrans
|
||||
, MonadThrow
|
||||
, MonadCatch
|
||||
-- , MonadMask
|
||||
, MonadMask
|
||||
, MonadUnliftIO
|
||||
)
|
||||
|
||||
|
@ -60,7 +60,9 @@ instance MonadIO m => HasRefCredentials (RunWithConfig (GitRemoteApp m)) where
|
|||
push :: forall m . ( MonadIO m
|
||||
, MonadCatch m
|
||||
, HasProgress (RunWithConfig (GitRemoteApp m))
|
||||
, MonadMask (RunWithConfig (GitRemoteApp m))
|
||||
, MonadUnliftIO m
|
||||
, MonadMask m
|
||||
)
|
||||
|
||||
=> RepoRef -> [Maybe GitRef] -> GitRemoteApp m (Maybe GitRef)
|
||||
|
@ -79,7 +81,7 @@ push remote what@[Just bFrom , Just br] = do
|
|||
trace $ "PUSH PARAMS" <+> pretty what
|
||||
gh <- gitGetHash (normalizeRef bFrom) `orDie` [qc|can't read hash for ref {pretty br}|]
|
||||
_ <- traceTime "TIME: exportRefOnly" $ exportRefOnly () remote (Just bFrom) br gh
|
||||
importRefLogNew False remote
|
||||
-- importRefLogNew False remote
|
||||
pure (Just br)
|
||||
|
||||
push remote [Nothing, Just br] = do
|
||||
|
@ -90,7 +92,7 @@ push remote [Nothing, Just br] = do
|
|||
loadCredentials mempty
|
||||
trace $ "deleting remote reference" <+> pretty br
|
||||
exportRefDeleted () remote br
|
||||
importRefLogNew False remote
|
||||
-- importRefLogNew False remote
|
||||
pure (Just br)
|
||||
|
||||
push r w = do
|
||||
|
|
|
@ -37,6 +37,8 @@ newtype GitRemoteApp m a =
|
|||
, MonadThrow
|
||||
, MonadCatch
|
||||
, MonadUnliftIO
|
||||
, MonadMask
|
||||
, MonadTrans
|
||||
)
|
||||
|
||||
runRemoteM :: MonadIO m => RemoteEnv -> GitRemoteApp m a -> m a
|
||||
|
|
|
@ -77,6 +77,7 @@ common shared-properties
|
|||
, resourcet
|
||||
, safe
|
||||
, serialise
|
||||
, split
|
||||
, sqlite-simple
|
||||
, stm
|
||||
, suckless-conf
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
module HBS2.Git.Local.CLI
|
||||
( module HBS2.Git.Local.CLI
|
||||
, getStdin
|
||||
, getStdout
|
||||
, stopProcess
|
||||
) where
|
||||
|
||||
|
@ -31,6 +32,8 @@ import Data.Text.Encoding (decodeLatin1)
|
|||
import Data.Text qualified as Text
|
||||
import System.Process.Typed
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
import Lens.Micro.Platform
|
||||
import Control.Monad.Trans.Maybe
|
||||
import System.IO
|
||||
|
||||
-- FIXME: specify-git-dir
|
||||
|
@ -412,7 +415,8 @@ gitRevList :: MonadIO m => Maybe GitHash -> GitHash -> m [GitHash]
|
|||
gitRevList l h = do
|
||||
let from = maybe mempty (\r -> [qc|{pretty r}..|] ) l
|
||||
-- let cmd = [qc|git rev-list --objects --in-commit-order --reverse --date-order {from}{pretty h}|]
|
||||
let cmd = [qc|git rev-list --objects --reverse --in-commit-order {from}{pretty h}|]
|
||||
-- let cmd = [qc|git rev-list --objects --reverse --in-commit-order {from}{pretty h}|]
|
||||
let cmd = [qc|git rev-list --reverse --in-commit-order --objects {from}{pretty h}|]
|
||||
let procCfg = setStdin closed $ setStderr closed (shell cmd)
|
||||
(_, out, _) <- readProcess procCfg
|
||||
pure $ mapMaybe (fmap (fromString . LBS.unpack) . headMay . LBS.words) (LBS.lines out)
|
||||
|
@ -453,11 +457,59 @@ gitGetCommitImmediateDeps h = do
|
|||
pure $ fromString @GitHash $ LBS.unpack firstWord
|
||||
|
||||
|
||||
startGitHashObject :: GitObjectType -> IO (Process Handle () ())
|
||||
startGitHashObject :: MonadIO m => GitObjectType -> m (Process Handle () ())
|
||||
startGitHashObject objType = do
|
||||
let cmd = "git"
|
||||
let args = ["hash-object", "-w", "-t", show (pretty objType), "--stdin-paths"]
|
||||
let config = setStdin createPipe $ setStdout closed $ setStderr inherit $ proc cmd args
|
||||
startProcess config
|
||||
|
||||
startGitCatFile :: MonadIO m => m (Process Handle Handle ())
|
||||
startGitCatFile = do
|
||||
let cmd = "git"
|
||||
let args = ["cat-file", "--batch"]
|
||||
let config = setStdin createPipe $ setStdout createPipe $ setStderr closed $ proc cmd args
|
||||
startProcess config
|
||||
|
||||
gitReadFromCatFileBatch :: MonadIO m
|
||||
=> Process Handle Handle a
|
||||
-> GitHash
|
||||
-> m (Maybe GitObject)
|
||||
|
||||
gitReadFromCatFileBatch prc gh = do
|
||||
|
||||
let ssin = getStdin prc
|
||||
let sout = getStdout prc
|
||||
|
||||
liftIO $ hPrint ssin (pretty gh) >> hFlush ssin
|
||||
|
||||
runMaybeT do
|
||||
|
||||
here <- liftIO $ hWaitForInput sout 1000
|
||||
|
||||
guard here
|
||||
|
||||
header <- liftIO $ BS8.hGetLine sout
|
||||
|
||||
case BS8.unpack <$> BS8.words header of
|
||||
[ha, t, s] -> do
|
||||
(_, tp, size) <- MaybeT $ pure $ (,,) <$> fromStringMay @GitHash ha
|
||||
<*> fromStringMay @GitObjectType t
|
||||
<*> readMay s
|
||||
|
||||
content <- liftIO $ LBS.hGet sout size
|
||||
|
||||
guard (LBS.length content == fromIntegral size)
|
||||
|
||||
void $ liftIO $ LBS.hGet sout 1
|
||||
|
||||
let object = GitObject tp content
|
||||
|
||||
-- TODO: optionally-check-hash
|
||||
-- guard (gh== gitHashObject object)
|
||||
|
||||
pure object
|
||||
|
||||
_ -> MaybeT $ pure Nothing
|
||||
|
||||
|
||||
|
|
|
@ -1,11 +1,19 @@
|
|||
{-# Language AllowAmbiguousTypes #-}
|
||||
module HBS2Git.Export where
|
||||
{-# Language RankNTypes #-}
|
||||
{-# Language TemplateHaskell #-}
|
||||
module HBS2Git.Export
|
||||
( exportRefDeleted
|
||||
, exportRefOnly
|
||||
, runExport
|
||||
, ExportRepoOps
|
||||
) where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.OrDie
|
||||
import HBS2.System.Logger.Simple
|
||||
import HBS2.Net.Proto.Definition()
|
||||
import HBS2.Clock
|
||||
import HBS2.Base58
|
||||
|
||||
import HBS2.Git.Local
|
||||
|
@ -19,7 +27,6 @@ import HBS2Git.GitRepoLog
|
|||
import Control.Applicative
|
||||
import Control.Monad.Catch
|
||||
import Control.Monad.Reader
|
||||
import UnliftIO.Async
|
||||
import Control.Concurrent.STM
|
||||
import Data.ByteString.Lazy.Char8 qualified as LBS
|
||||
import Data.Foldable (for_)
|
||||
|
@ -37,14 +44,28 @@ import System.Directory
|
|||
import System.FilePath
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
import UnliftIO.IO
|
||||
import System.IO hiding (hClose,hPrint)
|
||||
import System.IO hiding (hClose,hPrint,hPutStrLn,hFlush)
|
||||
import System.IO.Temp
|
||||
import Control.Monad.Trans.Resource
|
||||
import Data.List.Split (chunksOf)
|
||||
|
||||
class ExportRepoOps a where
|
||||
|
||||
instance ExportRepoOps ()
|
||||
|
||||
data ExportEnv =
|
||||
ExportEnv
|
||||
{ _exportDB :: DBEnv
|
||||
, _exportWritten :: TVar (HashSet GitHash)
|
||||
, _exportFileName :: FilePath
|
||||
, _exportDir :: FilePath
|
||||
, _exportRepo :: RepoRef
|
||||
, _exportReadObject :: GitHash -> IO (Maybe GitObject)
|
||||
}
|
||||
|
||||
makeLenses 'ExportEnv
|
||||
|
||||
|
||||
exportRefDeleted :: forall o m . ( MonadIO m
|
||||
, MonadCatch m
|
||||
-- , MonadMask m
|
||||
|
@ -75,8 +96,7 @@ exportRefDeleted _ repo ref = do
|
|||
-- но вот если мы удаляем уже удаленную ссылку, то для ссылки 00..0
|
||||
-- будет ошибка где-то.
|
||||
|
||||
vals <- withDB db $ stateGetActualRefs <&> List.nub . fmap snd
|
||||
|
||||
vals <- withDB db $ stateGetLastKnownCommits 10
|
||||
let (ctxHead, ctxBs) = makeContextEntry vals
|
||||
|
||||
trace $ "DELETING REF CONTEXT" <+> pretty vals
|
||||
|
@ -106,11 +126,97 @@ makeContextEntry hashes = (entryHead, payload)
|
|||
payload = GitLogContextCommits (HashSet.fromList hashes) & serialise
|
||||
entryHead = GitLogEntry GitLogContext ha undefined
|
||||
|
||||
|
||||
newtype ExportT m a = ExportT { fromExportT :: ReaderT ExportEnv m a }
|
||||
deriving newtype ( Functor
|
||||
, Applicative
|
||||
, Monad
|
||||
, MonadIO
|
||||
, MonadTrans
|
||||
, MonadReader ExportEnv
|
||||
, MonadMask
|
||||
, MonadCatch
|
||||
, MonadThrow
|
||||
)
|
||||
|
||||
withExportEnv :: MonadIO m => ExportEnv -> ExportT m a -> m a
|
||||
withExportEnv env f = runReaderT (fromExportT f) env
|
||||
|
||||
writeLogSegments :: forall m . ( MonadIO m
|
||||
, HasCatAPI m
|
||||
, MonadMask m
|
||||
, HasRefCredentials m
|
||||
, HasConf m
|
||||
)
|
||||
=> ( Int -> m () )
|
||||
-> GitHash
|
||||
-> [GitHash]
|
||||
-> Int
|
||||
-> [(GitLogEntry, LBS.ByteString)]
|
||||
-> ExportT m [HashRef]
|
||||
|
||||
writeLogSegments onProgress val objs chunkSize trailing = do
|
||||
|
||||
db <- asks $ view exportDB
|
||||
written <- asks $ view exportWritten
|
||||
fname <- asks $ view exportFileName
|
||||
dir <- asks $ view exportDir
|
||||
remote <- asks $ view exportRepo
|
||||
readGit <- asks $ view exportReadObject
|
||||
|
||||
-- FIXME: fix-code-dup
|
||||
let meta = fromString $ show
|
||||
$ "hbs2-git" <> line
|
||||
<> "type:" <+> "hbs2-git-push-log"
|
||||
<> line
|
||||
|
||||
let segments = chunksOf chunkSize objs
|
||||
let totalSegments = length segments
|
||||
|
||||
forM (zip segments [1..]) $ \(segment, segmentIndex) -> do
|
||||
let fpath = dir </> fname <> "_" <> show segmentIndex
|
||||
bracket (liftIO $ openBinaryFile fpath AppendMode)
|
||||
(const $ pure ()) $ \fh -> do
|
||||
for_ segment $ \d -> do
|
||||
here <- liftIO $ readTVarIO written <&> HashSet.member d
|
||||
inState <- withDB db (stateIsLogObjectExists d)
|
||||
|
||||
lift $ onProgress 1
|
||||
|
||||
unless (here || inState) do
|
||||
|
||||
GitObject tp o <- liftIO $ readGit d `orDie` [qc|error reading object {pretty d}|]
|
||||
|
||||
let entry = GitLogEntry ( gitLogEntryTypeOf tp ) (Just d) ( fromIntegral $ LBS.length o )
|
||||
gitRepoLogWriteEntry fh entry o
|
||||
liftIO $ atomically $ modifyTVar written (HashSet.insert d)
|
||||
|
||||
-- gitRepoLogWriteEntry fh ctx ctxBs
|
||||
|
||||
trace $ "writing" <+> pretty tp <+> pretty d
|
||||
|
||||
when (segmentIndex == totalSegments) $ do
|
||||
for_ trailing $ \(e, bs) -> do
|
||||
gitRepoLogWriteEntry fh e bs
|
||||
|
||||
-- finalize log section
|
||||
hClose fh
|
||||
|
||||
content <- liftIO $ LBS.readFile fpath
|
||||
logMerkle <- lift $ storeObject meta content `orDie` [qc|Can't store push log|]
|
||||
|
||||
trace $ "PUSH LOG HASH: " <+> pretty logMerkle
|
||||
trace $ "POSTING REFERENCE UPDATE TRANSACTION" <+> pretty remote <+> pretty logMerkle
|
||||
|
||||
lift $ postRefUpdate remote 0 logMerkle
|
||||
|
||||
pure logMerkle
|
||||
|
||||
-- | Exports only one ref to the repo.
|
||||
-- Corresponds to a single ```git push``` operation
|
||||
exportRefOnly :: forall o m . ( MonadIO m
|
||||
, MonadCatch m
|
||||
-- , MonadMask m
|
||||
, MonadMask m
|
||||
, MonadUnliftIO m
|
||||
, HasCatAPI m
|
||||
, HasConf m
|
||||
|
@ -123,10 +229,11 @@ exportRefOnly :: forall o m . ( MonadIO m
|
|||
-> Maybe GitRef
|
||||
-> GitRef
|
||||
-> GitHash
|
||||
-> m HashRef
|
||||
-> m (Maybe HashRef)
|
||||
|
||||
exportRefOnly _ remote rfrom ref val = do
|
||||
|
||||
|
||||
let repoHead = RepoHead Nothing (HashMap.fromList [(ref,val)])
|
||||
|
||||
let repoHeadStr = (LBS.pack . show . pretty . AsGitRefsFile) repoHead
|
||||
|
@ -150,15 +257,23 @@ exportRefOnly _ remote rfrom ref val = do
|
|||
|
||||
trace $ "LAST_KNOWN_REV" <+> braces (pretty rfrom) <+> braces (pretty ref) <+> braces (pretty lastKnownRev)
|
||||
|
||||
entries <- gitRevList lastKnownRev val
|
||||
entries <- traceTime "gitRevList" $ gitRevList lastKnownRev val
|
||||
|
||||
let entryNum = length entries
|
||||
|
||||
-- NOTE: just-for-test-new-non-empty-push-to-another-branch-112
|
||||
|
||||
-- FIXME: may-blow-on-huge-repo-export
|
||||
types <- gitGetObjectTypeMany entries <&> Map.fromList
|
||||
types <- traceTime "gitGetObjectTypeMany" $ gitGetObjectTypeMany entries <&> Map.fromList
|
||||
|
||||
let lookupType t = Map.lookup t types
|
||||
let justOrDie msg x = pure x `orDie` msg
|
||||
|
||||
let onEntryType e = (fx $ lookupType e, e)
|
||||
where fx = \case
|
||||
Just Blob -> 0
|
||||
Just Tree -> 1
|
||||
Just Commit -> 2
|
||||
Nothing -> 3
|
||||
|
||||
trace $ "ENTRIES:" <+> pretty (length entries)
|
||||
|
||||
|
@ -166,98 +281,59 @@ exportRefOnly _ remote rfrom ref val = do
|
|||
|
||||
let fname = [qc|{pretty val}.data|]
|
||||
|
||||
-- TODO: investigate-on-signal-behaviour
|
||||
-- похоже, что в случае прилёта сигнала он тут не обрабатывается,
|
||||
-- и временный каталог остаётся
|
||||
runResourceT $ do
|
||||
|
||||
gitCatFile <- startGitCatFile
|
||||
|
||||
written <- liftIO $ newTVarIO (HashSet.empty :: HashSet GitHash)
|
||||
|
||||
let myTempDir = "hbs-git"
|
||||
temp <- liftIO getCanonicalTemporaryDirectory
|
||||
|
||||
(_,dir) <- allocate (createTempDirectory temp myTempDir) removeDirectoryRecursive
|
||||
|
||||
let fpath = dir </> fname
|
||||
fh <- liftIO $ openBinaryFile fpath AppendMode
|
||||
let (blobs, notBlobs) = List.partition (\e -> fst (onEntryType e) == 0) entries
|
||||
let (trees, notTrees) = List.partition (\e -> fst (onEntryType e) == 1) notBlobs
|
||||
-- FIXME: others-might-be-tags
|
||||
let (commits, others) = List.partition (\e -> fst (onEntryType e) == 2) notTrees
|
||||
|
||||
expMon <- newProgressMonitor "export objects" (length entries)
|
||||
-- FIXME: hbs2-git-size-hardcode-to-args
|
||||
let batch = 10000
|
||||
let objects = blobs <> trees <> others <> commits
|
||||
mon <- newProgressMonitor "write objects" (length objects)
|
||||
|
||||
enq <- liftIO newTQueueIO
|
||||
let env = ExportEnv
|
||||
{ _exportDB = db
|
||||
, _exportWritten = written
|
||||
, _exportFileName = fname
|
||||
, _exportDir = dir
|
||||
, _exportRepo = remote
|
||||
, _exportReadObject = gitReadFromCatFileBatch gitCatFile
|
||||
}
|
||||
|
||||
-- FIXME: export-wtf?
|
||||
|
||||
aread <- async $ do
|
||||
for_ entries $ \d -> do
|
||||
here <- liftIO $ readTVarIO written <&> HashSet.member d
|
||||
inState <- withDB db (stateIsLogObjectExists d)
|
||||
updateProgress expMon 1
|
||||
unless (here || inState) do
|
||||
tp <- lookupType d & justOrDie [qc|no object type for {pretty d}|]
|
||||
o <- gitReadObject (Just tp) d
|
||||
let entry = GitLogEntry ( gitLogEntryTypeOf tp ) (Just d) ( fromIntegral $ LBS.length o )
|
||||
liftIO $ atomically $ writeTQueue enq (Just (d,tp,entry,o))
|
||||
|
||||
liftIO $ atomically $ writeTQueue enq Nothing
|
||||
|
||||
fix \next -> do
|
||||
mbEntry <- liftIO $ atomically $ readTQueue enq
|
||||
case mbEntry of
|
||||
Nothing -> pure ()
|
||||
Just (d,tp,entry,o) -> do
|
||||
gitRepoLogWriteEntry fh entry o
|
||||
liftIO $ atomically $ modifyTVar written (HashSet.insert d)
|
||||
trace $ "writing" <+> pretty tp <+> pretty d
|
||||
-- TODO: here-split-log-to-parts
|
||||
next
|
||||
|
||||
mapM_ wait [aread]
|
||||
|
||||
-- FIXME: problem-log-is-not-assotiated-with-commit
|
||||
-- Если так получилось, что в журнале подъехала только ссылка,
|
||||
-- и больше нет никакой информации -- мы не можем определить
|
||||
-- глубину(высоту?) этой ссылки, и, соответственно, вычислить
|
||||
-- её depth в стейте.
|
||||
-- Решение: в этом (или иных) случаях добавлять информацию о контексте,
|
||||
-- например, состояние других известных ссылок в моменте. Список ссылок
|
||||
-- берём из state, полагая, что раз ссылка в стейте, значит, она является
|
||||
-- важной. Имея эту информацию, мы можем хоть как-то вычислять depth
|
||||
-- этого лога. Похоже на векторные часы, кстати.
|
||||
|
||||
-- это "нормальный" лог. даже если хвост его приедет пустым (не будет коммитов)
|
||||
-- тут мы запомним, что его контекст = коммит, на который он устанавливает ссылку
|
||||
-- и этот коммит должен быть в секциях лога, которые приехали перед ним.
|
||||
-- следствие: у предыдущего лога будет такая же глубина, как и у этого.
|
||||
|
||||
vals <- withDB db $ stateGetActualRefs <&> List.nub . fmap snd
|
||||
let (e, bs) = makeContextEntry (val:vals)
|
||||
trace $ "writing context entry" <+> pretty [val]
|
||||
gitRepoLogWriteEntry fh e bs
|
||||
|
||||
let ha = gitHashObject (GitObject Blob repoHeadStr)
|
||||
let headEntry = GitLogEntry GitLogEntryHead (Just ha) ( fromIntegral $ LBS.length repoHeadStr )
|
||||
gitRepoLogWriteEntry fh headEntry repoHeadStr
|
||||
|
||||
-- TODO: find-prev-push-log-and-make-ref
|
||||
gitRepoLogWriteHead fh (GitLogHeadEntry Nothing)
|
||||
let upd = updateProgress mon
|
||||
|
||||
hClose fh
|
||||
vals <- withDB db $ stateGetLastKnownCommits 10
|
||||
let (ctx, ctxBs) = makeContextEntry (List.nub $ val:vals)
|
||||
|
||||
trace "STORING PUSH LOG"
|
||||
-- we need context entries to determine log HEAD operation sequence
|
||||
-- so only the last section needs it alongwith headEntry
|
||||
logz <- lift $ withExportEnv env (writeLogSegments upd val objects batch [ (ctx, ctxBs)
|
||||
, (headEntry, repoHeadStr)
|
||||
])
|
||||
|
||||
let meta = fromString $ show
|
||||
$ "hbs2-git" <> line
|
||||
<> "type:" <+> "hbs2-git-push-log"
|
||||
<> line
|
||||
-- NOTE: отдаём только последнюю секцию лога,
|
||||
-- что бы оставить совместимость
|
||||
pure $ lastMay logz
|
||||
|
||||
content <- liftIO $ LBS.readFile fpath
|
||||
logMerkle <- lift $ storeObject meta content `orDie` [qc|Can't store push log|]
|
||||
|
||||
trace $ "PUSH LOG HASH: " <+> pretty logMerkle
|
||||
trace $ "POSTING REFERENCE UPDATE TRANSACTION" <+> pretty remote <+> pretty logMerkle
|
||||
|
||||
-- FIXME: calculate-seqno-as-topsort-order
|
||||
lift $ postRefUpdate remote 0 logMerkle
|
||||
|
||||
pure logMerkle
|
||||
|
||||
runExport :: forall m . (MonadIO m, MonadUnliftIO m, MonadCatch m, HasProgress (App m))
|
||||
runExport :: forall m . (MonadIO m, MonadUnliftIO m, MonadCatch m, HasProgress (App m), MonadMask (App m))
|
||||
=> Maybe FilePath -> RepoRef -> App m ()
|
||||
runExport fp repo = do
|
||||
|
||||
|
|
|
@ -200,12 +200,13 @@ importRefLogNew force ref = runResourceT do
|
|||
statePutLogImported h
|
||||
statePutTranImported e
|
||||
|
||||
mapM_ hClose handles
|
||||
|
||||
withDB db $ do
|
||||
statePutRefImported logRoot
|
||||
stateUpdateCommitDepths
|
||||
statePutRefImported logRoot
|
||||
savepointRelease sp0
|
||||
|
||||
mapM_ hClose handles
|
||||
|
||||
where
|
||||
|
||||
|
|
|
@ -12,7 +12,6 @@ import Data.Function
|
|||
import Database.SQLite.Simple
|
||||
import Database.SQLite.Simple.FromField
|
||||
import Database.SQLite.Simple.ToField
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Reader
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
import Data.String
|
||||
|
@ -26,6 +25,8 @@ import Data.UUID.V4 qualified as UUID
|
|||
import Control.Monad.Catch
|
||||
import Control.Concurrent.STM
|
||||
import System.IO.Unsafe
|
||||
import Data.Graph (graphFromEdges, topSort)
|
||||
import Data.Map qualified as Map
|
||||
|
||||
-- FIXME: move-orphans-to-separate-module
|
||||
|
||||
|
@ -360,36 +361,36 @@ stateGetActualRefValue ref = do
|
|||
where refname = ?
|
||||
|] (Only ref) <&> fmap fromOnly . listToMaybe
|
||||
|
||||
stateGetLastKnownCommits :: MonadIO m => Int -> DB m [GitHash]
|
||||
stateGetLastKnownCommits n = do
|
||||
conn <- ask
|
||||
liftIO $ query conn [qc|
|
||||
select kommit from logcommitdepth order by depth asc limit ?;
|
||||
|] (Only n) <&> fmap fromOnly
|
||||
|
||||
stateUpdateCommitDepths :: MonadIO m => DB m ()
|
||||
stateUpdateCommitDepths = do
|
||||
conn <- ask
|
||||
sp <- savepointNew
|
||||
|
||||
rows <- liftIO $ query_ @(GitHash, GitHash) conn [qc|SELECT kommit, parent FROM logcommitparent|]
|
||||
|
||||
-- TODO: check-it-works-on-huge-graphs
|
||||
let commitEdges = rows
|
||||
let (graph, nodeFromVertex, _) = graphFromEdges [(commit, commit, [parent]) | (commit, parent) <- commitEdges]
|
||||
let sortedVertices = topSort graph
|
||||
let sortedCommits = reverse [commit | vertex <- sortedVertices, let (commit, _, _) = nodeFromVertex vertex]
|
||||
let ordered = zip sortedCommits [1..]
|
||||
|
||||
savepointBegin sp
|
||||
-- TODO: check-if-delete-logcommitdepth-is-needed
|
||||
liftIO $ execute_ conn [qc|DELETE FROM logcommitdepth|]
|
||||
liftIO $ execute_ conn [qc|
|
||||
INSERT INTO logcommitdepth (kommit, depth)
|
||||
WITH RECURSIVE depths(kommit, level) AS (
|
||||
SELECT
|
||||
kommit,
|
||||
0
|
||||
FROM logcommitparent
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT
|
||||
p.kommit,
|
||||
d.level + 1
|
||||
FROM logcommitparent p
|
||||
INNER JOIN depths d ON p.parent = d.kommit
|
||||
)
|
||||
SELECT
|
||||
kommit,
|
||||
MAX(level)
|
||||
FROM depths
|
||||
WHERE kommit NOT IN (SELECT kommit FROM logcommitdepth)
|
||||
GROUP BY kommit;
|
||||
|]
|
||||
forM_ ordered $ \(co, n) -> do
|
||||
liftIO $ execute conn
|
||||
[qc| INSERT INTO logcommitdepth(kommit,depth)
|
||||
VALUES(?,?)
|
||||
ON CONFLICT(kommit)
|
||||
DO UPDATE SET depth = ?
|
||||
|] (co,n,n)
|
||||
pure ()
|
||||
savepointRelease sp
|
||||
|
||||
|
||||
|
|
|
@ -37,10 +37,12 @@ import Data.HashMap.Strict qualified as HashMap
|
|||
import Codec.Serialise
|
||||
import Control.Concurrent.STM
|
||||
import System.IO qualified as IO
|
||||
import UnliftIO.IO qualified as UIO
|
||||
import System.IO (Handle)
|
||||
import Data.Kind
|
||||
import Control.Monad.Catch
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Trans.Resource
|
||||
|
||||
import System.TimeIt
|
||||
|
||||
|
@ -167,6 +169,13 @@ instance (HasCatAPI m, MonadIO m) => HasCatAPI (MaybeT m) where
|
|||
getHttpPutAPI = lift getHttpPutAPI
|
||||
getHttpRefLogGetAPI = lift getHttpRefLogGetAPI
|
||||
|
||||
|
||||
instance (HasCatAPI m, MonadIO m) => HasCatAPI (ResourceT m) where
|
||||
getHttpCatAPI = lift getHttpCatAPI
|
||||
getHttpSizeAPI = lift getHttpSizeAPI
|
||||
getHttpPutAPI = lift getHttpPutAPI
|
||||
getHttpRefLogGetAPI = lift getHttpRefLogGetAPI
|
||||
|
||||
-- instance (HasCatAPI (App m), MonadIO m) => HasCatAPI (ResourceT (App m)) where
|
||||
-- getHttpCatAPI = lift getHttpCatAPI
|
||||
-- getHttpSizeAPI = lift getHttpSizeAPI
|
||||
|
@ -192,6 +201,7 @@ newtype App m a =
|
|||
, MonadReader AppEnv
|
||||
, MonadThrow
|
||||
, MonadCatch
|
||||
, MonadMask
|
||||
, MonadUnliftIO
|
||||
, MonadTrans
|
||||
)
|
||||
|
|
|
@ -545,3 +545,53 @@ test-suite test-misc
|
|||
, vector
|
||||
, terminal-progress-bar
|
||||
|
||||
|
||||
executable topsort-commits
|
||||
import: shared-properties
|
||||
import: common-deps
|
||||
default-language: Haskell2010
|
||||
|
||||
ghc-options:
|
||||
-- -prof
|
||||
-- -fprof-auto
|
||||
|
||||
other-modules:
|
||||
|
||||
-- other-extensions:
|
||||
|
||||
-- type: exitcode-stdio-1.0
|
||||
hs-source-dirs: test
|
||||
main-is: TopSortCommits.hs
|
||||
|
||||
build-depends:
|
||||
base, hbs2-core
|
||||
-- , async
|
||||
-- , attoparsec
|
||||
, bytestring
|
||||
-- , cache
|
||||
-- , clock
|
||||
, containers
|
||||
, interpolatedstring-perl6
|
||||
-- , data-default
|
||||
-- , data-textual
|
||||
-- , directory
|
||||
-- , hashable
|
||||
-- , microlens-platform
|
||||
-- , mtl
|
||||
-- , mwc-random
|
||||
-- , network
|
||||
-- , network-ip
|
||||
, prettyprinter
|
||||
-- , random
|
||||
, safe
|
||||
, serialise
|
||||
-- , stm
|
||||
-- , streaming
|
||||
-- , saltine
|
||||
, text
|
||||
, typed-process
|
||||
-- , transformers
|
||||
, uniplate
|
||||
-- , vector
|
||||
-- , fast-logger
|
||||
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
module Main where
|
||||
|
||||
import Control.Monad
|
||||
import Control.Monad (replicateM)
|
||||
import Data.Graph
|
||||
import Data.List.Split
|
||||
import System.Random
|
||||
|
||||
-- main = do
|
||||
-- input <- getContents
|
||||
-- let commitLines = lines input
|
||||
-- let commitEdges = [(commit, parent) | line <- commitLines, let [commit, parent] = splitOn "|" line]
|
||||
-- let (graph, nodeFromVertex, vertexFromKey) = graphFromEdges [(commit, commit, [parent]) | (commit, parent) <- commitEdges]
|
||||
-- let sortedVertices = topSort graph
|
||||
-- let sortedCommits = reverse [commit | vertex <- sortedVertices, let (commit, _, _) = nodeFromVertex vertex]
|
||||
-- let ordered = zip sortedCommits [1..]
|
||||
-- forM_ ordered \(s,n) -> putStrLn (s <> " " <> show n)
|
||||
|
||||
|
||||
|
||||
|
||||
generateCommitGraph :: Int -> IO [(String, String)]
|
||||
generateCommitGraph edgesCount = do
|
||||
gen <- getStdGen
|
||||
let commitIds = randomRs ('a','z') gen :: [Char]
|
||||
let commitNames = take edgesCount $ map (\id -> "commit" ++ [id]) commitIds
|
||||
let parentNames = "root" : init commitNames
|
||||
return $ zip commitNames parentNames
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
let edgesCount = 1000000 -- Set the desired number of edges
|
||||
commitEdges <- generateCommitGraph edgesCount
|
||||
mapM_ print commitEdges
|
||||
let (graph, nodeFromVertex, vertexFromKey) = graphFromEdges [(commit, commit, [parent]) | (commit, parent) <- commitEdges]
|
||||
let sortedVertices = topSort graph
|
||||
let sortedCommits = reverse [commit | vertex <- sortedVertices, let (commit, _, _) = nodeFromVertex vertex]
|
||||
let ordered = zip sortedCommits [1..]
|
||||
forM_ ordered \(s,n) -> putStrLn (s <> " " <> show n)
|
Loading…
Reference in New Issue