From a304510d027b88a81f68798ae5d683e65ac31903 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 29 Dec 2024 09:30:08 +0300 Subject: [PATCH] wip --- hbs2-git3/app/Main.hs | 156 ++++++++++++++++++++++++++++-------------- 1 file changed, 105 insertions(+), 51 deletions(-) diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 63f3e7c5..bbc6efe4 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -19,6 +19,7 @@ import HBS2.Data.Detect qualified as Detect import HBS2.Storage import HBS2.Storage.Operations.Class import HBS2.Storage.Operations.ByteString +import HBS2.Peer.Proto.RefLog import HBS2.Peer.CLI.Detect import HBS2.Peer.RPC.Client import HBS2.Peer.RPC.Client.Unix @@ -36,6 +37,7 @@ import HBS2.Data.Log.Structured import HBS2.CLI.Run.Internal.Merkle (createTreeWithMetadata) +import HBS2.CLI.Run.RefLog (getCredentialsForReflog,mkRefLogUpdateFrom) import HBS2.System.Logger.Simple.ANSI as Exported import HBS2.System.Dir @@ -100,6 +102,7 @@ import System.Random hiding (next) import System.IO.MMap (mmapFileByteString) import System.IO qualified as IO import System.IO (hPrint) +import System.IO.Temp as Temp import Data.Either import Data.Coerce @@ -115,6 +118,11 @@ import UnliftIO.IO.File qualified as UIO {- HLINT ignore "Eta reduce" -} +defSegmentSize :: Int +defSegmentSize = 50 * 1024 * 1024 + +defCompressionLevel :: Int +defCompressionLevel = maxCLevel type HBS2GitPerks m = (MonadUnliftIO m) @@ -159,6 +167,7 @@ data Git3Env = , peerSocket :: FilePath , peerStorage :: AnyStorage , peerAPI :: ServiceCaller PeerAPI UNIX + , reflogAPI :: ServiceCaller RefLogAPI UNIX , gitRefLog :: TVar (Maybe GitRemoteKey) , gitPackedSegmentSize :: TVar Int , gitCompressionLevel :: TVar Int @@ -205,6 +214,7 @@ newtype Git3 (m :: Type -> Type) a = Git3M { fromGit3 :: ReaderT Git3Env m a } , MonadIO , MonadUnliftIO , MonadReader Git3Env + , MonadTrans ) type Git3Perks m = ( MonadIO m @@ -218,11 +228,17 @@ instance MonadUnliftIO m => HasClientAPI PeerAPI UNIX (Git3 m) where Git3Disconnected{} -> throwIO Git3PeerNotConnected Git3Connected{..} -> pure peerAPI +instance (MonadUnliftIO m, MonadReader Git3Env m) => HasClientAPI RefLogAPI UNIX m where + getClientAPI = do + ask >>= \case + Git3Disconnected{} -> throwIO Git3PeerNotConnected + Git3Connected{..} -> pure reflogAPI + nullGit3Env :: MonadIO m => m Git3Env nullGit3Env = Git3Disconnected <$> newTVarIO Nothing - <*> newTVarIO ( 100 * 1024 * 1024 ) - <*> newTVarIO maxCLevel + <*> newTVarIO defSegmentSize + <*> newTVarIO defCompressionLevel connectedDo :: (MonadIO m, MonadReader Git3Env m) => m a -> m a connectedDo what = do @@ -279,10 +295,10 @@ recover m = fix \again -> do let sto = AnyStorage (StorageClient storageAPI) - connected <- Git3Connected db soname sto peerAPI + connected <- Git3Connected db soname sto peerAPI refLogAPI <$> newTVarIO (Just ref) - <*> newTVarIO (100 * 1024 * 1024 ) - <*> newTVarIO maxCLevel + <*> newTVarIO defSegmentSize + <*> newTVarIO defCompressionLevel liftIO $ withGit3Env connected (evolveState >> again) @@ -522,8 +538,8 @@ splitOpts def opts' = flip fix (mempty, opts) $ \go -> \case data ECC = ECCInit - | ECCWrite Int Handle Result - | ECCFinalize Bool Handle Result + | ECCWrite Int FilePath Handle Result + | ECCFinalize Bool FilePath Handle Result class HasExportOpts m where setPackedSegmedSize :: Int -> m () @@ -1018,11 +1034,9 @@ theDict = do let contents = Zlib.compressWith params o LBS.hPutStr fh contents - entry $ bindMatch "test:git:export" $ nil_ $ \syn -> lift do + entry $ bindMatch "test:git:export" $ nil_ $ \syn -> lift $ connectedDo do let (opts, argz) = splitOpts [("--index",1),("--ref",1)] syn - maxW <- getPackedSegmetSize - let useIndex = headMay [ f | ListVal [StringLike "--index", StringLike f] <- opts ] let hd = headDef "HEAD" [ x | StringLike x <- argz] @@ -1038,8 +1052,9 @@ theDict = do _already <- newTVarIO mempty - level <- getCompressionLevel + level <- getCompressionLevel segment <- getPackedSegmetSize + env <- ask let notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool @@ -1070,56 +1085,46 @@ theDict = do tn <- getNumCapabilities sourceQ <- newTBQueueIO (fromIntegral tn * 1024) + hbs2Q <- newTBQueueIO @_ @(Maybe FilePath) 100 - let write sz_ fh ss = do - LBS.hPutStr fh ss - atomically $ modifyTVar sz_ (+ LBS.length ss) + hbs2 <- liftIO $ async $ void $ withGit3Env env do + sto <- getStorage + reflogAPI <- getClientAPI @RefLogAPI @UNIX - l <- lift $ async $ do + reflog <- getGitRemoteKey + >>= orThrowUser "reflog not set" - flip fix ECCInit $ \loop -> \case - ECCInit -> do - zstd <- ZstdS.compress level - seed <- randomIO @Word16 - let fn = show $ "export-" <> pretty seed <> ".log" - logFile <- IO.openBinaryFile fn WriteMode - debug $ red "NEW FILE" <+> pretty fn - loop $ ECCWrite 0 logFile zstd + lift $ fix \next -> atomically (readTBQueue hbs2Q) >>= \case + Nothing -> none + Just fn -> void $ flip runContT pure do + ContT $ bracket none (const $ rm fn) + lift do + ts <- liftIO getPOSIXTime <&> round + lbs <- LBS.readFile fn + let meta = mempty + let gk = Nothing + href <- createTreeWithMetadata sto gk meta lbs >>= orThrowPassIO + writeLogEntry ("tree" <+> pretty ts <+> pretty href) + debug $ "SENDING" <+> pretty href <+> pretty fn - ECCWrite bnum fh sn | bnum >= maxW -> do - loop (ECCFinalize True fh sn) + let payload = pure $ LBS.toStrict $ serialise (AnnotatedHashRef Nothing href) + tx <- mkRefLogUpdateFrom (coerce reflog) payload - ECCWrite bnum fh sn -> do - atomically (readTBQueue sourceQ) >>= \case - Nothing -> loop (ECCFinalize False fh sn) - Just s -> do - lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat + r <- callRpcWaitMay @RpcRefLogPost (TimeoutSec 2) reflogAPI tx + >>= orThrowUser "rpc timeout" - sz_ <- newTVarIO 0 + rm fn + next - sn1 <- writeCompressedChunkZstd (write sz_ fh) sn (Just lbs) + link hbs2 - sz <- readTVarIO sz_ <&> fromIntegral - atomically $ modifyTVar bytes_ (+ fromIntegral sz) - - loop (ECCWrite (bnum + sz) fh sn1) - - ECCFinalize again fh sn -> do - void $ writeCompressedChunkZstd (write bytes_ fh) sn Nothing - hClose fh - when again $ loop ECCInit - - link l + l <- lift (async (segmentWriter env bytes_ sourceQ hbs2Q) >>= \x -> link x >> pure x) let chunkSize = if total > tn*2 then total `div` tn else total let commitz = chunksOf chunkSize r progress_ <- newTVarIO 0 - -- (pool, gitCatBatchQ) <- lift $ limitedResourceWorkerRequestQ tn startGitCat stopProcess - - -- link pool - gitCatBatchQ <- contWorkerPool 16 do che <- ContT withGitCat pure $ gitReadObjectMaybe che @@ -1216,20 +1221,69 @@ theDict = do next (t1,b) + mapM_ link workers mapM_ wait workers - atomically $ writeTBQueue sourceQ Nothing + atomically do + writeTBQueue sourceQ Nothing - debug "writing refs" + mapM_ wait [hbs2,l] - wait l + where + + writeLogEntry e = do + path <- getConfigPath <&> ( "log") + touch path + liftIO (IO.appendFile path (show $ e <> line)) + + segmentWriter env bytes_ sourceQ hbs2Q = flip runReaderT env do + maxW <- getPackedSegmetSize + level <- getCompressionLevel + lift $ flip fix ECCInit $ \loop -> \case + ECCInit -> do + zstd <- ZstdS.compress level + fn <- emptySystemTempFile "hbs2-git-export" + logFile <- IO.openBinaryFile fn WriteMode + debug $ red "NEW FILE" <+> pretty fn + loop $ ECCWrite 0 fn logFile zstd + + ECCWrite bnum fn fh sn | bnum >= maxW -> do + loop (ECCFinalize True fn fh sn) + + ECCWrite bnum fn fh sn -> do + atomically (readTBQueue sourceQ) >>= \case + Nothing -> loop (ECCFinalize False fn fh sn) + Just s -> do + lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat + + sz_ <- newTVarIO 0 + + sn1 <- writeCompressedChunkZstd (write sz_ fh) sn (Just lbs) + + sz <- readTVarIO sz_ <&> fromIntegral + atomically $ modifyTVar bytes_ (+ fromIntegral sz) + + loop (ECCWrite (bnum + sz) fn fh sn1) + + ECCFinalize again fn fh sn -> do + void $ writeCompressedChunkZstd (write bytes_ fh) sn Nothing + hClose fh + atomically $ writeTBQueue hbs2Q (Just fn) + debug $ "POST SEGMENT" <+> pretty fn + when again $ loop ECCInit + atomically $ writeTBQueue hbs2Q Nothing + + where + write sz_ fh ss = do + LBS.hPutStr fh ss + atomically $ modifyTVar sz_ (+ LBS.length ss) contWorkerPool :: (MonadUnliftIO m) => Int -> ContT () m (a -> m b) -> ContT () m (a -> m b) -contWorkerPool n w = (fmap . fmap) join $ contWorkerPool' n w +contWorkerPool n w = fmap join <$> contWorkerPool' n w -- | здесь: a -> m (m b) -- первое m - чтобы задать вопрос