This commit is contained in:
voidlizard 2025-01-08 12:54:13 +03:00
parent 218ff12000
commit 568fb735a5
2 changed files with 63 additions and 138 deletions

View File

@ -593,16 +593,9 @@ theDict = do
liftIO $ mapM_ setCurrentDirectory mpath liftIO $ mapM_ setCurrentDirectory mpath
rq <- newTQueueIO idx <- lift openIndex
ContT $ withAsync (startReflogIndexQueryQueue rq)
let req h = do let req h = lift $ indexEntryLookup idx h <&> isNothing
let bs = coerce @GitHash @N.ByteString h
let tr = const True
w <- newEmptyTMVarIO
atomically $ writeTQueue rq (bs, tr, w)
r <- atomically $ readTMVar w
pure $ isNothing r
-- let hss = headDef "HEAD" [ x | StringLike x <- snd (splitOpts [] syn) ] -- let hss = headDef "HEAD" [ x | StringLike x <- snd (splitOpts [] syn) ]
h <- gitRevParseThrow hss h <- gitRevParseThrow hss
@ -611,7 +604,6 @@ theDict = do
for_ (HPSQ.toList r) $ \(k,_,_) -> do for_ (HPSQ.toList r) $ \(k,_,_) -> do
liftIO $ print $ pretty k liftIO $ print $ pretty k
entry $ bindMatch "test:git:log:cat" $ nil_ $ \syn -> lift do entry $ bindMatch "test:git:log:cat" $ nil_ $ \syn -> lift do
let (opts, argz) = splitOpts [("--git",0),("--packed",0),("--import",1)] syn let (opts, argz) = splitOpts [("--git",0),("--packed",0),("--import",1)] syn
@ -754,23 +746,18 @@ theDict = do
found <- liftIO $ binarySearchBS 56 (BS.take 20 . BS.drop 4) (coerce h) file found <- liftIO $ binarySearchBS 56 (BS.take 20 . BS.drop 4) (coerce h) file
liftIO $ notice $ pretty h <+> pretty (isJust found) liftIO $ notice $ pretty h <+> pretty (isJust found)
entry $ bindMatch "reflog:index:search" $ nil_ $ \syn -> lift $ flip runContT pure do entry $ bindMatch "reflog:index:search" $ nil_ $ \syn -> lift do
let (_, argz) = splitOpts [] syn let (_, argz) = splitOpts [] syn
hash <- headMay [ x | GitHashLike x <- argz ] & orThrowUser "need sha1" hash <- headMay [ x | GitHashLike x <- argz ] & orThrowUser "need sha1"
rq <- newTQueueIO idx <- openIndex
ContT $ withAsync $ startReflogIndexQueryQueue rq answ <- indexEntryLookup idx hash
answ_ <- newEmptyTMVarIO for_ answ $ \bs -> do
let a = coerce (BS.take 32 bs) :: HashRef
atomically $ writeTQueue rq (coerce hash, const True, answ_)
answ <- atomically $ readTMVar answ_
for_ answ $ \a -> do
liftIO $ print $ pretty a liftIO $ print $ pretty a
entry $ bindMatch "test:git:log:index:flat:search:linear:test" $ nil_ $ \case entry $ bindMatch "test:git:log:index:flat:search:linear:test" $ nil_ $ \case
@ -1095,7 +1082,7 @@ theDict = do
idx <- openIndex idx <- openIndex
let req h = lift $ indexEntryLookup idx h <&> isNothing -- let req h = lift $ indexEntryLookup idx h <&> isNothing
flip runContT pure do flip runContT pure do
cap <- liftIO getNumCapabilities cap <- liftIO getNumCapabilities
@ -1114,7 +1101,7 @@ theDict = do
-- читаем только те объекты, которые не в индексе -- читаем только те объекты, которые не в индексе
hashes <- gitReadTreeObjectsOnly commit hashes <- gitReadTreeObjectsOnly commit
<&> ([commit,tree]<>) <&> ([commit,tree]<>)
>>= filterM req >>= lift . indexFilterNewObjects idx . HS.fromList
-- --
atomically $ mapM_ (writeTQueue new_) hashes atomically $ mapM_ (writeTQueue new_) hashes
atomically (STM.flushTQueue new_) >>= liftIO . print . pretty . length atomically (STM.flushTQueue new_) >>= liftIO . print . pretty . length
@ -1131,63 +1118,6 @@ theDict = do
void $ flip runContT pure do void $ flip runContT pure do
-- cache <- newTVarIO ( mempty :: HashSet GitHash )
-- читаем вообще всё из индекса в память и строим HashSet
-- получается, что вообще никакого профита, что это индекс,
-- это фуллскан в любом случае.
-- Индекс это сортированная последовательность [(GitHash, HashRef)]
-- в виде байстроки формата "SD", D ~ GitHash <> HashRef
-- let blm = runST (MBloom.new undefined 1000000)
--
-- bloom <- liftIO $ stToIO $ MBloom.new 10000
--
index <- lift openIndex
-- let req h = do
-- atomically do
-- readTVar cache <&> not . HS.member h
-- let
-- req2 :: GitHash -> Git3 m Bool
-- req2 h = liftIO do
-- here <- liftIO $ stToIO $ MBloom.elem h bloom
-- if not here then pure True else do
-- atomically $ modifyTVar blmn_ succ
-- forConcurrently_ files $ \f -> do
-- found <- binarySearchBS 56 ( BS.take 20. BS.drop 4 ) (coerce h) f
-- when (isJust found) do
-- atomically $ modifyTVar excl_ (HS.insert h)
-- readTVarIO excl_ <&> not . HS.member h
-- req3 :: HashSet GitHash -> Git3 m (HashSet GitHash)
-- req3 hs = liftIO do
-- forConcurrently_ files $ \f -> do
-- flip fix (HS.toList hs) $ \next -> \case
-- [] -> none
-- (x:xs) -> do
-- already <- readTVarIO excl_ <&> HS.member x
-- inBloom <- liftIO $ stToIO $ MBloom.elem x bloom
-- when inBloom do
-- atomically $ modifyTVar blmn_ succ
-- when (not already || inBloom) do
-- found <- binarySearchBS 56 ( BS.take 20. BS.drop 4 ) (coerce x) f
-- when (isJust found) do
-- atomically $ modifyTVar excl_ (HS.insert x)
-- next xs
-- found <- readTVarIO excl_
-- pure ( hs `HS.difference` found)
-- читаем только те коммиты, которые не в индексе
-- очень быстро, пушо относительно мало объектов
idx <- lift openIndex idx <- lift openIndex
let req h = lift $ indexEntryLookup idx h <&> isNothing let req h = lift $ indexEntryLookup idx h <&> isNothing
@ -1217,7 +1147,7 @@ theDict = do
notice $ "all shit read" <+> pretty (realToFrac @_ @(Fixed E2) t3) notice $ "all shit read" <+> pretty (realToFrac @_ @(Fixed E2) t3)
(t4,new) <- lift $ timeItT $ readTVarIO uniq_ >>= filterM req . HS.toList (t4,new) <- lift $ timeItT $ readTVarIO uniq_ >>= indexFilterNewObjects idx
notice $ pretty (length new) <+> "new objects" <+> "at" <+> pretty (realToFrac @_ @(Fixed E2) t4) notice $ pretty (length new) <+> "new objects" <+> "at" <+> pretty (realToFrac @_ @(Fixed E2) t4)

