diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs
index c6a8411b..a54cae65 100644
--- a/hbs2-git3/app/Main.hs
+++ b/hbs2-git3/app/Main.hs
@@ -93,7 +93,9 @@ import Data.Vector.Algorithms.Search qualified as MV
 import UnliftIO.Concurrent
 import UnliftIO.IO.File qualified as UIO

-import Data.BloomFilter.Easy qualified as Bloom
+import Control.Monad.ST
+import Data.BloomFilter qualified as Bloom
+import Data.BloomFilter.Mutable qualified as MBloom

 {- HLINT ignore "Functor law" -}
 {- HLINT ignore "Eta reduce" -}
@@ -146,6 +148,7 @@ recover m = fix \again -> do
           liftIO $ withGit3Env connected again

+      e -> throwIO e

 ---

 data TreeReadState = TreeReadState
@@ -578,14 +581,6 @@ theDict = do
         entry $ bindMatch "zlib:deflate" $ nil_ $ const $ liftIO do
           LBS.hGetContents stdin <&> Zlib.decompress >>= LBS.hPutStr stdout

-
-        entry $ bindMatch "test:reflog:bloom:create" $ nil_ $ \syn -> lift do
-          r <- newTQueueIO
-          enumEntries $ \e -> do
-            atomically $ writeTQueue r (BS.take 20 e)
-          atomically (STM.flushTQueue r) <&> (Bloom.easyList 0.01)
-          notice $ "bloom filter build done"
-
         entry $ bindMatch "test:git:read-commit-chain" $ nil_ $ \syn -> lift do
           (mpath, hss) <- case syn of
             [ HashLike s ] -> pure (Nothing, s)
@@ -705,7 +700,8 @@ theDict = do

         entry $ bindMatch "test:reflog:index:search:binary:test:2" $ nil_ $ const $ lift do
           r <- newTQueueIO
-          enumEntries $ \e -> do
+          idx <- openIndex
+          enumEntries idx $ \e -> do
             let ha = GitHash $ coerce $ BS.take 20 e
             atomically $ writeTQueue r ha

@@ -958,19 +954,16 @@ theDict = do

               LBS.hPutStr fh contents

-        entry $ bindMatch "test:git:reflog:index:list:fast" $ nil_ $ \case
-          [ StringLike f ] -> lift do
+        entry $ bindMatch "test:git:reflog:index:list:fast" $ nil_ $ const $ lift do
+          files <- listObjectIndexFiles
+          forConcurrently_ files $ \(f,_) -> do
             bs <- liftIO $ mmapFileByteString f Nothing
             scanBS bs $ \segment -> do
-              none
-              -- let (sha1,blake) = BS.splitAt 20 segment
-              --                      & over _1 (coerce @_ @GitHash)
-              --                      & over _2 (coerce @_ @HashRef)
+              let (sha1,blake) = BS.splitAt 20 segment
+                                   & over _1 (coerce @_ @GitHash)
+                                   & over _2 (coerce @_ @HashRef)

-              -- notice $ pretty sha1 <+> pretty blake
-              liftIO $ print $ pretty "okay"
-
-          _ -> throwIO (BadFormException @C nil)
+              notice $ pretty sha1 <+> pretty blake

         entry $ bindMatch "reflog:index:list" $ nil_ $ const $ lift do
           files <- listObjectIndexFiles
@@ -1068,7 +1061,6 @@ theDict = do

         entry $ bindMatch "reflog:index:path" $ nil_ $ const $ lift do
           indexPath >>= liftIO . print . pretty

-        -- let entriesListOf lbs = S.toList_ $ runConsumeLBS lbs $ readSections $ \s ss -> do
         entry $ bindMatch "reflog:index:files" $ nil_ $ \syn -> lift do
           files <- listObjectIndexFiles
@@ -1079,7 +1071,8 @@ theDict = do

         entry $ bindMatch "reflog:index:list:tx" $ nil_ $ const $ lift do
           r <- newTVarIO ( mempty :: HashSet HashRef )
-          enumEntries $ \bs -> do
+          index <- openIndex
+          enumEntries index $ \bs -> do
             atomically $ modifyTVar r (HS.insert (coerce $ BS.take 32 $ BS.drop 20 bs))
           z <- readTVarIO r <&> HS.toList
           liftIO $ mapM_ ( print . pretty ) z
@@ -1087,67 +1080,33 @@ theDict = do
         entry $ bindMatch "reflog:index:build" $ nil_ $ const $ lift $ connectedDo do
           writeReflogIndex

-        entry $ bindMatch "git:list:objects:new" $ nil_ $ \syn -> lift do
-          let (opts,argz) = splitOpts [] syn
+        entry $ bindMatch "test:reflog:index:lookup" $ nil_ \case
+          [ GitHashLike h ] -> lift do
+            idx <- openIndex
+            what <- indexEntryLookup idx h >>= orThrowUser "object not found"
+            liftIO $ print $ pretty ( coerce @_ @HashRef what )

-          let what = headDef "HEAD" [ x | StringLike x <- argz ]
-          h0 <- gitRevParseThrow what
+          _ -> throwIO (BadFormException @C nil)

-          void $ flip runContT pure do
+        entry $ bindMatch "git:commit:list:objects:new" $ nil_ $ \case
+          [ StringLike what ] -> lift do

-            rq <- newTQueueIO
-            -- ContT $ withAsync (startReflogIndexQueryQueue rq)
+            commit <- gitRevParseThrow what

-            -- let req h = do
-            --       let bs = coerce @GitHash @N.ByteString h
-            --       let tr = const True
-            --       w <- newEmptyTMVarIO
-            --       atomically $ writeTQueue rq (bs, tr, w)
-            --       r <- atomically $ readTMVar w
-            --       pure $ isNothing r
+            idx <- openIndex

-            cache <- newTVarIO ( mempty :: HashSet GitHash )
+            let req h = lift $ indexEntryLookup idx h <&> isNothing

-            -- read absolutely everything from the index into memory and build a HashSet;
-            -- it turns out there is no benefit at all from this being an index,
-            -- it is a full scan either way.
-            -- The index is a sorted sequence of [(GitHash, HashRef)] stored as a
-            -- bytestring of "SD" records, D ~ GitHash <> HashRef
-            --
-            --
+            flip runContT pure do
+              cap <- liftIO getNumCapabilities
+              gitCatBatchQ <- contWorkerPool cap do
+                che <- ContT withGitCat
+                pure $ gitReadObjectMaybe che

-            (t,_) <- timeItT do
-              lift $ enumEntries $ \e -> do
-                atomically do
-                  modifyTVar cache ( HS.insert (coerce $ BS.take 20 e) )
+              new_ <- newTQueueIO
+              c1 <- newCacheFixedHPSQ 1000

-            s <- readTVarIO cache <&> HS.size
-            notice $ pretty s <+> " records read at" <+> pretty t
-
-            l_ <- newTVarIO 0
-            h_ <- newTVarIO ( mempty :: HashSet GitHash )
-            s_ <- newTVarIO ( mempty :: HashMap GitHash Int )
-
-            let req h = do
-                  atomically do
-                    modifyTVar l_ succ
-                    readTVar cache <&> not . HS.member h
-
-            -- read only the commits that are not in the index;
-            -- very fast, since there are relatively few of them
-            (t1,r) <- timeItT (lift $ readCommitChainHPSQ req Nothing h0 dontHandle)
-            notice $ pretty s <+> " new commits read at" <+> pretty (realToFrac @_ @(Fixed E3) t1)
-
-            cap <- liftIO getNumCapabilities
-            gitCatBatchQ <- contWorkerPool cap do
-              che <- ContT withGitCat
-              pure $ gitReadObjectMaybe che
-
-            new_ <- newTQueueIO
-            c1 <- newCacheFixedHPSQ 1000
-            (t3, _) <- timeItT $ lift $ forConcurrently_ (HPSQ.toList r) $ \(commit,_,_) -> do
-
-              (_,self) <- gitCatBatchQ commit
+              (_,self) <- lift $ gitCatBatchQ commit
                   >>= orThrow (GitReadError (show $ pretty commit))

               tree <- gitReadCommitTree self
@@ -1157,23 +1116,121 @@ theDict = do
                          <&> ([commit,tree]<>)
                          >>= filterM req
               --
-              atomically $ modifyTVar s_ (HM.insertWith (\old new -> max old new) commit (length hashes))
-
-              atomically $ modifyTVar h_ (HS.union (HS.fromList hashes))
-
               atomically $ mapM_ (writeTQueue new_) hashes
+              atomically (STM.flushTQueue new_) >>= liftIO . print . pretty . length

-              -- 1.8 seconds, and it grows noticeably with the number of commits
-              atomically (STM.flushTQueue new_) >>= liftIO . print . pretty . length
+          _ -> throwIO (BadFormException @C nil)

-            l <- readTVarIO l_
-            n <- readTVarIO h_
-            w <- readTVarIO s_ <&> HM.elems
+        entry $ bindMatch "git:list:objects:new" $ nil_ $ \syn -> lift do
+          let (opts,argz) = splitOpts [] syn

-            let a = realToFrac (sum w) / realToFrac (length w)
+          let what = headDef "HEAD" [ x | StringLike x <- argz ]
+          h0 <- gitRevParseThrow what

-            notice $ pretty l <+> pretty (HS.size n) <+> "done in " <+> pretty (realToFrac @_ @(Fixed E3) t3)
-            notice $ "avg per commit" <+> pretty a
+          no_ <- newTVarIO 0
+
+          void $ flip runContT pure do
+
+            -- cache <- newTVarIO ( mempty :: HashSet GitHash )
+
+            -- read absolutely everything from the index into memory and build a HashSet;
+            -- it turns out there is no benefit at all from this being an index,
+            -- it is a full scan either way.
+            -- The index is a sorted sequence of [(GitHash, HashRef)] stored as a
+            -- bytestring of "SD" records, D ~ GitHash <> HashRef
+
+            -- let blm = runST (MBloom.new undefined 1000000)
+            --
+            -- bloom <- liftIO $ stToIO $ MBloom.new 10000
+            --
+
+            index <- lift openIndex
+
+            -- let req h = do
+            --       atomically do
+            --         readTVar cache <&> not . HS.member h
+
+            -- let
+            --   req2 :: GitHash -> Git3 m Bool
+            --   req2 h = liftIO do
+            --     here <- liftIO $ stToIO $ MBloom.elem h bloom
+
+            --     if not here then pure True else do
+            --       atomically $ modifyTVar blmn_ succ
+            --       forConcurrently_ files $ \f -> do
+            --         found <- binarySearchBS 56 ( BS.take 20. BS.drop 4 ) (coerce h) f
+            --         when (isJust found) do
+            --           atomically $ modifyTVar excl_ (HS.insert h)
+            --       readTVarIO excl_ <&> not . HS.member h
+
+            --   req3 :: HashSet GitHash -> Git3 m (HashSet GitHash)
+            --   req3 hs = liftIO do
+
+            --     forConcurrently_ files $ \f -> do
+            --       flip fix (HS.toList hs) $ \next -> \case
+            --         [] -> none
+            --         (x:xs) -> do
+            --           already <- readTVarIO excl_ <&> HS.member x
+            --           inBloom <- liftIO $ stToIO $ MBloom.elem x bloom
+
+            --           when inBloom do
+            --             atomically $ modifyTVar blmn_ succ
+
+            --           when (not already || inBloom) do
+            --             found <- binarySearchBS 56 ( BS.take 20. BS.drop 4 ) (coerce x) f
+            --             when (isJust found) do
+            --               atomically $ modifyTVar excl_ (HS.insert x)
+            --           next xs
+
+            --     found <- readTVarIO excl_
+            --     pure ( hs `HS.difference` found)
+
+            -- read only the commits that are not in the index;
+            -- very fast, since there are relatively few of them
+
+            idx <- lift openIndex
+            let req h = lift $ indexEntryLookup idx h <&> isNothing
+
+            (t1,r) <- timeItT (lift $ readCommitChainHPSQ req Nothing h0 dontHandle)
+
+            let s = HPSQ.size r
+            notice $ pretty s <+> "new commits read at" <+> pretty (realToFrac @_ @(Fixed E3) t1)
+
+            cap <- liftIO getNumCapabilities
+            gitCatBatchQ <- contWorkerPool cap do
+              che <- ContT withGitCat
+              pure $ gitReadObjectMaybe che
+
+            uniq_ <- newTVarIO mempty
+            -- c1 <- newCacheFixedHPSQ 1000
+            (t3, _) <- timeItT $ lift $ forConcurrently_ (HPSQ.toList r) $ \(commit,_,_) -> do
+
+              (_,self) <- gitCatBatchQ commit
+                  >>= orThrow (GitReadError (show $ pretty commit))
+
+              tree <- gitReadCommitTree self
+
+              -- read only the objects that are not in the index
+              gitReadTreeObjectsOnly commit
+                <&> ([commit,tree]<>)
+                >>= \hs -> atomically (for_ hs (modifyTVar uniq_ . HS.insert))
+
+            notice $ "all objects read" <+> pretty (realToFrac @_ @(Fixed E2) t3)
+
+            (t4,new) <- lift $ timeItT $ readTVarIO uniq_ >>= filterM req . HS.toList
+
+            notice $ pretty (length new) <+> "new objects" <+> "at" <+> pretty (realToFrac @_ @(Fixed E2) t4)
+
+            -- x <- readTVarIO uniq_ <&> HS.size
+
+            -- blmn <- readTVarIO blmn_
+            -- notice $ "all objects filtered" <+> parens (pretty x) <+> brackets (pretty blmn) <+> pretty (realToFrac @_ @(Fixed E2) t4)
+
+            -- notice $ pretty (length new)
+
+
+            -- notice $ "total objects" <+> pretty
+            -- notice $ "present" <+> pretty nhere
             -- liftIO $ print $ pretty (HS
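The `indexEntryLookup` added in the Index.hs hunk below assumes fixed-width index records: 56 bytes each, a 4-byte section header followed by a 20-byte git SHA1 key and a 32-byte HashRef value, sorted by key (that is what `binarySearchBS 56 ( BS.take 20 . BS.drop 4 )` and the `BS.drop (offset + 4 + 20)` slice encode). A minimal, self-contained sketch of the same lookup over a plain ByteString, assuming that layout; `recordSize`, `keyAt` and `lookupValue` are illustrative names and not part of this patch:

{-# LANGUAGE ImportQualifiedPost #-}
module IndexLookupSketch where

import Data.ByteString (ByteString)
import Data.ByteString qualified as BS

-- Assumed layout: 4-byte header, 20-byte SHA1 key, 32-byte value = 56 bytes per record.
recordSize :: Int
recordSize = 56

-- Key (20-byte SHA1) of the i-th record.
keyAt :: ByteString -> Int -> ByteString
keyAt bs i = BS.take 20 (BS.drop (i * recordSize + 4) bs)

-- Binary search over the sorted record array; returns the 32-byte value on a hit.
lookupValue :: ByteString -> ByteString -> Maybe ByteString
lookupValue bs key = go 0 (BS.length bs `div` recordSize - 1)
  where
    go lo hi
      | lo > hi   = Nothing
      | otherwise =
          let mid = (lo + hi) `div` 2
          in case compare (keyAt bs mid) key of
               EQ -> Just (BS.take 32 (BS.drop (mid * recordSize + 4 + 20) bs))
               LT -> go (mid + 1) hi
               GT -> go lo (mid - 1)

Because the records are fixed-width and sorted by raw key bytes, a hit costs O(log n) byte comparisons per mmapped segment, which is what lets `indexEntryLookup` probe all index files concurrently and skip files once an answer has been recorded.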
diff --git a/hbs2-git3/lib/HBS2/Git3/State/Index.hs b/hbs2-git3/lib/HBS2/Git3/State/Index.hs
index a3f1570b..8e7fdfb0 100644
--- a/hbs2-git3/lib/HBS2/Git3/State/Index.hs
+++ b/hbs2-git3/lib/HBS2/Git3/State/Index.hs
@@ -17,9 +17,21 @@ import Data.ByteString.Lazy qualified as LBS
 import Data.Maybe
 import Data.HashMap.Strict (HashMap)
 import Data.HashMap.Strict qualified as HM
+import Data.Word
+import Data.Vector (Vector)
+import Data.Vector qualified as V
+import Data.Kind
+
+
+import Data.BloomFilter qualified as Bloom
+import Data.BloomFilter (Bloom(..))
+import Data.BloomFilter.Mutable qualified as MBloom
+
+import Control.Monad.ST

 import Control.Concurrent.STM qualified as STM
 import Codec.Compression.Zstd.Lazy qualified as ZstdL
+import Codec.Serialise
 import Streaming.Prelude qualified as S
 import Streaming hiding (run,chunksOf)

@@ -59,6 +71,46 @@ indexPath = do
   reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet
   getStatePath (AsBase58 reflog) <&> (</> "index")

+data IndexEntry =
+  IndexEntry
+  { entryFile :: FilePath
+  , entryBS   :: N.ByteString
+  }
+
+data Index a =
+  Index { entries :: [IndexEntry]
+        , bitmap  :: MBloom.MBloom RealWorld GitHash
+        }
+
+openIndex :: forall a m . (Git3Perks m, MonadReader Git3Env m)
+          => m (Index a)
+
+openIndex = do
+  files <- listObjectIndexFiles
+  bss <- liftIO $ for files $ \(f,_) -> (f,) <$> mmapFileByteString f Nothing
+  let entries = [ IndexEntry f bs | (f,bs) <- bss ]
+  bloom <- liftIO $ stToIO $ MBloom.new bloomHash 10
+  pure $ Index entries bloom
+
+indexEntryLookup :: forall a m . (Git3Perks m)
+                 => Index a
+                 -> GitHash
+                 -> m (Maybe N.ByteString)
+
+indexEntryLookup Index{..} h = do
+  already_ <- newTVarIO ( mempty :: HashMap GitHash N.ByteString )
+  forConcurrently_ entries $ \IndexEntry{..} -> do
+    what <- readTVarIO already_ <&> HM.lookup h
+    case what of
+      Just{}  -> none
+      Nothing -> do
+        offset' <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) (coerce h) entryBS
+        maybe1 offset' none $ \offset -> do
+          let ebs = BS.take 32 $ BS.drop (offset + 4 + 20) entryBS
+          atomically $ modifyTVar already_ (HM.insert h ebs)
+
+  readTVarIO already_ <&> headMay . HM.elems
+
 listObjectIndexFiles :: forall m . ( Git3Perks m
                                    , MonadReader Git3Env m
                                    ) => m [(FilePath, Natural)]
@@ -72,16 +124,23 @@ listObjectIndexFiles = do
     pure (f,z)

-
-enumEntries :: forall m . ( Git3Perks m
+enumEntries :: forall a m . ( Git3Perks m
                           , MonadReader Git3Env m
-                          ) => ( BS.ByteString -> m () ) -> m ()
+                          ) => Index a -> ( BS.ByteString -> m () ) -> m ()

-enumEntries action = do
-  files <- listObjectIndexFiles <&> fmap fst
-  forConcurrently_ files $ \f -> do
-    bs <- liftIO $ mmapFileByteString f Nothing
-    scanBS bs action
+enumEntries Index{..} action = do
+  forConcurrently_ entries $ \IndexEntry{..} -> do
+    scanBS entryBS action
+
+bloomHash :: GitHash -> [Word32]
+bloomHash gh = [a,b,c,d,e]
+  where
+    bs = coerce gh
+    a = N.word32 (BS.take 4 bs)
+    b = N.word32 (BS.take 4 $ BS.drop 4 bs)
+    c = N.word32 (BS.take 4 $ BS.drop 8 bs)
+    d = N.word32 (BS.take 4 $ BS.drop 12 bs)
+    e = N.word32 (BS.take 4 $ BS.drop 16 bs)

 startReflogIndexQueryQueue :: forall a m . ( Git3Perks m
                                            , MonadReader Git3Env m
@@ -134,6 +193,13 @@ startReflogIndexQueryQueue rq = flip runContT pure do
           rest <- readTVar answQ
           for_ rest $ \x -> writeTMVar x Nothing

+bloomFilterSize :: Natural -> Natural -> Double -> Natural
+bloomFilterSize n k p
+  | p <= 0 || p >= 1 = 0
+  | otherwise = rnd $ negate (fromIntegral n * fromIntegral k) / log (1 - p ** (1 / fromIntegral k))
+  where
+    rnd x = 2 ** realToFrac (ceiling (logBase 2 x)) & round
+
 writeReflogIndex :: forall m . ( Git3Perks m
                                , MonadReader Git3Env m
                                , HasClientAPI PeerAPI UNIX m
@@ -195,4 +261,27 @@ writeReflogIndex = do
             -- notice $ pretty sha1 <+> pretty tx
             writeSection ( LBS.fromChunks [key,value] ) (LBS.hPutStr wh)

+      -- files <- lift listObjectIndexFiles
+      -- let num = sum (fmap snd files) `div` 56
+      -- let size = bloomFilterSize num 5 0.01
+
+      -- bloom <- liftIO $ stToIO (MBloom.new bloomHash (fromIntegral size))
+
+      -- lift $ enumEntries $ \bs -> do
+      --   liftIO $ stToIO $ MBloom.insert bloom (coerce bs)
+
+      -- let bloomIdxName = idxPath </> "filter"
+      -- bytes <- liftIO $ stToIO $ Bloom.freeze bloom
+
+      -- liftIO $ UIO.withBinaryFileAtomic bloomIdxName WriteMode $ \wh -> do
+      --   LBS.hPutStr wh "puk"
+      --   LBS.hPutStr wh (serialise bytes)
+      --   LBS.writeFile (serialise b
+      --   for_ ss $ \sha1 -> do
+      --     let key = coerce @_ @N.ByteString sha1
+      --     let value = coerce @_ @N.ByteString tx
+      --     -- notice $ pretty sha1 <+> pretty tx
+      --     writeSection ( LBS.fromChunks [key,value] ) (LBS.hPutStr wh)
+
+
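For reference, `bloomFilterSize` above follows the standard Bloom filter sizing rule: with n keys and k hash functions, a target false-positive rate p needs roughly m = -(n*k) / ln(1 - p^(1/k)) bits, which the patch then rounds up to the next power of two. A small worked sketch under the same assumptions (`bloomBits` is an illustrative name, not part of the patch):

module BloomSizingSketch where

-- Bits required for n keys, k hash functions and false-positive rate p:
--   m = -(n * k) / ln (1 - p ** (1 / k))
bloomBits :: Double -> Double -> Double -> Double
bloomBits n k p = negate (n * k) / log (1 - p ** (1 / k))

-- Example: one million git hashes with k = 5 (the five Word32 values produced by
-- bloomHash) and p = 0.01 give bloomBits 1e6 5 0.01 ~ 9.85e6 bits,
-- which the power-of-two rounding turns into 2^24 bits (2 MiB).

The five hash values come straight from `bloomHash`, which simply slices the 20-byte SHA1 into five Word32 words, so no extra hashing pass over the key is needed.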