This commit is contained in:
voidlizard 2025-01-09 11:35:42 +03:00
parent eb839eca94
commit 7742ad81ce
2 changed files with 58 additions and 38 deletions

View File

@ -424,7 +424,7 @@ mergeSortedFilesN getKey inputFiles outFile = do
liftIO $ writeSection (LBS.fromStrict e) (LBS.hPutStr hOut) liftIO $ writeSection (LBS.fromStrict e) (LBS.hPutStr hOut)
next (catMaybes files, newWin) next (catMaybes files, newWin)
mapM_ rm inputFiles mapM_ rm inputFiles
where where
readEntry :: BS.ByteString -> (Maybe BS.ByteString, Maybe BS.ByteString) readEntry :: BS.ByteString -> (Maybe BS.ByteString, Maybe BS.ByteString)
@ -727,7 +727,7 @@ theDict = do
if done then pure () if done then pure ()
else do else do
ssize <- readBytesMaybe 4 ssize <- readBytesMaybe 4
>>= orThrow SomeReadLogError >>= orThrow SomeReadLogError
<&> fromIntegral . N.word32 . LBS.toStrict <&> fromIntegral . N.word32 . LBS.toStrict
hash <- readBytesMaybe 20 hash <- readBytesMaybe 20
@ -1057,12 +1057,16 @@ theDict = do
liftIO $ print $ fill 10 (pretty s) <+> pretty f liftIO $ print $ fill 10 (pretty s) <+> pretty f
entry $ bindMatch "reflog:index:list:tx" $ nil_ $ const $ lift do entry $ bindMatch "reflog:index:list:tx" $ nil_ $ const $ lift do
r <- newTVarIO ( mempty :: HashSet HashRef ) r <- newIORef ( mempty :: HashSet HashRef )
index <- openIndex index <- openIndex
enumEntries index $ \bs -> do enumEntries index $ \bs -> do
atomically $ modifyTVar r (HS.insert (coerce $ BS.take 32 $ BS.drop 20 bs)) let h = coerce $ BS.take 32 $ BS.drop 20 bs
z <- readTVarIO r <&> HS.toList -- here <- readIORef r <&> HS.member h
liftIO $ mapM_ ( print . pretty ) z -- unless here do
atomicModifyIORef' r ( \x -> (HS.insert h x, ()))
z <- readIORef r <&> HS.toList
for_ z $ \h ->do
liftIO $ print $ pretty h
entry $ bindMatch "reflog:index:build" $ nil_ $ const $ lift $ connectedDo do entry $ bindMatch "reflog:index:build" $ nil_ $ const $ lift $ connectedDo do
writeReflogIndex writeReflogIndex

View File

