larger repos & some fixes

This commit is contained in:
Dmitry Zuikov 2023-04-03 08:38:42 +03:00
parent 0d78244314
commit c687c3654b
9 changed files with 294 additions and 100 deletions

View File

@ -1,4 +1,2 @@
(fixme-set "workflow" "test" "FkbL6CVp5Q")
(fixme-set "workflow" "test" "7MxDVXBd2e")
(fixme-set "workflow" "test" "8BdLTM4Ds1")
(fixme-set "workflow" "test" "8vUEBAHeh9")

View File

@ -257,4 +257,6 @@ main = do
shutUp
hPutStrLn stdout ""
hPutStrLn stderr ""

View File

@ -69,6 +69,7 @@ common shared-properties
, microlens-platform
, mtl
, prettyprinter
, prettyprinter-ansi-terminal
, safe
, serialise
, suckless-conf

View File

@ -6,11 +6,17 @@ import HBS2.Git.Types
import HBS2.System.Logger.Simple
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Monad.Writer
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy.Char8 (ByteString)
import Data.ByteString.Lazy.Char8 qualified as LBS
import Data.Functor
import Data.Function
import Data.Maybe
import Data.Set qualified as Set
@ -99,6 +105,64 @@ gitGetDependencies hash = do
_ -> pure mempty
gitGetAllDependencies :: MonadIO m
=> Int
-> [ GitHash ]
-> ( GitHash -> IO [GitHash] )
-> ( GitHash -> IO () )
-> m [(GitHash, GitHash)]
gitGetAllDependencies n objects lookup progress = liftIO do
input <- newTQueueIO
output <- newTQueueIO
memo <- newTVarIO ( mempty :: HashSet GitHash )
work <- newTVarIO ( mempty :: HashMap Int Int )
num <- newTVarIO 1
atomically $ mapM_ (writeTQueue input) objects
replicateConcurrently_ n $ do
i <- atomically $ stateTVar num ( \x -> (x, succ x) )
fix \next -> do
o <- atomically $ tryReadTQueue input
case o of
Nothing -> do
todo <- atomically $ do
modifyTVar work (HashMap.delete i)
readTVar work <&> HashMap.elems <&> sum
when (todo > 0) next
Just h -> do
progress h
done <- atomically $ do
here <- readTVar memo <&> HashSet.member h
modifyTVar memo (HashSet.insert h)
pure here
unless done do
cached <- lookup h
deps <- if null cached then do
gitGetDependencies h
else
pure cached
forM_ deps $ \d -> do
atomically $ writeTQueue output (h,d)
atomically $ modifyTVar work (HashMap.insert i (length deps))
next
liftIO $ atomically $ flushTQueue output
gitGetTransitiveClosure :: forall cache . (HasCache cache GitHash (Set GitHash) IO)
=> cache
-> Set GitHash
@ -112,7 +176,7 @@ gitGetTransitiveClosure cache exclude hash = do
Just xs -> pure xs
Nothing -> do
deps <- gitGetDependencies hash
clos <- mapConcurrently (gitGetTransitiveClosure cache exclude) deps
clos <- mapM (gitGetTransitiveClosure cache exclude) deps
let res = (Set.fromList (hash:deps) <> Set.unions clos) `Set.difference` exclude
cacheInsert cache hash res
pure res
@ -241,6 +305,16 @@ gitGetHash ref = do
else
pure Nothing
gitGetBranchHEAD :: MonadIO m => m (Maybe GitRef)
gitGetBranchHEAD = do
(code, out, _) <- readProcess (shell [qc|git rev-parse --abbrev-ref HEAD|])
if code == ExitSuccess then do
let hash = fromString . LBS.unpack <$> headMay (LBS.lines out)
pure hash
else
pure Nothing
gitListLocalBranches :: MonadIO m => m [(GitRef, GitHash)]
gitListLocalBranches = do

View File

@ -356,8 +356,6 @@ storeObjectHttpPut meta bs = do
let chu = chunks (fromIntegral defBlockSize) bs
trace $ length chu
rt <- liftIO $ Cache.newCache Nothing
-- FIXME: run-concurrently
@ -367,7 +365,7 @@ storeObjectHttpPut meta bs = do
let pt = toPTree (MaxSize 1024) (MaxNum 1024) hashes -- FIXME: settings
trace $ viaShow pt
-- trace $ viaShow pt
root <- makeMerkle 0 pt $ \(h,t,bss) -> do
liftIO $ Cache.insert rt h (t,bss)

View File

