This commit is contained in:
voidlizard 2024-12-03 07:55:22 +03:00
parent 1c0952ad95
commit 778e172b9d
1 changed files with 268 additions and 226 deletions

View File

@ -1,4 +1,5 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language FunctionalDependencies #-}
{-# Language ViewPatterns #-}
{-# Language PatternSynonyms #-}
{-# Language RecordWildCards #-}
@ -67,6 +68,7 @@ import Text.InterpolatedString.Perl6 (qc)
import Data.HashSet qualified as HS
import Data.HashSet (HashSet(..))
import Data.HashMap.Strict qualified as HM
import Data.HashMap.Strict (HashMap(..))
import Data.Word
import Streaming.Prelude qualified as S
@ -97,6 +99,9 @@ type HBS2GitPerks m = (MonadUnliftIO m)
quit :: MonadUnliftIO m => m ()
quit = liftIO Q.exitSuccess
class Cached cache k v | cache -> k, cache -> v where
cached :: forall m . MonadIO m => cache -> k -> m v -> m v
data GitException =
CompressionError String
| DecompressionError String
@ -361,6 +366,10 @@ gitReadCommitParents bs = do
| ListVal [ StringLike "parent", StringLike hash ] <- what
] & catMaybes
gitObjectExists :: (MonadIO m, Pretty what) => what -> m Bool
gitObjectExists what = do
gitRunCommand [qc|git cat-file -e {pretty what}|] <&> isRight
data UState =
UHead ByteString
@ -420,8 +429,10 @@ enumGitPackObjectsFromLBS lbs action = do
data ExportState =
ExportGetCommit
| ExportProcessCommit GitHash ByteString
| ExportCheck
| ExportStart
| ExportExit
data WState =
WStart
@ -440,6 +451,260 @@ data WInput =
data EWState =
EWAcc Int [GitTreeEntry] Int [(GitHash, GitObjectType,Maybe GitTreeEntry, ByteString)]
newtype CacheTVH k v = CacheTVH (TVar (HashMap k v))
instance Hashable k => Cached (CacheTVH k v) k v where
cached (CacheTVH t) k a = do
what <- readTVarIO t <&> HM.lookup k
case what of
Just x -> pure x
Nothing -> do
r <- a
atomically $ modifyTVar t (HM.insert k r)
pure r
export :: ( HBS2GitPerks m
, MonadUnliftIO m
, MonadReader Git3Env m
, HasStorage m
, HasStateDB m
)
=> GitHash -> m ()
export r = connectedDo $ flip runContT pure do
debug $ green "export" <+> pretty r
q <- newTVarIO ( HPSQ.empty @GitHash @Double @() )
done <- newTVarIO ( mempty :: HashSet GitHash )
atomically $ modifyTVar q (HPSQ.insert r 1.0 ())
sto <- lift getStorage
reader <- ContT $ withGitCat
missed <- CacheTVH <$> newTVarIO mempty
ContT $ bracket none $ const do
hClose $ getStdin reader
lift $ flip fix ExportGetCommit $ \next -> \case
ExportStart -> do
here <- withState $ selectCBlock r <&> isJust
if here then next ExportCheck else next ExportGetCommit
ExportGetCommit -> do
co' <- atomically $ stateTVar q $ HPSQ.alterMin \case
Nothing -> (Nothing, Nothing)
Just (k,p,v) -> (Just (k,p), Nothing)
case co' of
Nothing -> do
debug $ red "go ExportCheck"
next ExportCheck
Just (co,prio) -> do
debug $ "Process commit" <+> pretty co <+> pretty prio
debug $ "check-pack-for" <+> pretty prio <+> pretty co
isDone <- readTVarIO done <&> HS.member co
let already = isDone
if already
then do
next ExportGetCommit
else do
(t,bs) <- liftIO (gitReadObjectMaybe reader co)
>>= orThrow (GitReadError (show $ pretty co))
parents <- gitReadCommitParents bs
n <- for (zip [1..] parents) $ \(i,gh) -> do
-- exists <- cached missed gh (gitObjectExists gh)
exists <- liftIO $ cached missed gh (isJust <$> gitReadObjectMaybe reader gh)
here <- withState $ selectCBlock gh <&> isJust
unless exists do
debug $ red "missed!" <+> pretty gh
-- atomically $ modifyTVar done (HS.insert gh)
atomically do
pdone <- readTVar done <&> HS.member gh
if pdone || here || not exists then do -- for shallow commits?
pure 0
else do
modifyTVar q (HPSQ.insert gh (prio-i) ())
pure 1
if sum n == 0 then do
next $ ExportProcessCommit co bs
else do
-- error "FUCK!"
debug $ yellow "put commit back" <+> pretty co
atomically $ modifyTVar q (HPSQ.insert co prio ())
next ExportGetCommit
ExportProcessCommit co bs -> do
debug $ "write pack for" <+> pretty co
hhead <- gitRevParse co
>>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty co)
parents <- gitReadObjectThrow Commit hhead
>>= gitReadCommitParents
skip <- if not (excludeParents ()) then do
pure mempty
else do
skip' <- S.toList_ $ for parents $ \p -> do
-- exists <- liftIO $ cached missed p (gitObjectExists p)
exists <- liftIO $ cached missed p (isJust <$> gitReadObjectMaybe reader p)
when exists do
gitReadTree p <&> fmap gitEntryHash >>= S.each
pure $ HS.fromList skip'
r <- gitReadTree hhead
<&> L.filter (\GitTreeEntry{..} -> not (HS.member gitEntryHash skip))
-- <&> L.filter (\GitTreeEntry{..} -> gitEntryType /= Tree)
<&> sortGitTreeEntries
let blkMax = 1048576
out <- newTQueueIO
flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case
EWAcc _ [] _ [] -> none
EWAcc i [] l acc -> do
writePack sto l acc >>= atomically . writeTQueue out
EWAcc i (r@GitTreeEntry{..}:rs) l acc | gitEntrySize >= Just (fromIntegral blkMax) -> do
writeLargeBlob sto reader r >>= atomically . writeTQueue out
go (EWAcc (succ i) rs l acc)
EWAcc i rs l acc | l >= blkMax -> do
writePack sto l acc >>= atomically . writeTQueue out
go (EWAcc (succ i) rs 0 mempty)
EWAcc i (e@GitTreeEntry{..}:rs) l acc -> do
lbs <- gitReadObjectMaybe reader gitEntryHash
>>= orThrow (GitReadError (show $ pretty gitEntryHash))
<&> snd
go (EWAcc i rs (l + fromIntegral (LBS.length lbs)) ((gitEntryHash,gitEntryType, Just e, lbs) : acc))
packs <- atomically $ STM.flushTQueue out
phashes <- catMaybes <$> withState (for parents selectCBlock)
let v = "hbs2-git 3.0 zstd"
let pps = vcat $ mconcat $ for phashes $ \p -> ["p" <+> pretty p]
let meta = LBS8.pack $ show $ pretty v <> line <> pps
hmeta <- putBlock sto meta >>= orThrow StorageError <&> HashRef
let cblock = hmeta : phashes <> packs
let pt = toPTree (MaxSize 1024) (MaxNum 1024) cblock
root <- makeMerkle 0 pt $ \(_,_,s) -> do
void $ putBlock sto s
withState $ transactional do
for_ packs $ \href -> do
insertGitPack co href
insertCBlock co (HashRef root)
notice $ "cblock" <+> pretty root
atomically do
modifyTVar done (HS.insert co)
modifyTVar q (HPSQ.delete co)
next ExportGetCommit
ExportCheck -> do
debug $ "ExportCheck dummy" <+> pretty r
c <- withState $ selectCBlock r >>= orThrowUser "export failed"
liftIO $ hPrint stdout (pretty c)
next ExportExit
ExportExit -> finish
where
finish = none
writeLargeBlob sto reader GitTreeEntry{..} = liftIO do
size <- gitEntrySize & orThrow (GitReadError (show $ "expected blob" <+> pretty gitEntryHash))
debug $ yellow "write large object" <+> pretty gitEntryHash
let p = Builder.byteString [qc|{pretty $ Short gitEntryType} {pretty size} {pretty gitEntryHash} {gitEntryName}|]
<> Builder.byteString "\n"
& LBS.toStrict . Builder.toLazyByteString
-- TODO: check-if-work-on-large-files
pieces <- S.toList_ do
stream <- lift $ ZstdS.compress maxCLevel
(t,lbs) <- gitReadObjectMaybe reader gitEntryHash
>>= orThrow (GitReadError (show $ pretty gitEntryHash))
let chunks = p : LBS.toChunks lbs
flip fix (chunks, stream) $ \go r ->
case r of
(c, Produce chunk continue) -> do
S.yield chunk
w <- lift continue
go (c,w)
([], Consume consume) -> do
x <- lift $ consume mempty
go ([],x)
(s:ss, Consume consume) -> do
x <- lift $ consume s
go (ss,x)
(_,Done bs) -> do
S.yield bs
(_,Error s1 s2) -> do
throwIO (CompressionError (s1 <> " " <> s2))
-- TODO: check-if-work-on-large-files
createTreeWithMetadata sto mzero mempty (LBS.fromChunks pieces)
>>= orThrowPassIO
writePack sto l racc = do
-- write
-- pack
-- merkle
let acc = reverse racc
debug $ green "write pack of objects" <+> pretty l <+> pretty (length acc)
parts <- for acc $ \(h,t,e,lbs) -> liftIO do
let ename = [qc|{fromMaybe mempty $ gitEntryName <$> e}|] :: ByteString
-- notice $ "pack" <+> pretty h <+> pretty t
let p = Builder.byteString [qc|{pretty $ Short t} {pretty (LBS.length lbs)} {pretty h} {ename}|]
<> Builder.byteString "\n"
<> Builder.lazyByteString lbs
<> Builder.byteString "\n"
pure p
let packed = Zstd.compress maxCLevel (LBS.toStrict $ Builder.toLazyByteString $ mconcat parts)
createTreeWithMetadata sto mzero mempty (LBS.fromStrict packed)
>>= orThrowPassIO
theDict :: forall m . ( HBS2GitPerks m
, HasClientAPI PeerAPI UNIX m
, HasStorage m
@ -540,237 +805,13 @@ theDict = do
putStrLn $ show $ pretty t <+> pretty h <+> pretty s
pure True
entry $ bindMatch "test:git:tree:export" $ nil_ $ \syn -> do
mkdir "export"
entry $ bindMatch "test:git:tree:export" $ nil_ $ \syn -> lift do
r <- case syn of
[] -> gitRevParseThrow "HEAD"
[ StringLike co ] -> gitRevParseThrow co
_ -> throwIO (BadFormException @C nil)
debug $ "process commit" <+> pretty r
q <- newTVarIO ( HPSQ.empty @GitHash @Double @() )
done <- newTVarIO ( mempty :: HashSet GitHash )
atomically $ modifyTVar q (HPSQ.insert r 1.0 ())
lift $ connectedDo do
sto <- getStorage
flip runContT pure do
reader <- ContT $ withGitCat
ContT $ bracket none $ const do
hClose $ getStdin reader
flip fix ExportGetCommit $ \next -> \case
ExportStart -> do
here <- lift $ withState $ selectCBlock r <&> isJust
if here then next ExportCheck else next ExportGetCommit
ExportGetCommit -> do
co' <- atomically $ stateTVar q $ HPSQ.alterMin \case
Nothing -> (Nothing, Nothing)
Just (k,p,v) -> (Just (k,p), Nothing)
case co' of
Nothing -> next ExportCheck
Just (co,prio) -> do
debug $ "Process commit" <+> pretty co
debug $ "check-pack-for" <+> pretty prio <+> pretty co
isDone <- readTVarIO done <&> HS.member co
let already = isDone
if already
then next ExportGetCommit
else do
(t,bs) <- liftIO (gitReadObjectMaybe reader co)
>>= orThrow (GitReadError (show $ pretty co))
parents <- gitReadCommitParents bs
n <- for (zip [1..] parents) $ \(i,gh) -> do
here <- lift $ withState $ selectCBlock gh <&> isJust
-- here <- pure False -- lift $ withState $ selectCBlock gh <&> isJust
atomically do
pdone <- readTVar done <&> HS.member gh
if pdone || here then do
pure 0
else do
modifyTVar q (HPSQ.insert gh (prio-i) ())
pure 1
if sum n == 0 then lift do
debug $ "write pack for" <+> pretty co
let dir = "export"
mkdir dir
hhead <- gitRevParse co
>>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty co)
parents <- gitReadObjectThrow Commit hhead >>= gitReadCommitParents
skip <- if not (excludeParents ()) then do
pure mempty
else do
skip' <- S.toList_ $ for parents $ \p -> do
gitReadTree p <&> fmap gitEntryHash >>= S.each
pure $ HS.fromList skip'
r <- gitReadTree hhead
<&> L.filter (\GitTreeEntry{..} -> not (HS.member gitEntryHash skip))
-- <&> L.filter (\GitTreeEntry{..} -> gitEntryType /= Tree)
<&> sortGitTreeEntries
let blkMax = 1048576
out <- newTQueueIO
let writeLargeBlob n GitTreeEntry{..} = do
size <- gitEntrySize & orThrow (GitReadError (show $ "expected blob" <+> pretty gitEntryHash))
debug $ yellow "write large object" <+> pretty gitEntryHash
let p = Builder.byteString [qc|{pretty $ Short gitEntryType} {pretty size} {pretty gitEntryHash} {gitEntryName}|]
<> Builder.byteString "\n"
& LBS.toStrict . Builder.toLazyByteString
liftIO do
-- TODO: check-if-work-on-large-files
pieces <- S.toList_ do
stream <- lift $ ZstdS.compress maxCLevel
let fn = dir </> show (pretty co) <> "." <> show n <> ".big" <> ".pack"
(t,lbs) <- gitReadObjectMaybe reader gitEntryHash
>>= orThrow (GitReadError (show $ pretty gitEntryHash))
let chunks = p : LBS.toChunks lbs
flip fix (chunks, stream) $ \go r ->
case r of
(c, Produce chunk continue) -> do
S.yield chunk
w <- lift continue
go (c,w)
([], Consume consume) -> do
x <- lift $ consume mempty
go ([],x)
(s:ss, Consume consume) -> do
x <- lift $ consume s
go (ss,x)
(_,Done bs) -> do
S.yield bs
debug "done!"
(_,Error s1 s2) -> do
throwIO (CompressionError (s1 <> " " <> s2))
-- TODO: check-if-work-on-large-files
href <- createTreeWithMetadata sto mzero mempty (LBS.fromChunks pieces)
>>= orThrowPassIO
atomically $ writeTQueue out href
let writePack i l racc = do
-- write
-- pack
-- merkle
let fn = dir </> show (pretty co) <> "." <> show (length racc) <> "." <> show i <> ".pack"
let acc = reverse racc
debug $ green "write pack of objects" <+> pretty l <+> pretty (length acc)
parts <- for acc $ \(h,t,e,lbs) -> liftIO do
let ename = [qc|{fromMaybe mempty $ gitEntryName <$> e}|] :: ByteString
-- notice $ "pack" <+> pretty h <+> pretty t
let p = Builder.byteString [qc|{pretty $ Short t} {pretty (LBS.length lbs)} {pretty h} {ename}|]
<> Builder.byteString "\n"
<> Builder.lazyByteString lbs
<> Builder.byteString "\n"
pure p
let packed = Zstd.compress maxCLevel (LBS.toStrict $ Builder.toLazyByteString $ mconcat parts)
href <- createTreeWithMetadata sto mzero mempty (LBS.fromStrict packed)
>>= orThrowPassIO
atomically $ writeTQueue out href
flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case
EWAcc _ [] _ [] -> none
EWAcc i [] l acc -> do
writePack i l acc
EWAcc i (r@GitTreeEntry{..}:rs) l acc | gitEntrySize >= Just (fromIntegral blkMax) -> do
writeLargeBlob i r
go (EWAcc (succ i) rs l acc)
EWAcc i rs l acc | l >= blkMax -> do
writePack i l acc
go (EWAcc (succ i) rs 0 mempty)
EWAcc i (e@GitTreeEntry{..}:rs) l acc -> do
lbs <- gitReadObjectMaybe reader gitEntryHash
>>= orThrow (GitReadError (show $ pretty gitEntryHash))
<&> snd
go (EWAcc i rs (l + fromIntegral (LBS.length lbs)) ((gitEntryHash,gitEntryType, Just e, lbs) : acc))
packs <- atomically $ STM.flushTQueue out
phashes <- withState $ for parents \p -> do
selectCBlock p
>>= orThrowUser ("pack export failed" <+> pretty p)
let v = "hbs2-git 3.0 zstd"
let pps = vcat $ mconcat $ for phashes $ \p -> ["p" <+> pretty p]
let meta = LBS8.pack $ show $ pretty v <> line <> pps
hmeta <- putBlock sto meta >>= orThrow StorageError <&> HashRef
let cblock = hmeta : phashes <> packs
let pt = toPTree (MaxSize 1024) (MaxNum 1024) cblock
root <- makeMerkle 0 pt $ \(_,_,s) -> do
void $ putBlock sto s
withState $ transactional do
for_ packs $ \href -> do
insertGitPack co href
insertCBlock co (HashRef root)
notice $ "cblock" <+> pretty root
atomically $ modifyTVar done (HS.insert co)
else do
atomically $ modifyTVar q (HPSQ.insert co prio ())
next ExportGetCommit
ExportCheck -> do
debug $ "ExportCheck dummy" <+> pretty r
c <- lift $ withState $ selectCBlock r >>= orThrowUser "export failed"
liftIO $ hPrint stdout (pretty c)
export r
-- debugPrefix :: LoggerEntry -> LoggerEntry
debugPrefix = toStderr . logPrefix "[debug] "
@ -812,4 +853,5 @@ main = flip runContT pure do
conf <- readLocalConf
let dict = theDict
recover $ run dict (conf <> cli)
`finally` silence