View File

@ -17,6 +17,8 @@ import Data.ByteString.Lazy qualified as LBS
import Data.Maybe import Data.Maybe
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.Word import Data.Word
import Data.Vector (Vector) import Data.Vector (Vector)
import Data.Vector qualified as V import Data.Vector qualified as V
@ -79,7 +81,7 @@ data IndexEntry =
data Index a = data Index a =
Index { entries :: [IndexEntry] Index { entries :: [IndexEntry]
, bitmap :: MBloom.MBloom RealWorld GitHash , bitmap :: Bloom GitHash
} }
openIndex :: forall a m . (Git3Perks m, MonadReader Git3Env m) openIndex :: forall a m . (Git3Perks m, MonadReader Git3Env m)
@ -89,8 +91,20 @@ openIndex = do
files <- listObjectIndexFiles files <- listObjectIndexFiles
bss <- liftIO $ for files $ \(f,_) -> (f,) <$> mmapFileByteString f Nothing bss <- liftIO $ for files $ \(f,_) -> (f,) <$> mmapFileByteString f Nothing
let entries = [ IndexEntry f bs | (f,bs) <- bss ] let entries = [ IndexEntry f bs | (f,bs) <- bss ]
bloom <- liftIO $ stToIO $ MBloom.new bloomHash 10 let n = sum (fmap snd files)
pure $ Index entries bloom let bss = bloomFilterSize n 5 0.01 & fromIntegral
bloom <- liftIO $ stToIO $ MBloom.new bloomHash bss
let idx = Index entries undefined
enumEntries idx $ \bs -> do
let h = coerce (BS.take 20 bs) :: GitHash
liftIO $ stToIO (MBloom.insert bloom h)
bm <- liftIO $ stToIO $ Bloom.freeze bloom
pure $ idx { bitmap = bm }
indexEntryLookup :: forall a m . (Git3Perks m) indexEntryLookup :: forall a m . (Git3Perks m)
=> Index a => Index a
@ -101,9 +115,11 @@ indexEntryLookup Index{..} h = do
already_ <- newTVarIO ( mempty :: HashMap GitHash N.ByteString ) already_ <- newTVarIO ( mempty :: HashMap GitHash N.ByteString )
forConcurrently_ entries $ \IndexEntry{..} -> do forConcurrently_ entries $ \IndexEntry{..} -> do
what <- readTVarIO already_ <&> HM.lookup h what <- readTVarIO already_ <&> HM.lookup h
case what of let inBloom = Bloom.elem h bitmap
Just{} -> none case (inBloom,what) of
Nothing -> do (False,_) -> none
(_,Just{}) -> none
(_,Nothing) -> do
offset' <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce h) entryBS offset' <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce h) entryBS
maybe1 offset' none $ \offset -> do maybe1 offset' none $ \offset -> do
let ebs = BS.take 32 $ BS.drop (offset + 4 + 20) entryBS let ebs = BS.take 32 $ BS.drop (offset + 4 + 20) entryBS
@ -111,6 +127,33 @@ indexEntryLookup Index{..} h = do
readTVarIO already_ <&> headMay . HM.elems readTVarIO already_ <&> headMay . HM.elems
indexFilterNewObjects :: forall a m . (Git3Perks m)
=> Index a
-> HashSet GitHash
-> m [GitHash]
indexFilterNewObjects Index{..} hashes = do
old_ <- newTVarIO ( mempty :: HashSet GitHash )
forConcurrently_ entries $ \IndexEntry{..} -> do
flip fix (HS.toList hashes) $ \next -> \case
[] -> none
(x:xs) -> do
let inBloom = Bloom.elem x bitmap
if not inBloom then
next xs
else do
old <- readTVarIO old_ <&> HS.member x
if old then
next xs
else do
off <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce x) entryBS
when (isJust off) do
atomically $ modifyTVar old_ (HS.insert x)
next xs
old <- readTVarIO old_
pure $ HS.toList (hashes `HS.difference` old)
listObjectIndexFiles :: forall m . ( Git3Perks m listObjectIndexFiles :: forall m . ( Git3Perks m
, MonadReader Git3Env m , MonadReader Git3Env m
) => m [(FilePath, Natural)] ) => m [(FilePath, Natural)]
@ -142,58 +185,10 @@ bloomHash gh = [a,b,c,d,e]
d = N.word32 (BS.take 4 $ BS.drop 12 bs) d = N.word32 (BS.take 4 $ BS.drop 12 bs)
e = N.word32 (BS.take 4 $ BS.drop 16 bs) e = N.word32 (BS.take 4 $ BS.drop 16 bs)
startReflogIndexQueryQueue :: forall a m . ( Git3Perks m bloomFilterSize :: Natural -- ^ elems?
, MonadReader Git3Env m -> Natural -- ^ hash functions
, HasClientAPI PeerAPI UNIX m -> Double -- ^ error probability
, HasClientAPI RefLogAPI UNIX m -> Natural
, HasStorage m
)
=> TQueue (BS.ByteString, BS.ByteString -> a, TMVar (Maybe a))
-> m ()
startReflogIndexQueryQueue rq = flip runContT pure do
files <- lift $ listObjectIndexFiles <&> fmap fst
-- один файл - не более, чем один поток
-- мапим файлы
-- возвращаем функцию запроса?
-- для каждого файла -- мы создаём отдельную очередь,
-- нам надо искать во всех файлах
mmaped <- liftIO $ for files (liftIO . flip mmapFileByteString Nothing)
answQ <- newTVarIO mempty
forever $ liftIO do
requests <- atomically do
_ <- peekTQueue rq
w <- STM.flushTQueue rq
for_ w $ \(k,_,a) -> do
modifyTVar answQ (HM.insert k a)
pure w
forConcurrently_ mmaped \bs -> do
for requests $ \(s,f,answ) -> runMaybeT do
still <- readTVarIO answQ <&> HM.member s
guard still
-- FIXME: size-hardcodes
w <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) s bs
>>= toMPlus
let r = f (BS.drop (w * 56) bs)
atomically do
writeTMVar answ (Just r)
modifyTVar answQ (HM.delete bs)
atomically do
rest <- readTVar answQ
for_ rest $ \x -> writeTMVar x Nothing
bloomFilterSize :: Natural -> Natural -> Double -> Natural
bloomFilterSize n k p bloomFilterSize n k p
| p <= 0 || p >= 1 = 0 | p <= 0 || p >= 1 = 0
| otherwise = rnd $ negate (fromIntegral n * fromIntegral k) / log (1 - p ** (1 / fromIntegral k)) | otherwise = rnd $ negate (fromIntegral n * fromIntegral k) / log (1 - p ** (1 / fromIntegral k))