@ -16,6 +16,7 @@ import HBS2.Git.Local.CLI
import HBS2Git.App
import HBS2Git.State
import HBS2Git.Update
import HBS2Git.Config
import Data.Functor
import Data.List (sortBy)
@ -25,6 +26,7 @@ import Data.ByteString.Lazy.Char8 qualified as LBS
import Data.Cache as Cache
import Data.Foldable (for_)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet
import Data.Maybe
import Data.Set qualified as Set
import Data.Set (Set)
@ -32,6 +34,10 @@ import Lens.Micro.Platform
import Control.Concurrent.STM
import Control.Concurrent.Async
import Control.Monad.Catch
import Text.InterpolatedString.Perl6 (qc)
import System.Directory
import System.FilePath
import Prettyprinter.Render.Terminal
data HashCache =
HashCache
@ -73,115 +79,145 @@ export h repoHead = do
db <- dbEnv dbPath
cache <- newHashCache db
sp <- withDB db savepointNew
notice "calculate dependencies"
withDB db $ savepointBegin sp
for_ refs $ \(_, r) -> do
liftIO $ gitGetTransitiveClosure cache mempty r <&> Set.toList
rr <- try $ do
-- notice "store dependencies to state"
-- hashes <- readHashesFromBlock undefined
skip <- withDB db stateGetExported <&> HashSet.fromList
sz <- liftIO $ Cache.size (hCache cache)
mon1 <- newProgressMonitor "storing dependencies" sz
-- TODO: process-only-commits-to-make-first-run-faster
ooo <- gitListAllObjects <&> filter (not . (`HashSet.member` skip))
withDB db $ transactional do
els <- liftIO $ Cache.toList (hCache cache)
for_ els $ \(k,vs,_) -> do
updateProgress mon1 1
for_ (Set.toList vs) $ \ha -> do
stateAddDep k ha
cached0 <- withDB db stateGetAllDeps
let cached = HashMap.fromListWith (<>) [ (k, [v]) | (k,v) <- cached0 ]
let lookup h = pure $ HashMap.lookup h cached & fromMaybe mempty
deps <- withDB db $ do
x <- forM refs $ stateGetDeps . snd
pure $ mconcat x
monDep <- newProgressMonitor "calculate dependencies" (length ooo)
withDB db $ transactional do -- to speedup inserts
allDeps <- gitGetAllDependencies 4 ooo lookup (const $ updateProgress monDep 1)
let metaApp = "application:" <+> "hbs2-git" <> line
let sz = length allDeps
mon1 <- newProgressMonitor "storing dependencies" sz
let metaHead = fromString $ show
$ metaApp <> "type:" <+> "head" <> line
withDB db $ transactional do
for_ allDeps $ \(obj,dep) -> do
updateProgress mon1 1
stateAddDep dep obj
-- let gha = gitHashObject (GitObject Blob repoHead)
hh <- lift $ storeObject metaHead repoHeadStr `orDie` "cant save repo head"
deps <- withDB db $ do
x <- forM refs $ stateGetDepsRec . snd
pure $ mconcat x
mon3 <- newProgressMonitor "export objects from repo" (length deps)
withDB db $ transactional do -- to speedup inserts
for_ deps $ \d -> do
here <- stateGetHash d <&> isJust
-- FIXME: asap-check-if-objects-is-in-hbs2
unless here do
lbs <- gitReadObject Nothing d
let metaApp = "application:" <+> "hbs2-git" <> line
-- TODO: why-not-default-blob
-- anything is blob
tp <- gitGetObjectType d <&> fromMaybe Blob --
let metaHead = fromString $ show
$ metaApp <> "type:" <+> "head" <> line
let metaO = fromString $ show
$ metaApp
<> "type:" <+> pretty tp <+> pretty d
<> line
-- let gha = gitHashObject (GitObject Blob repoHead)
hh <- lift $ storeObject metaHead repoHeadStr `orDie` "cant save repo head"
hr' <- lift $ storeObject metaO lbs
mon3 <- newProgressMonitor "export objects from repo" (length deps)
maybe1 hr' (pure ()) $ \hr -> do
statePutHash tp d hr
for_ deps $ \d -> do
here <- stateGetHash d <&> isJust
-- FIXME: asap-check-if-objects-is-in-hbs2
unless here do
lbs <- gitReadObject Nothing d
updateProgress mon3 1
-- TODO: why-not-default-blob
-- anything is blob
tp <- gitGetObjectType d <&> fromMaybe Blob --
hashes <- (hh : ) <$> stateGetAllHashes
let metaO = fromString $ show
$ metaApp
<> "type:" <+> pretty tp <+> pretty d
<> line
let pt = toPTree (MaxSize 512) (MaxNum 512) hashes -- FIXME: settings
hr' <- lift $ storeObject metaO lbs
tobj <- liftIO newTQueueIO
-- FIXME: progress-indicator
root <- makeMerkle 0 pt $ \(ha,_,bss) -> do
liftIO $ atomically $ writeTQueue tobj (ha,bss)
maybe1 hr' (pure ()) $ \hr -> do
statePutHash tp d hr
objs <- liftIO $ atomically $ flushTQueue tobj
updateProgress mon3 1
mon2 <- newProgressMonitor "store objects" (length objs)
hashes <- (hh : ) <$> stateGetAllHashes
for_ objs $ \(ha,bss) -> do
updateProgress mon2 1
here <- lift $ getBlockSize (HashRef ha) <&> isJust
unless here do
void $ lift $ storeObject (fromString (show metaApp)) bss
let pt = toPTree (MaxSize 512) (MaxNum 512) hashes -- FIXME: settings
trace "generate update transaction"
tobj <- liftIO newTQueueIO
-- FIXME: progress-indicator
root <- makeMerkle 0 pt $ \(ha,_,bss) -> do
liftIO $ atomically $ writeTQueue tobj (ha,bss)
trace $ "objects:" <+> pretty (length hashes)
objs <- liftIO $ atomically $ flushTQueue tobj
seqno <- stateGetSequence <&> succ
-- FIXME: same-transaction-different-seqno
mon2 <- newProgressMonitor "store objects" (length objs)
postRefUpdate h seqno (HashRef root)
for_ objs $ \(ha,bss) -> do
updateProgress mon2 1
here <- lift $ getBlockSize (HashRef ha) <&> isJust
unless here do
void $ lift $ storeObject (fromString (show metaApp)) bss
let noRef = do
pause @'Seconds 20
shutUp
die $ show $ pretty "No reference appeared for" <+> pretty h
trace "generate update transaction"
wmon <- newProgressMonitor "waiting for ref" 20
void $ liftIO $ race noRef $ do
runApp NoLog do
fix \next -> do
v <- readRefHttp h
updateProgress wmon 1
case v of
Nothing -> pause @'Seconds 1 >> next
Just{} -> pure ()
trace $ "objects:" <+> pretty (length hashes)
pure (HashRef root, hh)
seqno <- stateGetSequence <&> succ
-- FIXME: same-transaction-different-seqno
postRefUpdate h seqno (HashRef root)
let noRef = do
pause @'Seconds 20
shutUp
die $ show $ pretty "No reference appeared for" <+> pretty h
wmon <- newProgressMonitor "waiting for ref" 20
void $ liftIO $ race noRef $ do
runApp NoLog do
fix \next -> do
v <- readRefHttp h
updateProgress wmon 1
case v of
Nothing -> pause @'Seconds 1 >> next
Just{} -> pure ()
withDB db $ transactional $ mapM_ statePutExported ooo
pure (HashRef root, hh)
case rr of
Left ( e :: SomeException ) -> do
withDB db (savepointRollback sp)
err $ viaShow e
shutUp
die "aborted"
Right r -> do
withDB db (savepointRelease sp)
pure r
runExport :: forall m . (MonadIO m, MonadCatch m, HasProgress (App m))
=> Maybe FilePath -> RepoRef -> App m ()
runExport fp h = do
trace $ "Export" <+> pretty (AsBase58 h)
let green = annotate (color Green)
let yellow = annotate (color Yellow)
let section = line <> line
liftIO $ putDoc $
line
<> green "Exporting to reflog" <+> pretty (AsBase58 h)
<> section
<> "it may take some time on the first run"
<> section
git <- asks (view appGitDir)
@ -189,26 +225,13 @@ runExport fp h = do
loadCredentials (maybeToList fp)
branches <- cfgValue @ConfBranch
-- FIXME: wtf-runExport
branchesGr <- cfgValue @ConfBranch <&> Set.map normalizeRef
headBranch' <- cfgValue @HeadBranch
trace $ "BRANCHES" <+> pretty (Set.toList branches)
let defSort a b = case (a,b) of
("master",_) -> LT
("main", _) -> LT
_ -> GT
let sortedBr = sortBy defSort $ Set.toList branches
let headBranch = fromMaybe "master"
$ headBranch' <|> (fromString <$> headMay sortedBr)
headBranch <- gitGetBranchHEAD `orDie` "undefined HEAD for repo"
refs <- gitListLocalBranches
<&> filter (\x -> Set.member (fst x) branchesGr)
<&> filter (\x -> Set.null branchesGr || Set.member (fst x) branchesGr)
trace $ "REFS" <+> pretty refs
@ -225,6 +248,37 @@ runExport fp h = do
updateLocalState h
info $ "head:" <+> pretty hhh
info $ "merkle:" <+> pretty root
shutUp
cwd <- liftIO getCurrentDirectory
cfgPath <- configPath cwd
let krf = fromMaybe "keyring-file" fp & takeFileName
liftIO $ putStrLn ""
liftIO $ putDoc $
"exported" <+> pretty hhh
<> section
<> green "Repository config:" <+> pretty (cfgPath </> "config")
<> section
<> "Put the keyring file" <+> yellow (pretty krf) <+> "into a safe place," <> line
<> "like encrypted directory or volume."
<> section
<> "You will need this keyring to push into the repository."
<> section
<> green "Add keyring into the repo's config:"
<> section
<> "keyring" <+> pretty [qc|"/my/safe/place/{krf}"|]
<> section
<> green "Add git remote:"
<> section
<> pretty [qc|git remote add remotename hbs2://{pretty h}|]
<> section
<> green "Work with git as usual:"
<> section
<> "git pull remotename" <> line
<> "(or git fetch remotename && git reset --hard remotename/branch)" <> line
<> "git push remotename" <> line
<> line

View File

@ -132,6 +132,12 @@ stateInit = do
)
|]
liftIO $ execute_ conn [qc|
create table if not exists exported
( githash text not null primary key
)
|]
newtype Savepoint =
Savepoint String
@ -182,6 +188,22 @@ transactional action = do
-- тогда можно будет откатиться на любое предыдущее
-- состояние репозитория
statePutExported :: MonadIO m => GitHash -> DB m ()
statePutExported h = do
conn <- ask
liftIO $ execute conn [qc|
insert into exported (githash) values(?)
on conflict (githash) do nothing
|] (Only h)
stateGetExported :: MonadIO m => DB m [GitHash]
stateGetExported = do
conn <- ask
liftIO $ query_ conn [qc|
select githash from exported
|] <&> fmap fromOnly
statePutImported :: MonadIO m => HashRef -> HashRef -> DB m ()
statePutImported merkle hd = do
conn <- ask
@ -232,11 +254,54 @@ stateAddDep h1 h2 = do
on conflict (object,parent) do nothing
|] (h1,h2)
stateGetDepsRec :: MonadIO m => GitHash -> DB m [GitHash]
stateGetDepsRec h = do
conn <- ask
liftIO $ query conn [qc|
WITH RECURSIVE find_children(object, parent) AS (
SELECT object, parent FROM dep WHERE parent = ?
UNION
SELECT d.object, d.parent FROM dep d INNER JOIN find_children fc
ON d.parent = fc.object
)
SELECT object FROM find_children group by object;
|] (Only h) <&> mappend [h] . fmap fromOnly
stateGetAllDeps :: MonadIO m => DB m [(GitHash,GitHash)]
stateGetAllDeps = do
conn <- ask
liftIO $ query_ conn [qc|
select parent, object from dep where parent = ?
|]
stateDepFilterAll :: MonadIO m => DB m [GitHash]
stateDepFilterAll = do
conn <- ask
liftIO $ query_ conn [qc|
select distinct(parent) from dep
union
select githash from object o where o.type = 'blob'
|] <&> fmap fromOnly
stateDepFilter :: MonadIO m => GitHash -> DB m Bool
stateDepFilter h = do
conn <- ask
liftIO $ query @_ @[Int] conn [qc|
select 1 from dep
where parent = ?
or exists (select null from object where githash = ? and type = 'blob')
limit 1
|] (h,h) <&> isJust . listToMaybe
stateGetDeps :: MonadIO m => GitHash -> DB m [GitHash]
stateGetDeps h = do
conn <- ask
liftIO $ query conn [qc|
select parent from dep where object = ?
select object from dep where parent = ?
|] (Only h) <&> fmap fromOnly

View File

@ -113,7 +113,9 @@ instance {-# OVERLAPPABLE #-} MonadIO m => HasProgress m where
updateProgress bar n = liftIO (incProgress bar n)
newProgressMonitor s total = liftIO $ liftIO $ newProgressBar st 10 (Progress 0 total ())
where
st = defStyle { stylePrefix = msg (fromString s) }
st = defStyle { stylePrefix = msg (fromString s)
, styleWidth = ConstantWidth 60
}
class MonadIO m => HasCatAPI m where
getHttpCatAPI :: m API

View File

@ -396,7 +396,7 @@ blockDownloadLoop env0 = do
pause @'Seconds 3.81
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
pause @'Seconds 60
pause @'Seconds 10
debug "I'm peer thread sweeping thread"
known <- knownPeers @e pl