From 7742ad81cef009e121ec8a2a195f8dd9aae7042f Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 9 Jan 2025 11:35:42 +0300 Subject: [PATCH] wip --- hbs2-git3/app/Main.hs | 16 ++++-- hbs2-git3/lib/HBS2/Git3/State/Index.hs | 80 +++++++++++++++----------- 2 files changed, 58 insertions(+), 38 deletions(-) diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 689572f9..1adb0923 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -424,7 +424,7 @@ mergeSortedFilesN getKey inputFiles outFile = do liftIO $ writeSection (LBS.fromStrict e) (LBS.hPutStr hOut) next (catMaybes files, newWin) - mapM_ rm inputFiles + mapM_ rm inputFiles where readEntry :: BS.ByteString -> (Maybe BS.ByteString, Maybe BS.ByteString) @@ -727,7 +727,7 @@ theDict = do if done then pure () else do ssize <- readBytesMaybe 4 - >>= orThrow SomeReadLogError + >>= orThrow SomeReadLogError <&> fromIntegral . N.word32 . LBS.toStrict hash <- readBytesMaybe 20 @@ -1057,12 +1057,16 @@ theDict = do liftIO $ print $ fill 10 (pretty s) <+> pretty f entry $ bindMatch "reflog:index:list:tx" $ nil_ $ const $ lift do - r <- newTVarIO ( mempty :: HashSet HashRef ) + r <- newIORef ( mempty :: HashSet HashRef ) index <- openIndex enumEntries index $ \bs -> do - atomically $ modifyTVar r (HS.insert (coerce $ BS.take 32 $ BS.drop 20 bs)) - z <- readTVarIO r <&> HS.toList - liftIO $ mapM_ ( print . pretty ) z + let h = coerce $ BS.take 32 $ BS.drop 20 bs + -- here <- readIORef r <&> HS.member h + -- unless here do + atomicModifyIORef' r ( \x -> (HS.insert h x, ())) + z <- readIORef r <&> HS.toList + for_ z $ \h ->do + liftIO $ print $ pretty h entry $ bindMatch "reflog:index:build" $ nil_ $ const $ lift $ connectedDo do writeReflogIndex diff --git a/hbs2-git3/lib/HBS2/Git3/State/Index.hs b/hbs2-git3/lib/HBS2/Git3/State/Index.hs index 9e426294..7c113742 100644 --- a/hbs2-git3/lib/HBS2/Git3/State/Index.hs +++ b/hbs2-git3/lib/HBS2/Git3/State/Index.hs @@ -14,6 +14,7 @@ import Data.List qualified as L import Network.ByteOrder qualified as N import System.IO.Temp as Temp import Data.ByteString.Lazy qualified as LBS +import Data.Fixed import Data.Maybe import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HM @@ -36,6 +37,7 @@ import Codec.Compression.Zstd.Lazy qualified as ZstdL import Codec.Serialise import Streaming.Prelude qualified as S import Streaming hiding (run,chunksOf) +import System.TimeIt import UnliftIO import UnliftIO.IO.File qualified as UIO @@ -81,7 +83,6 @@ data IndexEntry = data Index a = Index { entries :: [IndexEntry] - , bitmap :: Bloom GitHash } openIndex :: forall a m . (Git3Perks m, MonadReader Git3Env m) @@ -91,20 +92,7 @@ openIndex = do files <- listObjectIndexFiles bss <- liftIO $ for files $ \(f,_) -> (f,) <$> mmapFileByteString f Nothing let entries = [ IndexEntry f bs | (f,bs) <- bss ] - let n = sum (fmap snd files) - let bss = bloomFilterSize n 5 0.01 & fromIntegral - - bloom <- liftIO $ stToIO $ MBloom.new bloomHash bss - - let idx = Index entries undefined - - -- enumEntries idx $ \bs -> do - -- let h = coerce (BS.take 20 bs) :: GitHash - -- liftIO $ stToIO (MBloom.insert bloom h) - - bm <- liftIO $ stToIO $ Bloom.freeze bloom - - pure $ idx { bitmap = bm } + pure $ Index entries indexEntryLookup :: forall a m . (Git3Perks m) => Index a @@ -115,11 +103,9 @@ indexEntryLookup Index{..} h = do already_ <- newTVarIO ( mempty :: HashMap GitHash N.ByteString ) forConcurrently_ entries $ \IndexEntry{..} -> do what <- readTVarIO already_ <&> HM.lookup h - let inBloom = True -- Bloom.elem h bitmap - case (inBloom,what) of - (False,_) -> none - (_,Just{}) -> none - (_,Nothing) -> do + case what of + Just{} -> none + Nothing -> do offset' <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce h) entryBS maybe1 offset' none $ \offset -> do let ebs = BS.take 32 $ BS.drop (offset + 4 + 20) entryBS @@ -138,18 +124,14 @@ indexFilterNewObjects Index{..} hashes = do flip fix (HS.toList hashes) $ \next -> \case [] -> none (x:xs) -> do - let inBloom = True -- Bloom.elem x bitmap - if not inBloom then + old <- readTVarIO old_ <&> HS.member x + if old then next xs else do - old <- readTVarIO old_ <&> HS.member x - if old then - next xs - else do - off <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce x) entryBS - when (isJust off) do - atomically $ modifyTVar old_ (HS.insert x) - next xs + off <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce x) entryBS + when (isJust off) do + atomically $ modifyTVar old_ (HS.insert x) + next xs old <- readTVarIO old_ pure $ HS.toList (hashes `HS.difference` old) @@ -186,9 +168,31 @@ enumEntries :: forall a m . ( Git3Perks m ) => Index a -> ( BS.ByteString -> m () ) -> m () enumEntries Index{..} action = do - forConcurrently_ entries $ \IndexEntry{..} -> do + for_ entries $ \IndexEntry{..} -> do scanBS entryBS action +enumEntriesFixed :: forall a m . ( Git3Perks m + ) + => Int + -> Index a + -> ( BS.ByteString -> m () ) + -> m () + +enumEntriesFixed n Index{..} action = do + + q <- newTQueueIO + + atomically $ mapM_ (writeTQueue q) entries + + replicateM_ n $ do + fix \next -> do + es' <- atomically $ tryReadTQueue q + case es' of + Nothing -> none + Just IndexEntry{..} -> do + scanBS entryBS action + next + bloomHash :: GitHash -> [Word32] bloomHash gh = [a,b,c,d,e] where @@ -223,6 +227,18 @@ writeReflogIndex = do sto <- getStorage + idx <- openIndex + + written_ <- newTVarIO mempty + + (t1,_) <- timeItT do + enumEntries idx $ \bs -> do + let txh = coerce (BS.take 32 $ BS.drop 20 bs) :: HashRef + atomically $ modifyTVar written_ (HS.insert txh) + + written <- readTVarIO written_ + notice $ "read index at" <+> pretty (realToFrac @_ @(Fixed E2) t1) + flip runContT pure do what' <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) api reflog @@ -239,7 +255,7 @@ writeReflogIndex = do walkMerkle (coerce what) (getBlock sto) $ \case Left{} -> throwIO MissedBlockError Right (hs :: [HashRef]) -> do - for_ hs $ \h -> void $ runMaybeT do + for_ [h | h <- hs, not (HS.member h written)] $ \h -> void $ runMaybeT do tx <- getBlock sto (coerce h) >>= toMPlus