mirror of https://github.com/voidlizard/hbs2
wip2
This commit is contained in:
parent
3773c7857b
commit
6e39900d6b
|
@ -706,34 +706,6 @@ instance (Ord k, Hashable k) => Cached (CacheFixedHPSQ k v) k v where
|
|||
|
||||
pure v
|
||||
|
||||
data RCC =
|
||||
RCC [GitHash] (HashMap GitHash (HashSet GitHash))
|
||||
|
||||
readCommitChain :: ( HBS2GitPerks m
|
||||
, MonadUnliftIO m
|
||||
, MonadReader Git3Env m
|
||||
, HasStorage m
|
||||
, HasStateDB m
|
||||
)
|
||||
=> Maybe GitRef
|
||||
-> GitHash
|
||||
-> ( GitHash -> m () )
|
||||
-> m (HashMap GitHash (HashSet GitHash))
|
||||
|
||||
readCommitChain _ h0 action = flip runContT pure $ callCC \_ -> do
|
||||
theReader <- ContT $ withGitCat
|
||||
void $ ContT $ bracket (pure theReader) stopProcess
|
||||
flip fix (RCC [h0] mempty) $ \next -> \case
|
||||
RCC [] seen -> pure seen
|
||||
RCC ( h : hs ) seen | HM.member h seen -> next ( RCC hs seen )
|
||||
RCC ( h : hs ) seen -> do
|
||||
lift (action h)
|
||||
co <- gitReadObjectMaybe theReader h >>= orThrowUser ("object not found" <+> pretty h)
|
||||
parents <- gitReadCommitParents (Just h) (snd co)
|
||||
debug $ "processed commit" <+> pretty h
|
||||
next $ RCC ( parents <> hs ) (HM.insertWith (<>) h (HS.fromList parents) seen)
|
||||
|
||||
|
||||
data HCC =
|
||||
HCC { hccHeight :: Int
|
||||
, hccRest :: [GitHash]
|
||||
|
@ -746,23 +718,36 @@ readCommitChainHPSQ :: ( HBS2GitPerks m
|
|||
, HasStorage m
|
||||
, HasStateDB m
|
||||
)
|
||||
=> Maybe GitRef
|
||||
=> (GitHash -> m Bool)
|
||||
-> Maybe GitRef
|
||||
-> GitHash
|
||||
-> (GitHash -> m ())
|
||||
-> m (HashPSQ GitHash Int (HashSet GitHash))
|
||||
|
||||
readCommitChainHPSQ _ h0 action = flip runContT pure $ callCC \_ -> do
|
||||
readCommitChainHPSQ filt _ h0 action = flip runContT pure $ callCC \_ -> do
|
||||
theReader <- ContT $ withGitCat
|
||||
void $ ContT $ bracket (pure theReader) stopProcess
|
||||
flip fix (HCC 0 [h0] HPSQ.empty) $ \next -> \case
|
||||
|
||||
HCC _ [] result -> pure result
|
||||
|
||||
HCC n ( h : hs ) result | HPSQ.member h result -> next ( HCC n hs result )
|
||||
|
||||
HCC n ( h : hs ) result -> do
|
||||
co <- gitReadObjectMaybe theReader h
|
||||
>>= orThrow(GitReadError $ show $ pretty "object not found" <+> pretty h)
|
||||
parents <- gitReadCommitParents (Just h) (snd co)
|
||||
lift $ action h
|
||||
next $ HCC (n-1) ( parents <> hs ) (snd $ HPSQ.alter (addParents () n parents) h result )
|
||||
|
||||
done <- not <$> lift ( filt h )
|
||||
|
||||
if done then next (HCC n hs result) else do
|
||||
|
||||
co <- gitReadObjectMaybe theReader h
|
||||
>>= orThrow(GitReadError $ show $ pretty "object not found" <+> pretty h)
|
||||
|
||||
parents <- gitReadCommitParents (Just h) (snd co)
|
||||
|
||||
lift $ action h
|
||||
next $ HCC (n-1) ( parents <> hs ) (snd $ HPSQ.alter (addParents () n parents) h result )
|
||||
|
||||
|
||||
where
|
||||
addParents :: a
|
||||
-> Int
|
||||
|
@ -1238,86 +1223,6 @@ indexCBlockCommits :: forall m . ( MonadIO m
|
|||
indexCBlockCommits cb = do
|
||||
pure ()
|
||||
|
||||
class ReadLogOpts a where
|
||||
|
||||
data ReadLogError = SomeReadLogError
|
||||
deriving stock (Typeable, Show)
|
||||
|
||||
instance Exception ReadLogError
|
||||
|
||||
instance ReadLogOpts ()
|
||||
|
||||
type NumBytes = Int
|
||||
|
||||
class Monad m => BytesReader m where
|
||||
noBytesLeft :: m Bool
|
||||
readBytes :: NumBytes -> m ByteString
|
||||
|
||||
readBytesMaybe :: NumBytes -> m (Maybe ByteString)
|
||||
readBytesMaybe n = do
|
||||
bs <- readBytes n
|
||||
if LBS.length bs == fromIntegral n then pure (Just bs) else pure Nothing
|
||||
|
||||
newtype ConsumeLBS m a = ConsumeLBS { fromConsumeLBS :: StateT ByteString m a }
|
||||
deriving newtype ( Applicative
|
||||
, Functor
|
||||
, Monad
|
||||
, MonadState ByteString
|
||||
, MonadIO
|
||||
, MonadTrans
|
||||
)
|
||||
|
||||
readChunkThrow :: MonadIO m => Int -> ConsumeLBS m ByteString
|
||||
readChunkThrow n = do
|
||||
lbs <- get
|
||||
let (this, that) = LBS.splitAt (fromIntegral n) lbs
|
||||
if LBS.length this /= fromIntegral n then
|
||||
throwIO SomeReadLogError
|
||||
else do
|
||||
put $! that
|
||||
pure this
|
||||
|
||||
readChunkSimple :: Monad m => Int -> ConsumeLBS m ByteString
|
||||
readChunkSimple n = do
|
||||
lbs <- get
|
||||
let (this, that) = LBS.splitAt (fromIntegral n) lbs
|
||||
put $! that
|
||||
pure this
|
||||
|
||||
reminds :: Monad m => ConsumeLBS m Int
|
||||
reminds = gets (fromIntegral . LBS.length)
|
||||
|
||||
consumed :: Monad m => ConsumeLBS m Bool
|
||||
consumed = gets LBS.null
|
||||
|
||||
runConsumeLBS :: Monad m => ByteString -> ConsumeLBS m a -> m a
|
||||
runConsumeLBS s m = evalStateT (fromConsumeLBS m) s
|
||||
|
||||
newtype ConsumeBS m a = ConsumeBS { fromConsumeBS :: StateT BS.ByteString m a }
|
||||
deriving newtype ( Applicative
|
||||
, Functor
|
||||
, Monad
|
||||
, MonadState BS.ByteString
|
||||
, MonadIO
|
||||
, MonadTrans
|
||||
)
|
||||
|
||||
|
||||
instance Monad m => BytesReader (ConsumeLBS m) where
|
||||
readBytes n = readChunkSimple n
|
||||
noBytesLeft = consumed
|
||||
|
||||
instance Monad m => BytesReader (ConsumeBS m) where
|
||||
noBytesLeft = gets BS.null
|
||||
readBytes n = do
|
||||
s <- get
|
||||
let (a,b) = BS.splitAt n s
|
||||
put $! b
|
||||
pure (LBS.fromStrict a)
|
||||
|
||||
runConsumeBS :: Monad m => BS.ByteString -> ConsumeBS m a -> m a
|
||||
runConsumeBS s m = evalStateT (fromConsumeBS m) s
|
||||
|
||||
readLogFileLBS :: forall opts m . ( MonadIO m, ReadLogOpts opts, BytesReader m )
|
||||
=> opts
|
||||
-> ( GitHash -> Int -> ByteString -> m () )
|
||||
|
@ -1719,78 +1624,6 @@ theDict = do
|
|||
|
||||
export (w <|> re <|> hd) r
|
||||
|
||||
entry $ bindMatch "test:git:read-commit-chain-full" $ nil_ $ \syn -> lift do
|
||||
let (opts, argz) = splitOpts [("--threads",1)] syn
|
||||
|
||||
|
||||
xead <- headDef "HEAD" [ x | StringLike x <- argz ] & gitRevParseThrow
|
||||
let tnum = headDef 1 [fromIntegral x | ListVal [StringLike "--threads", LitIntVal x] <- opts]
|
||||
|
||||
co <- readCommitChain Nothing xead dontHandle
|
||||
let total = HM.size co
|
||||
_n <- newTVarIO 0
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
|
||||
flip runContT pure do
|
||||
|
||||
_fake <- newTVarIO 0
|
||||
|
||||
void $ ContT $ withAsync $ flip fix (0,t0) $ \next (x,tt0) -> do
|
||||
pause @'Seconds 1
|
||||
t1 <- getTimeCoarse
|
||||
n <- readTVarIO _n
|
||||
let dt = 1e-9 * realToFrac (t1 - tt0)
|
||||
|
||||
-- when (dt <= 0) $ next (n,t1)
|
||||
|
||||
let dn = realToFrac (n-x)
|
||||
let v = realToFrac $ dn / dt
|
||||
let est = if v > 0 then realToFrac (total - n) / v :: Fixed E2 else 0
|
||||
sz <- readTVarIO _fake
|
||||
|
||||
debug $ "read"
|
||||
<+> pretty n <+> "/" <+> pretty total
|
||||
<+> "elapsed" <+> pretty (ceiling $ realToFrac (t1-t0) * 1e-9 )
|
||||
<+> "dn" <+> pretty dn
|
||||
<+> "sz" <+> pretty (realToFrac sz / (1024*1024) :: Fixed E2)
|
||||
<+> pretty (realToFrac @(Fixed E2) v) <+> "per sec"
|
||||
<+> "est." <+> pretty est
|
||||
|
||||
next (n,t1)
|
||||
|
||||
_already <- newTVarIO ( mempty :: HashSet GitHash )
|
||||
|
||||
let chunks = chunksOf (total `div` tnum) (HM.keys co)
|
||||
|
||||
liftIO $ pooledForConcurrentlyN_ tnum chunks $ \chunk -> flip runContT pure do
|
||||
theReader <- ContT $ withGitCat
|
||||
void $ ContT $ bracket none (const $ stopProcess theReader)
|
||||
for_ chunk $ \commit -> do
|
||||
|
||||
hashes <- gitReadTreeObjectsOnly commit
|
||||
>>= filterM ( \x -> readTVarIO _already <&> not . HS.member x)
|
||||
|
||||
-- hashes <- gitReadTree commit
|
||||
-- <&> fmap gitEntryHash
|
||||
-- >>= filterM ( \x -> readTVarIO _already <&> not . HS.member x)
|
||||
|
||||
atomically $ modifyTVar _n succ
|
||||
|
||||
for_ hashes $ \gh -> do
|
||||
|
||||
(_t,lbs) <- gitReadObjectMaybe theReader gh
|
||||
>>= orThrow (GitReadError (show $ pretty gh))
|
||||
|
||||
let l = sum $ fmap BS.length (LBS.toChunks lbs)
|
||||
|
||||
atomically do
|
||||
modifyTVar' _fake (+ l)
|
||||
modifyTVar' _already (HS.insert gh)
|
||||
|
||||
n <- readTVarIO _n
|
||||
liftIO $ print $ pretty "read objects" <+> pretty n <+> "of" <+> pretty total
|
||||
|
||||
entry $ bindMatch "test:git:read-commit-chain" $ nil_ $ \syn -> do
|
||||
(mpath, hss) <- case syn of
|
||||
[ HashLike s ] -> pure (Nothing, s)
|
||||
|
@ -1801,14 +1634,14 @@ theDict = do
|
|||
liftIO $ mapM_ setCurrentDirectory mpath
|
||||
-- let hss = headDef "HEAD" [ x | StringLike x <- snd (splitOpts [] syn) ]
|
||||
h <- gitRevParseThrow hss
|
||||
r <- lift $ readCommitChain Nothing h dontHandle
|
||||
liftIO $ print ( HM.size r )
|
||||
r <- lift $ readCommitChainHPSQ (const $ pure True) Nothing h dontHandle
|
||||
liftIO $ print ( HPSQ.size r )
|
||||
|
||||
entry $ bindMatch "test:git:read-commit-chain-dfs" $ nil_ $ \syn -> lift do
|
||||
let (_, argz) = splitOpts [] syn
|
||||
let hd = headDef "HEAD" [ x | StringLike x <- argz]
|
||||
h <- gitRevParseThrow hd
|
||||
r <- readCommitChainHPSQ Nothing h (\c -> debug $ "commit" <+> pretty c)
|
||||
r <- readCommitChainHPSQ (const $ pure True) Nothing h (\c -> debug $ "commit" <+> pretty c)
|
||||
<&> HPSQ.toList
|
||||
<&> sortBy (comparing (view _2))
|
||||
for_ r $ \(c,_,_) -> do
|
||||
|
@ -2118,35 +1951,34 @@ theDict = do
|
|||
|
||||
entry $ bindMatch "test:git:export-commit-dfs" $ nil_ $ \syn -> lift do
|
||||
let (opts, argz) = splitOpts [("--index",1)] syn
|
||||
let hd = headDef "HEAD" [ x | StringLike x <- argz]
|
||||
h <- gitRevParseThrow hd
|
||||
hpsq <- readCommitChainHPSQ Nothing h (\c -> debug $ "commit" <+> pretty c)
|
||||
|
||||
let useIndex = headMay [ f | ListVal [StringLike "--index", StringLike f] <- opts ]
|
||||
|
||||
let hd = headDef "HEAD" [ x | StringLike x <- argz]
|
||||
h <- gitRevParseThrow hd
|
||||
|
||||
mmaped <- runMaybeT do
|
||||
fname <- toMPlus useIndex
|
||||
liftIO $ mmapFileByteString fname Nothing
|
||||
|
||||
let r = HPSQ.toList hpsq
|
||||
& sortBy (comparing (view _2))
|
||||
& fmap (view _1)
|
||||
|
||||
let total = HPSQ.size hpsq
|
||||
|
||||
_already <- newTVarIO mempty
|
||||
|
||||
let
|
||||
notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool
|
||||
notWrittenYet x = do
|
||||
already <- readTVarIO _already <&> HS.member x
|
||||
-- alsoInIdx <- maybe1 mmaped (pure False) $ \m -> do
|
||||
-- let found = binarySearch m 24 (coerce x) & isJust
|
||||
-- -- error $ show $ "MOTHERFUCKER!" <+> pretty x <+> pretty found
|
||||
-- pure found
|
||||
alsoInIdx <- maybe1 mmaped (pure False) $ \m -> do
|
||||
found <- binarySearchBS 24 (BS.take 20 . BS.drop 4) (coerce x) m
|
||||
pure (isJust found)
|
||||
pure (not already && not alsoInIdx)
|
||||
|
||||
pure (not already) -- && not alsoInIdx)
|
||||
hpsq <- readCommitChainHPSQ notWrittenYet Nothing h (\c -> debug $ "commit" <+> pretty c)
|
||||
|
||||
let r = HPSQ.toList hpsq
|
||||
& sortBy (comparing (view _2))
|
||||
& fmap (view _1)
|
||||
|
||||
let total = HPSQ.size hpsq
|
||||
|
||||
liftIO $ flip runContT pure do
|
||||
|
||||
|
@ -2184,15 +2016,16 @@ theDict = do
|
|||
>>= filterM notWrittenYet
|
||||
|
||||
for_ hashes $ \gh -> do
|
||||
atomically $ modifyTVar _already (HS.insert gh)
|
||||
(_t,lbs) <- gitReadObjectMaybe theReader gh
|
||||
>>= orThrow (GitReadError (show $ pretty gh))
|
||||
|
||||
let e = [ Builder.byteString (coerce gh)
|
||||
, Builder.char8 (headDef 'B' $ show $ pretty $ Short _t)
|
||||
, Builder.lazyByteString lbs
|
||||
] & Builder.toLazyByteString . mconcat
|
||||
|
||||
atomically do
|
||||
modifyTVar _already (HS.insert gh)
|
||||
writeTBQueue sourceQ (Just e)
|
||||
|
||||
ContT $ withAsync $ forever do
|
||||
|
@ -2234,7 +2067,7 @@ linearSearchLBS hash lbs = do
|
|||
pure $ listToMaybe found
|
||||
|
||||
|
||||
binarySearchBS :: MonadIO m
|
||||
binarySearchBS :: Monad m
|
||||
=> Int -- ^ record size
|
||||
-> ( BS.ByteString -> BS.ByteString ) -- ^ key extractor
|
||||
-> BS.ByteString -- ^ key
|
||||
|
|
|
@ -19,7 +19,7 @@ common shared-properties
|
|||
-threaded
|
||||
-rtsopts
|
||||
-O2
|
||||
"-with-rtsopts=-N4 -A64m -AL256m -I0"
|
||||
"-with-rtsopts=-N4 -A64m -AL256m -I0 -M4G"
|
||||
|
||||
default-language: GHC2021
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import Data.ByteString.Lazy qualified as LBS
|
|||
import Data.ByteString qualified as BS
|
||||
import Data.Maybe
|
||||
import Network.ByteOrder hiding (ByteString)
|
||||
import Control.Monad.State
|
||||
|
||||
import Codec.Compression.Zstd qualified as Zstd
|
||||
import Codec.Compression.Zstd.Streaming qualified as Zstd
|
||||
|
@ -17,6 +18,92 @@ import Control.Exception
|
|||
|
||||
-- import UnliftIO
|
||||
|
||||
class ReadLogOpts a where
|
||||
|
||||
data ReadLogError = SomeReadLogError
|
||||
deriving stock (Typeable, Show)
|
||||
|
||||
instance Exception ReadLogError
|
||||
|
||||
instance ReadLogOpts ()
|
||||
|
||||
type NumBytes = Int
|
||||
|
||||
class Monad m => BytesReader m where
|
||||
noBytesLeft :: m Bool
|
||||
readBytes :: NumBytes -> m ByteString
|
||||
|
||||
readBytesMaybe :: NumBytes -> m (Maybe ByteString)
|
||||
readBytesMaybe n = do
|
||||
bs <- readBytes n
|
||||
if LBS.length bs == fromIntegral n then pure (Just bs) else pure Nothing
|
||||
|
||||
newtype ConsumeLBS m a = ConsumeLBS { fromConsumeLBS :: StateT ByteString m a }
|
||||
deriving newtype ( Applicative
|
||||
, Functor
|
||||
, Monad
|
||||
, MonadState ByteString
|
||||
, MonadIO
|
||||
, MonadTrans
|
||||
)
|
||||
|
||||
readChunkThrow :: MonadIO m => Int -> ConsumeLBS m ByteString
|
||||
readChunkThrow n = do
|
||||
lbs <- get
|
||||
let (this, that) = LBS.splitAt (fromIntegral n) lbs
|
||||
if LBS.length this /= fromIntegral n then
|
||||
liftIO $ throwIO SomeReadLogError
|
||||
else do
|
||||
put $! that
|
||||
pure this
|
||||
|
||||
readChunkSimple :: Monad m => Int -> ConsumeLBS m ByteString
|
||||
readChunkSimple n = do
|
||||
lbs <- get
|
||||
let (this, that) = LBS.splitAt (fromIntegral n) lbs
|
||||
put $! that
|
||||
pure this
|
||||
|
||||
reminds :: Monad m => ConsumeLBS m Int
|
||||
reminds = gets (fromIntegral . LBS.length)
|
||||
|
||||
consumed :: Monad m => ConsumeLBS m Bool
|
||||
consumed = gets LBS.null
|
||||
|
||||
runConsumeLBS :: Monad m => ByteString -> ConsumeLBS m a -> m a
|
||||
runConsumeLBS s m = evalStateT (fromConsumeLBS m) s
|
||||
|
||||
newtype ConsumeBS m a = ConsumeBS { fromConsumeBS :: StateT BS.ByteString m a }
|
||||
deriving newtype ( Applicative
|
||||
, Functor
|
||||
, Monad
|
||||
, MonadState BS.ByteString
|
||||
, MonadIO
|
||||
, MonadTrans
|
||||
)
|
||||
|
||||
|
||||
instance Monad m => BytesReader (ConsumeLBS m) where
|
||||
readBytes n = readChunkSimple n
|
||||
noBytesLeft = consumed
|
||||
|
||||
instance Monad m => BytesReader (ConsumeBS m) where
|
||||
noBytesLeft = gets BS.null
|
||||
readBytes n = do
|
||||
s <- get
|
||||
let (a,b) = BS.splitAt n s
|
||||
put $! b
|
||||
pure (LBS.fromStrict a)
|
||||
|
||||
runConsumeBS :: Monad m => BS.ByteString -> ConsumeBS m a -> m a
|
||||
runConsumeBS s m = evalStateT (fromConsumeBS m) s
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
writeSection :: forall m . Monad m
|
||||
=> ByteString
|
||||
-> ( ByteString -> m () )
|
||||
|
|
Loading…
Reference in New Issue