mirror of https://github.com/voidlizard/hbs2
wip7
This commit is contained in:
parent
6105fb0446
commit
4941c5442c
|
@ -10,6 +10,6 @@ constraints:
|
|||
, http-client >=0.7.16 && <0.8
|
||||
|
||||
-- executable-static: True
|
||||
-- profiling: True
|
||||
profiling: True
|
||||
--library-profiling: False
|
||||
|
||||
|
|
|
@ -66,6 +66,7 @@ import Data.ByteString qualified as BS
|
|||
import Data.ByteString.Lazy (ByteString)
|
||||
import Data.ByteString.Builder as Builder
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
import Data.Set qualified as Set
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.HashSet (HashSet(..))
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
|
@ -101,8 +102,9 @@ quit :: MonadUnliftIO m => m ()
|
|||
quit = liftIO Q.exitSuccess
|
||||
|
||||
class Cached cache k v | cache -> k, cache -> v where
|
||||
cached :: forall m . MonadIO m => cache -> k -> m v -> m v
|
||||
uncache :: forall m . MonadIO m => cache -> k -> m ()
|
||||
isCached :: forall m . MonadIO m => cache -> k -> m Bool
|
||||
cached :: forall m . MonadIO m => cache -> k -> m v -> m v
|
||||
uncache :: forall m . MonadIO m => cache -> k -> m ()
|
||||
|
||||
data GitException =
|
||||
CompressionError String
|
||||
|
@ -456,6 +458,7 @@ data EWState =
|
|||
newtype CacheTVH k v = CacheTVH (TVar (HashMap k v))
|
||||
|
||||
instance Hashable k => Cached (CacheTVH k v) k v where
|
||||
isCached (CacheTVH t) k = readTVarIO t <&> HM.member k
|
||||
uncache (CacheTVH t) k = atomically (modifyTVar t (HM.delete k))
|
||||
cached (CacheTVH t) k a = do
|
||||
what <- readTVarIO t <&> HM.lookup k
|
||||
|
@ -476,6 +479,9 @@ newCacheFixedHPSQ :: MonadIO m => Int -> m (CacheFixedHPSQ k v)
|
|||
newCacheFixedHPSQ l = CacheFixedHPSQ l <$> newTVarIO HPSQ.empty
|
||||
|
||||
instance (Ord k, Hashable k) => Cached (CacheFixedHPSQ k v) k v where
|
||||
|
||||
isCached CacheFixedHPSQ{..} k = readTVarIO _theCache <&> HPSQ.member k
|
||||
|
||||
uncache CacheFixedHPSQ{..} k = atomically $ modifyTVar _theCache (HPSQ.delete k)
|
||||
|
||||
cached CacheFixedHPSQ{..} k a = do
|
||||
|
@ -514,14 +520,38 @@ export r = connectedDo $ flip runContT pure do
|
|||
|
||||
sto <- lift getStorage
|
||||
|
||||
reader <- ContT $ withGitCat
|
||||
reader <- ContT $ withGitCat
|
||||
reader2 <- ContT $ withGitCat
|
||||
|
||||
let commitCacheSize = 2000
|
||||
|
||||
missed <- CacheTVH <$> newTVarIO mempty
|
||||
commits <- newCacheFixedHPSQ 1000
|
||||
commits <- newCacheFixedHPSQ commitCacheSize
|
||||
|
||||
deferred <- newTQueueIO
|
||||
|
||||
ContT $ bracket none $ const do
|
||||
hClose $ getStdin reader
|
||||
|
||||
ContT $ bracket none $ const do
|
||||
hClose $ getStdin reader2
|
||||
|
||||
ContT $ withAsync $ replicateM_ 2 $ forever do
|
||||
join $ atomically (readTQueue deferred)
|
||||
|
||||
-- let noCBlock x = do
|
||||
-- here <- withState $ selectCBlock x
|
||||
-- pure (isNothing here)
|
||||
|
||||
-- pre <- gitRunCommand [qc|git rev-list {pretty r}|]
|
||||
-- <&> fromRight mempty
|
||||
-- <&> LBS8.lines
|
||||
-- <&> mapMaybe ( fromStringMay @GitHash . LBS8.unpack )
|
||||
-- <&> take (10 * commitCacheSize)
|
||||
-- >>= filterM (lift . noCBlock)
|
||||
-- <&> take commitCacheSize
|
||||
-- >>= \xs -> lift (mapM_ (\x -> cached commits x (gitReadObjectMaybe reader x)) xs)
|
||||
|
||||
lift $ flip fix ExportGetCommit $ \next -> \case
|
||||
|
||||
ExportStart -> do
|
||||
|
@ -587,7 +617,6 @@ export r = connectedDo $ flip runContT pure do
|
|||
hhead <- gitRevParse co
|
||||
>>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty co)
|
||||
|
||||
|
||||
parents <- gitReadObjectThrow Commit hhead
|
||||
>>= gitReadCommitParents
|
||||
|
||||
|
@ -608,19 +637,24 @@ export r = connectedDo $ flip runContT pure do
|
|||
|
||||
let blkMax = 1048576
|
||||
|
||||
packs <- S.toList_ $ flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case
|
||||
-- wtf <- ContT $ withAsync do
|
||||
-- pure ()
|
||||
|
||||
out <- newTQueueIO
|
||||
|
||||
flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case
|
||||
|
||||
EWAcc _ [] _ [] -> none
|
||||
|
||||
EWAcc i [] l acc -> do
|
||||
lift (writePack sto l acc) >>= S.yield
|
||||
writePack sto l acc >>= atomically . writeTQueue out
|
||||
|
||||
EWAcc i (r@GitTreeEntry{..}:rs) l acc | gitEntrySize >= Just (fromIntegral blkMax) -> do
|
||||
writeLargeBlob sto reader r >>= S.yield
|
||||
atomically $ writeTQueue deferred $ writeLargeBlob sto reader2 r >>= atomically . writeTQueue out
|
||||
go (EWAcc (succ i) rs l acc)
|
||||
|
||||
EWAcc i rs l acc | l >= blkMax -> do
|
||||
lift (writePack sto l acc) >>= S.yield
|
||||
atomically $ writeTQueue deferred $ writePack sto l acc >>= atomically . writeTQueue out
|
||||
go (EWAcc (succ i) rs 0 mempty)
|
||||
|
||||
EWAcc i (e@GitTreeEntry{..}:rs) l acc -> do
|
||||
|
@ -631,6 +665,11 @@ export r = connectedDo $ flip runContT pure do
|
|||
|
||||
go (EWAcc i rs (l + fromIntegral (LBS.length lbs)) ((gitEntryHash,gitEntryType, Just e, lbs) : acc))
|
||||
|
||||
packs <- atomically do
|
||||
allDone <- isEmptyTQueue deferred
|
||||
unless allDone STM.retry
|
||||
STM.flushTQueue out
|
||||
|
||||
phashes <- catMaybes <$> withState (for parents selectCBlock)
|
||||
|
||||
let v = "hbs2-git 3.0 zstd"
|
||||
|
@ -639,7 +678,7 @@ export r = connectedDo $ flip runContT pure do
|
|||
|
||||
hmeta <- putBlock sto meta >>= orThrow StorageError <&> HashRef
|
||||
|
||||
let cblock = hmeta : phashes <> packs
|
||||
let cblock = hmeta : uniqAndOrdered phashes <> uniqAndOrdered packs
|
||||
let pt = toPTree (MaxSize 1024) (MaxNum 1024) cblock
|
||||
|
||||
root <- makeMerkle 0 pt $ \(_,_,s) -> do
|
||||
|
@ -669,6 +708,8 @@ export r = connectedDo $ flip runContT pure do
|
|||
where
|
||||
finish = none
|
||||
|
||||
uniqAndOrdered = Set.toList . Set.fromList
|
||||
|
||||
writeLargeBlob sto reader GitTreeEntry{..} = liftIO do
|
||||
size <- gitEntrySize & orThrow (GitReadError (show $ "expected blob" <+> pretty gitEntryHash))
|
||||
debug $ yellow "write large object" <+> pretty gitEntryHash
|
||||
|
|
Loading…
Reference in New Issue