@ -14,6 +14,7 @@ import Data.List qualified as L
import Network.ByteOrder qualified as N import Network.ByteOrder qualified as N
import System.IO.Temp as Temp import System.IO.Temp as Temp
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.Fixed
import Data.Maybe import Data.Maybe
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM import Data.HashMap.Strict qualified as HM
@ -36,6 +37,7 @@ import Codec.Compression.Zstd.Lazy qualified as ZstdL
import Codec.Serialise import Codec.Serialise
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
import Streaming hiding (run,chunksOf) import Streaming hiding (run,chunksOf)
import System.TimeIt
import UnliftIO import UnliftIO
import UnliftIO.IO.File qualified as UIO import UnliftIO.IO.File qualified as UIO
@ -81,7 +83,6 @@ data IndexEntry =
data Index a = data Index a =
Index { entries :: [IndexEntry] Index { entries :: [IndexEntry]
, bitmap :: Bloom GitHash
} }
openIndex :: forall a m . (Git3Perks m, MonadReader Git3Env m) openIndex :: forall a m . (Git3Perks m, MonadReader Git3Env m)
@ -91,20 +92,7 @@ openIndex = do
files <- listObjectIndexFiles files <- listObjectIndexFiles
bss <- liftIO $ for files $ \(f,_) -> (f,) <$> mmapFileByteString f Nothing bss <- liftIO $ for files $ \(f,_) -> (f,) <$> mmapFileByteString f Nothing
let entries = [ IndexEntry f bs | (f,bs) <- bss ] let entries = [ IndexEntry f bs | (f,bs) <- bss ]
let n = sum (fmap snd files) pure $ Index entries
let bss = bloomFilterSize n 5 0.01 & fromIntegral
bloom <- liftIO $ stToIO $ MBloom.new bloomHash bss
let idx = Index entries undefined
-- enumEntries idx $ \bs -> do
-- let h = coerce (BS.take 20 bs) :: GitHash
-- liftIO $ stToIO (MBloom.insert bloom h)
bm <- liftIO $ stToIO $ Bloom.freeze bloom
pure $ idx { bitmap = bm }
indexEntryLookup :: forall a m . (Git3Perks m) indexEntryLookup :: forall a m . (Git3Perks m)
=> Index a => Index a
@ -115,11 +103,9 @@ indexEntryLookup Index{..} h = do
already_ <- newTVarIO ( mempty :: HashMap GitHash N.ByteString ) already_ <- newTVarIO ( mempty :: HashMap GitHash N.ByteString )
forConcurrently_ entries $ \IndexEntry{..} -> do forConcurrently_ entries $ \IndexEntry{..} -> do
what <- readTVarIO already_ <&> HM.lookup h what <- readTVarIO already_ <&> HM.lookup h
let inBloom = True -- Bloom.elem h bitmap case what of
case (inBloom,what) of Just{} -> none
(False,_) -> none Nothing -> do
(_,Just{}) -> none
(_,Nothing) -> do
offset' <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce h) entryBS offset' <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce h) entryBS
maybe1 offset' none $ \offset -> do maybe1 offset' none $ \offset -> do
let ebs = BS.take 32 $ BS.drop (offset + 4 + 20) entryBS let ebs = BS.take 32 $ BS.drop (offset + 4 + 20) entryBS
@ -138,18 +124,14 @@ indexFilterNewObjects Index{..} hashes = do
flip fix (HS.toList hashes) $ \next -> \case flip fix (HS.toList hashes) $ \next -> \case
[] -> none [] -> none
(x:xs) -> do (x:xs) -> do
let inBloom = True -- Bloom.elem x bitmap old <- readTVarIO old_ <&> HS.member x
if not inBloom then if old then
next xs next xs
else do else do
old <- readTVarIO old_ <&> HS.member x off <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce x) entryBS
if old then when (isJust off) do
next xs atomically $ modifyTVar old_ (HS.insert x)
else do next xs
off <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce x) entryBS
when (isJust off) do
atomically $ modifyTVar old_ (HS.insert x)
next xs
old <- readTVarIO old_ old <- readTVarIO old_
pure $ HS.toList (hashes `HS.difference` old) pure $ HS.toList (hashes `HS.difference` old)
@ -186,9 +168,31 @@ enumEntries :: forall a m . ( Git3Perks m
) => Index a -> ( BS.ByteString -> m () ) -> m () ) => Index a -> ( BS.ByteString -> m () ) -> m ()
enumEntries Index{..} action = do enumEntries Index{..} action = do
forConcurrently_ entries $ \IndexEntry{..} -> do for_ entries $ \IndexEntry{..} -> do
scanBS entryBS action scanBS entryBS action
enumEntriesFixed :: forall a m . ( Git3Perks m
)
=> Int
-> Index a
-> ( BS.ByteString -> m () )
-> m ()
enumEntriesFixed n Index{..} action = do
q <- newTQueueIO
atomically $ mapM_ (writeTQueue q) entries
replicateM_ n $ do
fix \next -> do
es' <- atomically $ tryReadTQueue q
case es' of
Nothing -> none
Just IndexEntry{..} -> do
scanBS entryBS action
next
bloomHash :: GitHash -> [Word32] bloomHash :: GitHash -> [Word32]
bloomHash gh = [a,b,c,d,e] bloomHash gh = [a,b,c,d,e]
where where
@ -223,6 +227,18 @@ writeReflogIndex = do
sto <- getStorage sto <- getStorage
idx <- openIndex
written_ <- newTVarIO mempty
(t1,_) <- timeItT do
enumEntries idx $ \bs -> do
let txh = coerce (BS.take 32 $ BS.drop 20 bs) :: HashRef
atomically $ modifyTVar written_ (HS.insert txh)
written <- readTVarIO written_
notice $ "read index at" <+> pretty (realToFrac @_ @(Fixed E2) t1)
flip runContT pure do flip runContT pure do
what' <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) api reflog what' <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) api reflog
@ -239,7 +255,7 @@ writeReflogIndex = do
walkMerkle (coerce what) (getBlock sto) $ \case walkMerkle (coerce what) (getBlock sto) $ \case
Left{} -> throwIO MissedBlockError Left{} -> throwIO MissedBlockError
Right (hs :: [HashRef]) -> do Right (hs :: [HashRef]) -> do
for_ hs $ \h -> void $ runMaybeT do for_ [h | h <- hs, not (HS.member h written)] $ \h -> void $ runMaybeT do
tx <- getBlock sto (coerce h) tx <- getBlock sto (coerce h)
>>= toMPlus >>= toMPlus