diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs
index 0cd2982a..73b241f5 100644
--- a/hbs2-git3/app/Main.hs
+++ b/hbs2-git3/app/Main.hs
@@ -10,6 +10,7 @@ module Main where

 import HBS2.Git3.Prelude
+import HBS2.Git3.State.Index
 import HBS2.Peer.CLI.Detect

 import HBS2.Peer.RPC.API.LWWRef

@@ -306,29 +307,6 @@ readCommitChainHPSQ filt _ h0 action = flip runContT pure $ callCC \_ -> do
                 Nothing -> (a, Just (n, HS.fromList p))
                 Just (l,s) -> (a, Just (min l n, s <> HS.fromList p))

-readLogFileLBS :: forall opts m . ( MonadIO m, ReadLogOpts opts, BytesReader m )
-               => opts
-               -> ( GitHash -> Int -> ByteString -> m () )
-               -> m Int
-
-readLogFileLBS _ action = flip fix 0 \go n -> do
-  done <- noBytesLeft
-  if done then pure n
-    else do
-      ssize <- readBytesMaybe 4
-                  >>= orThrow SomeReadLogError
-                  <&> fromIntegral . N.word32 . LBS.toStrict
-
-      hash <- readBytesMaybe 20
-                  >>= orThrow SomeReadLogError
-                  <&> GitHash . BS.copy . LBS.toStrict
-
-      sdata <- readBytesMaybe ( ssize - 20 )
-                  >>= orThrow SomeReadLogError
-
-      void $ action hash (fromIntegral ssize) sdata
-      go (succ n)
-
 readIndexFromFile :: forall m . MonadIO m
                   => FilePath

@@ -747,22 +725,24 @@ theDict = do
         _ -> throwIO (BadFormException @C nil)

-    entry $ bindMatch "test:git:log:index:flat:search:binary" $ nil_ $ \syn -> lift do
+    entry $ bindMatch "reflog:index:search" $ nil_ $ \syn -> lift $ flip runContT pure do
+
       let (_, argz) = splitOpts [] syn
-      let argzz = [ x | StringLike x <- argz ]
+      hash <- headMay [ x | GitHashLike x <- argz ] & orThrowUser "need sha1"

-      hash <- headMay argzz
-                >>= fromStringMay @GitHash
-                & orThrowUser "no hash specified"
+      rq <- newTQueueIO

-      idxName <- headMay (tail argzz)
-                   & orThrowUser "no index specified"
+      ContT $ withAsync $ startReflogIndexQueryQueue rq

-      file <- liftIO $ mmapFileByteString idxName Nothing
-      r <- liftIO $ binarySearchBS 56 (BS.take 20 . BS.drop 4) (coerce hash) file
+      answ_ <- newEmptyTMVarIO

-      liftIO $ print $ pretty r
+      atomically $ writeTQueue rq (hash,answ_)
+
+      answ <- atomically $ readTMVar answ_
+
+      for_ answ $ \a -> do
+        liftIO $ print $ pretty a

     entry $ bindMatch "test:git:log:index:flat:search:linear:test" $ nil_ $ \case
       [ StringLike fn ] -> do

@@ -1052,72 +1032,19 @@ theDict = do

        mergeSortedFilesN (BS.take 20) files out

+    entry $ bindMatch "reflog:index:path" $ nil_ $ const $ lift do
+      indexPath >>= liftIO . print . pretty
+
       -- let entriesListOf lbs = S.toList_ $ runConsumeLBS lbs $ readSections $ \s ss -> do

-    entry $ bindMatch "test:git:reflog:index:files" $ nil_ $ \syn -> lift do
-      reflog <- getGitRemoteKey >>= orThrowUser "reflog not set"
-      idxPath <- getStatePath (AsBase58 reflog) <&> (</> "index")
-      mkdir idxPath
-      idx <- dirFiles idxPath
-      for_ idx $ \f -> liftIO $ print $ pretty f
+    entry $ bindMatch "reflog:index:files" $ nil_ $ \syn -> lift do
+      files <- listObjectIndexFiles
+      cur <- pwd
+      for_ files $ \(f',s) -> do
+        let f = makeRelative cur f'
+        liftIO $ print $ fill 10 (pretty s) <+> pretty f

-    entry $ bindMatch "test:git:reflog:index" $ nil_ $ \syn -> lift $ connectedDo do
-      reflog <- getGitRemoteKey >>= orThrowUser "reflog not set"
-
-      api <- getClientAPI @RefLogAPI @UNIX
-
-      sto <- getStorage
-
-      flip runContT pure do
-
-        what' <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) api reflog
-                   >>= orThrowUser "rpc timeout"
-
-        what <- ContT $ maybe1 what' none
-
-        idxPath <- getStatePath (AsBase58 reflog) <&> (</> "index")
-        mkdir idxPath
-
-        notice $ "STATE" <+> pretty idxPath
-
-        sink <- S.toList_ do
-          walkMerkle (coerce what) (getBlock sto) $ \case
-            Left{} -> throwIO MissedBlockError
-            Right (hs :: [HashRef]) -> do
-              for_ hs $ \h -> void $ runMaybeT do
-
-                tx <- getBlock sto (coerce h)
-                         >>= toMPlus
-
-                RefLogUpdate{..} <- deserialiseOrFail @(RefLogUpdate L4Proto) tx
-                                       & toMPlus
-
-                AnnotatedHashRef _ href <- deserialiseOrFail @AnnotatedHashRef (LBS.fromStrict _refLogUpdData)
-                                              & toMPlus
-
-                -- FIXME: error logging
-                lbs <- liftIO (runExceptT (getTreeContents sto href))
-                          >>= orThrow MissedBlockError
-
-                pieces <- S.toList_ do
-                  void $ runConsumeLBS (ZstdL.decompress lbs) $ readLogFileLBS () $ \o s _ -> do
-                    lift $ S.yield o
-
-                lift $ S.yield (h, pieces)
-
-        liftIO $ forConcurrently_ sink $ \(tx, pieces) -> do
-          idxName <- emptyTempFile idxPath "objects-.idx"
-          let ss = L.sort pieces
-          UIO.withBinaryFileAtomic idxName WriteMode $ \wh -> do
-            for_ ss $ \sha1 -> do
-              let key = coerce @_ @N.ByteString sha1
-              let value = coerce @_ @N.ByteString tx
-              -- notice $ pretty sha1 <+> pretty tx
-              writeSection ( LBS.fromChunks [key,value] ) (LBS.hPutStr wh)
-
-        -- files <- dirFiles idxPath
-        --           <&> filter ((== ".idx") . takeExtension)
-        -- out <- liftIO $ emptyTempFile idxPath "objects-.idx"
-        -- liftIO $ mergeSortedFilesN (LBS.take 20) files out
+    entry $ bindMatch "reflog:index:build" $ nil_ $ const $ lift $ connectedDo do
+      writeReflogIndex

     entry $ bindMatch "test:git:export" $ nil_ $ \syn -> lift $ connectedDo do
       let (opts, argz) = splitOpts [("--index",1),("--ref",1)] syn

@@ -1438,26 +1365,6 @@ linearSearchLBS hash lbs = do

   pure $ listToMaybe found

-binarySearchBS :: Monad m
-               => Int                                -- ^ record size
-               -> ( BS.ByteString -> BS.ByteString ) -- ^ key extractor
-               -> BS.ByteString                      -- ^ key
-               -> BS.ByteString                      -- ^ source
-               -> m (Maybe Int)
-
-binarySearchBS rs getKey s source = do
-  let maxn = BS.length source `div` rs
-  loop 0 maxn
-  where
-    loop l u | u <= l = pure Nothing
-             | otherwise = do
-                 let e = getKey (BS.drop ( k * rs ) source)
-                 case compare e s of
-                  EQ -> pure $ Just (k * rs)
-                  LT -> loop (k+1) u
-                  GT -> loop l k
-
-      where k = (l + u) `div` 2

 -- debugPrefix :: LoggerEntry -> LoggerEntry
 debugPrefix = toStderr . logPrefix "[debug] "
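Note (not part of the patch): the new `reflog:index:search` entry above shows the intended usage of the query queue added in HBS2.Git3.State.Index below — create a request queue, run `startReflogIndexQueryQueue` as a background worker, then hand it `(GitHash, TMVar [HashRef])` requests. A minimal sketch of that pattern, assuming the same `HBS2.Git3.Prelude` re-exports the handler relies on; `queryObjectTx` is a hypothetical name, not defined anywhere in this change:

queryObjectTx :: ( Git3Perks m
                 , MonadReader Git3Env m
                 , HasClientAPI PeerAPI UNIX m
                 , HasClientAPI RefLogAPI UNIX m
                 , HasStorage m
                 )
              => GitHash
              -> m [HashRef]
queryObjectTx gh = flip runContT pure do
  rq <- newTQueueIO
  -- worker mmaps every objects-*.idx file and serves lookups until cancelled
  void $ ContT $ withAsync $ startReflogIndexQueryQueue rq
  answ <- newEmptyTMVarIO
  atomically $ writeTQueue rq (gh, answ)
  -- the worker fills the TMVar with the reflog tx hashes that contain the object
  atomically $ readTMVar answ

The worker is scoped by `withAsync`, so it is cancelled as soon as `runContT` returns; a caller issuing many lookups would keep one queue and one worker alive rather than spawning a worker per query.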
diff --git a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
index 23523b17..049fc483 100644
--- a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
+++ b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
@@ -206,3 +206,25 @@ writeCompressedStreamZstd stream source sink = do

       Nothing  -> writeCompressedChunkZstd sink sn Nothing >> none
       Just lbs -> writeCompressedChunkZstd sink sn (Just lbs) >>= next
+
+binarySearchBS :: Monad m
+               => Int                                -- ^ record size
+               -> ( BS.ByteString -> BS.ByteString ) -- ^ key extractor
+               -> BS.ByteString                      -- ^ key
+               -> BS.ByteString                      -- ^ source
+               -> m (Maybe Int)
+
+binarySearchBS rs getKey s source = do
+  let maxn = BS.length source `div` rs
+  loop 0 maxn
+  where
+    loop l u | u <= l = pure Nothing
+             | otherwise = do
+                 let e = getKey (BS.drop ( k * rs ) source)
+                 case compare e s of
+                  EQ -> pure $ Just (k * rs)
+                  LT -> loop (k+1) u
+                  GT -> loop l k
+
+      where k = (l + u) `div` 2
+
diff --git a/hbs2-git3/lib/HBS2/Git3/Prelude.hs b/hbs2-git3/lib/HBS2/Git3/Prelude.hs
index 21a4c5b3..f6ca25d4 100644
--- a/hbs2-git3/lib/HBS2/Git3/Prelude.hs
+++ b/hbs2-git3/lib/HBS2/Git3/Prelude.hs
@@ -26,7 +26,7 @@ import HBS2.Storage as Exported
 import HBS2.Storage.Operations.Class as Exported
 import HBS2.System.Logger.Simple.ANSI as Exported

-import HBS2.Git3.Types
+import HBS2.Git3.Types as Exported

 -- TODO: about-to-remove
 import DBPipe.SQLite
@@ -42,7 +42,9 @@ import Data.HashSet (HashSet(..))
 import Data.HashSet qualified as HS
 import Data.Kind
 import System.Exit qualified as Q
+import System.IO.MMap as Exported

+import GHC.Natural as Exported

 import UnliftIO as Exported

@@ -75,7 +77,9 @@ instance GitWritePacksOpts (HashSet GitWritePacksOptVal) where
   excludeParents o = not $ HS.member WriteFullPack o

 data Git3Exception =
-  Git3PeerNotConnected
+    Git3PeerNotConnected
+  | Git3ReflogNotSet
+  | Git3RpcTimeout
   deriving (Show,Typeable,Generic)

 instance Exception Git3Exception
diff --git a/hbs2-git3/lib/HBS2/Git3/State/Index.hs b/hbs2-git3/lib/HBS2/Git3/State/Index.hs
index 0fa4806a..95fa86e1 100644
--- a/hbs2-git3/lib/HBS2/Git3/State/Index.hs
+++ b/hbs2-git3/lib/HBS2/Git3/State/Index.hs
@@ -7,10 +7,14 @@ import HBS2.Git3.State.Types

 import HBS2.Data.Log.Structured

+import Data.ByteString qualified as BS
+import Data.ByteString.Lazy ( ByteString )
+import Data.ByteString.Lazy qualified as LBS
 import Data.List qualified as L
 import Network.ByteOrder qualified as N
 import System.IO.Temp as Temp
 import Data.ByteString.Lazy qualified as LBS
+import Data.Maybe
 import Codec.Compression.Zstd.Lazy qualified as ZstdL

 import Streaming.Prelude qualified as S
@@ -20,58 +24,151 @@ import UnliftIO
 import UnliftIO.IO.File qualified as UIO

--- writeReflogIndex = do
---   reflog <- getGitRemoteKey >>= orThrowUser "reflog not set"
+readLogFileLBS :: forall opts m . ( MonadIO m, ReadLogOpts opts, BytesReader m )
+               => opts
+               -> ( GitHash -> Int -> ByteString -> m () )
+               -> m Int

---   api <- getClientAPI @RefLogAPI @UNIX
+readLogFileLBS _ action = flip fix 0 \go n -> do
+  done <- noBytesLeft
+  if done then pure n
+    else do
+      ssize <- readBytesMaybe 4
+                  >>= orThrow SomeReadLogError
+                  <&> fromIntegral . N.word32 . LBS.toStrict

---   sto <- getStorage
+      hash <- readBytesMaybe 20
+                  >>= orThrow SomeReadLogError
+                  <&> GitHash . BS.copy . LBS.toStrict

---   flip runContT pure do
+      sdata <- readBytesMaybe ( ssize - 20 )
+                  >>= orThrow SomeReadLogError

---     what' <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) api reflog
---                >>= orThrowUser "rpc timeout"
+      void $ action hash (fromIntegral ssize) sdata
+      go (succ n)

---     what <- ContT $ maybe1 what' none
+indexPath :: forall m . ( Git3Perks m
+                        , MonadReader Git3Env m
+                        , HasClientAPI PeerAPI UNIX m
+                        , HasClientAPI RefLogAPI UNIX m
+                        , HasStorage m
+                        ) => m FilePath
+indexPath = do
+  reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet
+  getStatePath (AsBase58 reflog) <&> (</> "index")

---     idxPath <- getStatePath (AsBase58 reflog) <&> (</> "index")
---     mkdir idxPath
+listObjectIndexFiles :: forall m . ( Git3Perks m
+                                   , MonadReader Git3Env m
+                                   , HasClientAPI PeerAPI UNIX m
+                                   , HasClientAPI RefLogAPI UNIX m
+                                   , HasStorage m
+                                   ) => m [(FilePath, Natural)]

---     notice $ "STATE" <+> pretty idxPath
+listObjectIndexFiles = do
+  path <- indexPath
+  dirFiles path
+    <&> filter ( ("objects*.idx" ?==) . takeFileName )
+    >>= \fs -> for fs $ \f -> do
+          z <- fileSize f <&> fromIntegral
+          pure (f,z)

---     sink <- S.toList_ do
---       walkMerkle (coerce what) (getBlock sto) $ \case
---         Left{} -> throwIO MissedBlockError
---         Right (hs :: [HashRef]) -> do
---           for_ hs $ \h -> void $ runMaybeT do
+startReflogIndexQueryQueue :: forall m . ( Git3Perks m
+                                         , MonadReader Git3Env m
+                                         , HasClientAPI PeerAPI UNIX m
+                                         , HasClientAPI RefLogAPI UNIX m
+                                         , HasStorage m
+                                         )
+                           => TQueue (GitHash, TMVar [HashRef])
+                           -> m ()

---             tx <- getBlock sto (coerce h)
---                      >>= toMPlus
+startReflogIndexQueryQueue rq = flip runContT pure do
+  files <- lift $ listObjectIndexFiles <&> fmap fst

---             RefLogUpdate{..} <- deserialiseOrFail @(RefLogUpdate L4Proto) tx
---                                    & toMPlus
+  -- one file -- at most one worker thread
+  -- mmap the files
+  -- return a query function?
+  -- for each file we create a separate queue,
+  -- we need to search across all of the files

---             AnnotatedHashRef _ href <- deserialiseOrFail @AnnotatedHashRef (LBS.fromStrict _refLogUpdData)
---                                           & toMPlus
+  mmaped <- liftIO $ for files (liftIO . flip mmapFileByteString Nothing)

---             -- FIXME: error logging
---             lbs <- liftIO (runExceptT (getTreeContents sto href))
---                       >>= orThrow MissedBlockError
+  forever $ liftIO do
+    (githash, answ) <- atomically $ readTQueue rq

---             pieces <- S.toList_ do
---               void $ runConsumeLBS (ZstdL.decompress lbs) $ readLogFileLBS () $ \o s _ -> do
---                 lift $ S.yield o
+    let s = coerce githash

---             lift $ S.yield (h, pieces)
+    found <- forConcurrently mmaped $ \bs -> runMaybeT do
+                -- FIXME: size-hardcodes
+                -- record layout: 4-byte size, 20-byte sha1 key, 32-byte tx hash (56 bytes total);
+                -- binarySearchBS returns the byte offset of the matching record
+                w <- binarySearchBS 56 ( BS.take 20 . BS.drop 4 ) s bs
+                        >>= toMPlus
+
+                let v = BS.take 32 $ BS.drop ( w + 24 ) bs
+
+                pure $ coerce @_ @HashRef v
+
+    atomically $ writeTMVar answ ( catMaybes found )
+
+
+writeReflogIndex :: forall m . ( Git3Perks m
+                               , MonadReader Git3Env m
+                               , HasClientAPI PeerAPI UNIX m
+                               , HasClientAPI RefLogAPI UNIX m
+                               , HasStorage m
+                               ) => m ()
+writeReflogIndex = do
+
+  reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet
+
+  api <- getClientAPI @RefLogAPI @UNIX
+
+  sto <- getStorage
+
+  flip runContT pure do
+
+    what' <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) api reflog
+               >>= orThrow Git3RpcTimeout
+
+    what <- ContT $ maybe1 what' none
+
+    idxPath <- lift indexPath
+    mkdir idxPath
+
+    notice $ "STATE" <+> pretty idxPath
+
+    sink <- S.toList_ do
+      walkMerkle (coerce what) (getBlock sto) $ \case
+        Left{} -> throwIO MissedBlockError
+        Right (hs :: [HashRef]) -> do
+          for_ hs $ \h -> void $ runMaybeT do
+
+            tx <- getBlock sto (coerce h)
+                     >>= toMPlus
+
+            RefLogUpdate{..} <- deserialiseOrFail @(RefLogUpdate L4Proto) tx
+                                   & toMPlus
+
+            AnnotatedHashRef _ href <- deserialiseOrFail @AnnotatedHashRef (LBS.fromStrict _refLogUpdData)
+                                          & toMPlus
+
+            -- FIXME: error logging
+            lbs <- liftIO (runExceptT (getTreeContents sto href))
+                      >>= orThrow MissedBlockError
+
+            pieces <- S.toList_ do
+              void $ runConsumeLBS (ZstdL.decompress lbs) $ readLogFileLBS () $ \o s _ -> do
+                lift $ S.yield o
+
+            lift $ S.yield (h, pieces)
+
+    liftIO $ forConcurrently_ sink $ \(tx, pieces) -> do
+      idxName <- emptyTempFile idxPath "objects-.idx"
+      let ss = L.sort pieces
+      UIO.withBinaryFileAtomic idxName WriteMode $ \wh -> do
+        for_ ss $ \sha1 -> do
+          let key = coerce @_ @N.ByteString sha1
+          let value = coerce @_ @N.ByteString tx
+          -- notice $ pretty sha1 <+> pretty tx
+          writeSection ( LBS.fromChunks [key,value] ) (LBS.hPutStr wh)

--- liftIO $ forConcurrently_ sink $ \(tx, pieces) -> do
---   idxName <- emptyTempFile idxPath "objects-.idx"
---   let ss = L.sort pieces
---   UIO.withBinaryFileAtomic idxName WriteMode $ \wh -> do
---     for_ ss $ \sha1 -> do
---       let key = coerce @_ @N.ByteString sha1
---       let value = coerce @_ @N.ByteString tx
---       -- notice $ pretty sha1 <+> pretty tx
---       writeSection ( LBS.fromChunks [key,value] ) (LBS.hPutStr wh)
diff --git a/hbs2-git3/lib/HBS2/Git3/Types.hs b/hbs2-git3/lib/HBS2/Git3/Types.hs
index e28e4043..fee71c67 100644
--- a/hbs2-git3/lib/HBS2/Git3/Types.hs
+++ b/hbs2-git3/lib/HBS2/Git3/Types.hs
@@ -1,8 +1,11 @@
-module HBS2.Git3.Types where
+module HBS2.Git3.Types
+  ( module HBS2.Git3.Types
+  , module Exported
+  ) where

 import HBS2.Prelude.Plated
 import HBS2.Net.Auth.Credentials
-import HBS2.Git.Local
+import HBS2.Git.Local as Exported

 type GitRemoteKey = PubKey 'Sign 'HBS2Basic
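Note (not part of the patch): the index records that `writeReflogIndex` emits and `reflog:index:search` consumes appear to be fixed-size — `writeSection` is given a 52-byte payload (20-byte git sha1 key followed by the 32-byte reflog tx hash), and the 4-byte size prefix implied by `readLogFileLBS` and by the `BS.drop 4` in the key extractor brings each record to 56 bytes, sorted by key. A self-contained sketch of a lookup against a single index file under that layout assumption; `lookupTxInIndexFile` is a hypothetical helper, not defined in this change:

import Data.ByteString qualified as BS
import Data.Coerce (coerce)
import Data.Functor ((<&>))
import System.IO.MMap (mmapFileByteString)

-- assumes GitHash, HashRef and binarySearchBS from the modules touched in this patch
lookupTxInIndexFile :: FilePath -> GitHash -> IO (Maybe HashRef)
lookupTxInIndexFile idx sha1 = do
  bs  <- mmapFileByteString idx Nothing
  off <- binarySearchBS 56 (BS.take 20 . BS.drop 4) (coerce sha1) bs
  -- binarySearchBS yields the byte offset of the record start; the 32-byte
  -- tx hash sits after the 4-byte size prefix and the 20-byte key
  pure $ off <&> \w -> coerce @_ @HashRef (BS.take 32 (BS.drop (w + 24) bs))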