module HBS2.Git3.State.Index where

import HBS2.Git3.Prelude
import HBS2.System.Dir
import HBS2.CLI.Run.Internal.Merkle (getTreeContents)
import HBS2.Git3.State.Types

import HBS2.Data.Log.Structured

import Data.ByteString qualified as BS
import Data.ByteString.Lazy ( ByteString )
import Data.ByteString.Lazy qualified as LBS
import Data.List qualified as L
import Network.ByteOrder qualified as N
import System.IO.Temp as Temp
import Data.ByteString.Lazy qualified as LBS
import Data.Maybe
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Control.Concurrent.STM qualified as STM
import Codec.Compression.Zstd.Lazy qualified as ZstdL

import Streaming.Prelude qualified as S
import Streaming hiding (run,chunksOf)

import UnliftIO
import UnliftIO.IO.File qualified as UIO


-- | Read a stream of log sections until the reader reports no bytes left.
--
-- Each section is laid out as:
--
--   * a 4-byte size, decoded with 'N.word32' (network byte order);
--   * a 20-byte git hash;
--   * @size - 20@ bytes of payload.
--
-- For every section, @action@ is called with the hash, the full section size
-- (hash included) and the payload. Returns the number of sections read.
--
-- The @opts@ argument itself is ignored; only its 'ReadLogOpts' constraint
-- is required.
--
-- Throws 'SomeReadLogError' when the input is truncated or when a section
-- size is too small to hold the hash.
readLogFileLBS :: forall opts m . ( MonadIO m, ReadLogOpts opts, BytesReader m )
               => opts
               -> ( GitHash -> Int -> ByteString -> m () )
               -> m Int
readLogFileLBS _ action = flip fix 0 \go n -> do
  done <- noBytesLeft
  if done
    then pure n
    else do
      ssize <- readBytesMaybe 4
                 >>= orThrow SomeReadLogError
                 <&> fromIntegral . N.word32 . LBS.toStrict

      -- A section must at least contain the 20-byte hash. A smaller size
      -- means the stream is corrupt, and @ssize - 20@ below would request a
      -- negative (or wrapped-around) number of bytes.
      when (ssize < 20) $ throwIO SomeReadLogError

      hash <- readBytesMaybe 20
                >>= orThrow SomeReadLogError
                <&> GitHash . BS.copy . LBS.toStrict  -- copy: don't retain the input chunk

      sdata <- readBytesMaybe ( ssize - 20 ) >>= orThrow SomeReadLogError

      action hash (fromIntegral ssize) sdata

      go (succ n)


-- | Directory holding the object index files: the @index@ subdirectory of
-- the state path of the current git remote's reflog.
--
-- Throws 'Git3ReflogNotSet' if no reflog key is configured.
indexPath :: forall m . ( Git3Perks m
                        , MonadReader Git3Env m
                        ) => m FilePath
indexPath = do
  reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet
  getStatePath (AsBase58 reflog) <&> (</> "index")


-- | List the files in 'indexPath' whose name matches @objects*.idx@,
-- each paired with its size as reported by 'fileSize'.
listObjectIndexFiles :: forall m . ( Git3Perks m
                                   , MonadReader Git3Env m
                                   ) => m [(FilePath, Natural)]
listObjectIndexFiles = do
  path  <- indexPath
  files <- dirFiles path <&> filter ( ("objects*.idx" ?==) . takeFileName )
  for files $ \f -> do
    z <- fileSize f <&> fromIntegral
    pure (f, z)


-- | Mmap every object index file and hand its contents to 'scanBS' together
-- with @action@ (presumably invoking it once per entry; confirm against
-- 'scanBS'). Files are processed concurrently.
enumEntries :: forall m . ( Git3Perks m
                          , MonadReader Git3Env m
                          ) => ( BS.ByteString -> m () ) -> m ()
enumEntries action = do
  files <- listObjectIndexFiles <&> fmap fst
  forConcurrently_ files $ \f -> do
    bs <- liftIO $ mmapFileByteString f Nothing
    scanBS bs action


-- | Serve index lookups from the request queue @rq@, forever.
--
-- Each request is @(key, decode, reply)@. The function does the following:
--
--   * It binary-searches @key@ in each mmapped object index file, in the
--     order returned by 'listObjectIndexFiles'.
--   * On the first hit, it applies @decode@ to the index bytes starting at
--     the matching entry and writes @Just@ the result to @reply@.
--   * If no file contains the key, it writes @Nothing@ to @reply@.
--
-- NOTE(review): the set of index files is captured once at startup, so
-- index files written afterwards are not searched by this worker.
startReflogIndexQueryQueue :: forall a m . ( Git3Perks m
                                           , MonadReader Git3Env m
                                           , HasClientAPI PeerAPI UNIX m
                                           , HasClientAPI RefLogAPI UNIX m
                                           , HasStorage m
                                           )
                           => TQueue (BS.ByteString, BS.ByteString -> a, TMVar (Maybe a))
                           -> m ()
startReflogIndexQueryQueue rq = flip runContT pure do

  files <- lift $ listObjectIndexFiles <&> fmap fst

  -- Design notes (translated from the original Russian comments):
  --   * one file - at most one thread
  --   * mmap the files
  --   * return a query function instead?
  --   * for each file we create a separate queue
  --   * we have to search in all the files

  mmaped <- liftIO $ for files (flip mmapFileByteString Nothing)

  forever $ liftIO do
    -- Block until at least one request is queued, then take all of them.
    requests <- atomically do
      _ <- peekTQueue rq
      STM.flushTQueue rq

    for_ requests $ \(s, f, answ) -> do
      let lookupIn bs = runMaybeT do
            -- FIXME: size-hardcodes
            w <- binarySearchBS entrySize ( BS.take 20 . BS.drop 4 ) s bs >>= toMPlus
            pure $ f (BS.drop (w * entrySize) bs)

          -- Stop at the first file that contains the key.
          firstHit []          = pure Nothing
          firstHit (bs : rest) = lookupIn bs >>= maybe (firstHit rest) (pure . Just)

      what <- firstHit mmaped
      atomically $ writeTMVar answ what

  where
    -- Fixed record size of an index entry. Per the lookup above, the 20-byte
    -- key starts at offset 4. The remaining bytes are presumably the 32-byte
    -- value hash written by 'writeReflogIndex'; keep the two in sync.
    entrySize :: Num n => n
    entrySize = 56


-- | Build object index files for the current reflog.
--
-- The function does the following:
--
--   1. It fetches the reflog value over RPC. It does nothing if the reflog
--      is empty, and throws 'Git3RpcTimeout' if the RPC times out.
--   2. It walks the merkle tree of reflog transactions and reads each
--      transaction's zstd-compressed log.
--   3. For every transaction that lists at least one object, it writes a
--      new @objects-*.idx@ file into 'indexPath'. The file holds one section
--      per object (object hash followed by transaction hash), sorted by
--      object hash so the query side can binary-search it.
--
-- Transactions that fail to deserialise are skipped. Missing blocks throw
-- 'MissedBlockError'.
--
-- NOTE(review): every run writes a fresh index file per transaction, even
-- for transactions already indexed by a previous run.
writeReflogIndex :: forall m . ( Git3Perks m
                               , MonadReader Git3Env m
                               , HasClientAPI PeerAPI UNIX m
                               , HasClientAPI RefLogAPI UNIX m
                               , HasStorage m
                               ) => m ()
writeReflogIndex = do

  reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet

  api <- getClientAPI @RefLogAPI @UNIX

  sto <- getStorage

  flip runContT pure do

    what' <- lift $ callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) api reflog
               >>= orThrow Git3RpcTimeout

    -- Empty reflog: nothing to index, abort the rest of the continuation.
    what <- ContT $ maybe1 what' none

    idxPath <- lift indexPath
    mkdir idxPath

    notice $ "STATE" <+> pretty idxPath

    -- (transaction hash, object hashes listed in that transaction's log)
    sink <- S.toList_ do
      walkMerkle (coerce what) (getBlock sto) $ \case
        Left{} -> throwIO MissedBlockError
        Right (hs :: [HashRef]) -> do
          for_ hs $ \h -> void $ runMaybeT do

            tx <- getBlock sto (coerce h) >>= toMPlus

            RefLogUpdate{..} <- deserialiseOrFail @(RefLogUpdate L4Proto) tx
                                  & toMPlus

            AnnotatedHashRef _ href <- deserialiseOrFail @AnnotatedHashRef (LBS.fromStrict _refLogUpdData)
                                         & toMPlus

            -- FIXME: error logging
            lbs <- liftIO (runExceptT (getTreeContents sto href))
                     >>= orThrow MissedBlockError

            pieces <- S.toList_ do
              void $ runConsumeLBS (ZstdL.decompress lbs) $ readLogFileLBS () $ \o _ _ -> do
                lift $ S.yield o

            lift $ S.yield (h, pieces)

    -- A bounded worker pool instead of one thread per transaction, so a long
    -- reflog cannot open an unbounded number of files at once.
    liftIO $ pooledForConcurrently_ sink $ \(tx, pieces) -> do
      -- An index without entries contributes nothing to lookups; skip it.
      unless (L.null pieces) do
        idxName <- emptyTempFile idxPath "objects-.idx"
        let ss = L.sort pieces
        UIO.withBinaryFileAtomic idxName WriteMode $ \wh -> do
          for_ ss $ \sha1 -> do
            let key   = coerce @_ @N.ByteString sha1
            let value = coerce @_ @N.ByteString tx
            writeSection ( LBS.fromChunks [key,value] ) (LBS.hPutStr wh)