This commit is contained in:
voidlizard 2024-12-29 09:30:08 +03:00
parent 2d98966ec6
commit a304510d02
1 changed files with 105 additions and 51 deletions

View File

@ -19,6 +19,7 @@ import HBS2.Data.Detect qualified as Detect
import HBS2.Storage import HBS2.Storage
import HBS2.Storage.Operations.Class import HBS2.Storage.Operations.Class
import HBS2.Storage.Operations.ByteString import HBS2.Storage.Operations.ByteString
import HBS2.Peer.Proto.RefLog
import HBS2.Peer.CLI.Detect import HBS2.Peer.CLI.Detect
import HBS2.Peer.RPC.Client import HBS2.Peer.RPC.Client
import HBS2.Peer.RPC.Client.Unix import HBS2.Peer.RPC.Client.Unix
@ -36,6 +37,7 @@ import HBS2.Data.Log.Structured
import HBS2.CLI.Run.Internal.Merkle (createTreeWithMetadata) import HBS2.CLI.Run.Internal.Merkle (createTreeWithMetadata)
import HBS2.CLI.Run.RefLog (getCredentialsForReflog,mkRefLogUpdateFrom)
import HBS2.System.Logger.Simple.ANSI as Exported import HBS2.System.Logger.Simple.ANSI as Exported
import HBS2.System.Dir import HBS2.System.Dir
@ -100,6 +102,7 @@ import System.Random hiding (next)
import System.IO.MMap (mmapFileByteString) import System.IO.MMap (mmapFileByteString)
import System.IO qualified as IO import System.IO qualified as IO
import System.IO (hPrint) import System.IO (hPrint)
import System.IO.Temp as Temp
import Data.Either import Data.Either
import Data.Coerce import Data.Coerce
@ -115,6 +118,11 @@ import UnliftIO.IO.File qualified as UIO
{- HLINT ignore "Eta reduce" -} {- HLINT ignore "Eta reduce" -}
defSegmentSize :: Int
defSegmentSize = 50 * 1024 * 1024
defCompressionLevel :: Int
defCompressionLevel = maxCLevel
type HBS2GitPerks m = (MonadUnliftIO m) type HBS2GitPerks m = (MonadUnliftIO m)
@ -159,6 +167,7 @@ data Git3Env =
, peerSocket :: FilePath , peerSocket :: FilePath
, peerStorage :: AnyStorage , peerStorage :: AnyStorage
, peerAPI :: ServiceCaller PeerAPI UNIX , peerAPI :: ServiceCaller PeerAPI UNIX
, reflogAPI :: ServiceCaller RefLogAPI UNIX
, gitRefLog :: TVar (Maybe GitRemoteKey) , gitRefLog :: TVar (Maybe GitRemoteKey)
, gitPackedSegmentSize :: TVar Int , gitPackedSegmentSize :: TVar Int
, gitCompressionLevel :: TVar Int , gitCompressionLevel :: TVar Int
@ -205,6 +214,7 @@ newtype Git3 (m :: Type -> Type) a = Git3M { fromGit3 :: ReaderT Git3Env m a }
, MonadIO , MonadIO
, MonadUnliftIO , MonadUnliftIO
, MonadReader Git3Env , MonadReader Git3Env
, MonadTrans
) )
type Git3Perks m = ( MonadIO m type Git3Perks m = ( MonadIO m
@ -218,11 +228,17 @@ instance MonadUnliftIO m => HasClientAPI PeerAPI UNIX (Git3 m) where
Git3Disconnected{} -> throwIO Git3PeerNotConnected Git3Disconnected{} -> throwIO Git3PeerNotConnected
Git3Connected{..} -> pure peerAPI Git3Connected{..} -> pure peerAPI
instance (MonadUnliftIO m, MonadReader Git3Env m) => HasClientAPI RefLogAPI UNIX m where
getClientAPI = do
ask >>= \case
Git3Disconnected{} -> throwIO Git3PeerNotConnected
Git3Connected{..} -> pure reflogAPI
nullGit3Env :: MonadIO m => m Git3Env nullGit3Env :: MonadIO m => m Git3Env
nullGit3Env = Git3Disconnected nullGit3Env = Git3Disconnected
<$> newTVarIO Nothing <$> newTVarIO Nothing
<*> newTVarIO ( 100 * 1024 * 1024 ) <*> newTVarIO defSegmentSize
<*> newTVarIO maxCLevel <*> newTVarIO defCompressionLevel
connectedDo :: (MonadIO m, MonadReader Git3Env m) => m a -> m a connectedDo :: (MonadIO m, MonadReader Git3Env m) => m a -> m a
connectedDo what = do connectedDo what = do
@ -279,10 +295,10 @@ recover m = fix \again -> do
let sto = AnyStorage (StorageClient storageAPI) let sto = AnyStorage (StorageClient storageAPI)
connected <- Git3Connected db soname sto peerAPI connected <- Git3Connected db soname sto peerAPI refLogAPI
<$> newTVarIO (Just ref) <$> newTVarIO (Just ref)
<*> newTVarIO (100 * 1024 * 1024 ) <*> newTVarIO defSegmentSize
<*> newTVarIO maxCLevel <*> newTVarIO defCompressionLevel
liftIO $ withGit3Env connected (evolveState >> again) liftIO $ withGit3Env connected (evolveState >> again)
@ -522,8 +538,8 @@ splitOpts def opts' = flip fix (mempty, opts) $ \go -> \case
data ECC = data ECC =
ECCInit ECCInit
| ECCWrite Int Handle Result | ECCWrite Int FilePath Handle Result
| ECCFinalize Bool Handle Result | ECCFinalize Bool FilePath Handle Result
class HasExportOpts m where class HasExportOpts m where
setPackedSegmedSize :: Int -> m () setPackedSegmedSize :: Int -> m ()
@ -1018,11 +1034,9 @@ theDict = do
let contents = Zlib.compressWith params o let contents = Zlib.compressWith params o
LBS.hPutStr fh contents LBS.hPutStr fh contents
entry $ bindMatch "test:git:export" $ nil_ $ \syn -> lift do entry $ bindMatch "test:git:export" $ nil_ $ \syn -> lift $ connectedDo do
let (opts, argz) = splitOpts [("--index",1),("--ref",1)] syn let (opts, argz) = splitOpts [("--index",1),("--ref",1)] syn
maxW <- getPackedSegmetSize
let useIndex = headMay [ f | ListVal [StringLike "--index", StringLike f] <- opts ] let useIndex = headMay [ f | ListVal [StringLike "--index", StringLike f] <- opts ]
let hd = headDef "HEAD" [ x | StringLike x <- argz] let hd = headDef "HEAD" [ x | StringLike x <- argz]
@ -1040,6 +1054,7 @@ theDict = do
level <- getCompressionLevel level <- getCompressionLevel
segment <- getPackedSegmetSize segment <- getPackedSegmetSize
env <- ask
let let
notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool
@ -1070,56 +1085,46 @@ theDict = do
tn <- getNumCapabilities tn <- getNumCapabilities
sourceQ <- newTBQueueIO (fromIntegral tn * 1024) sourceQ <- newTBQueueIO (fromIntegral tn * 1024)
hbs2Q <- newTBQueueIO @_ @(Maybe FilePath) 100
let write sz_ fh ss = do hbs2 <- liftIO $ async $ void $ withGit3Env env do
LBS.hPutStr fh ss sto <- getStorage
atomically $ modifyTVar sz_ (+ LBS.length ss) reflogAPI <- getClientAPI @RefLogAPI @UNIX
l <- lift $ async $ do reflog <- getGitRemoteKey
>>= orThrowUser "reflog not set"
flip fix ECCInit $ \loop -> \case lift $ fix \next -> atomically (readTBQueue hbs2Q) >>= \case
ECCInit -> do Nothing -> none
zstd <- ZstdS.compress level Just fn -> void $ flip runContT pure do
seed <- randomIO @Word16 ContT $ bracket none (const $ rm fn)
let fn = show $ "export-" <> pretty seed <> ".log" lift do
logFile <- IO.openBinaryFile fn WriteMode ts <- liftIO getPOSIXTime <&> round
debug $ red "NEW FILE" <+> pretty fn lbs <- LBS.readFile fn
loop $ ECCWrite 0 logFile zstd let meta = mempty
let gk = Nothing
href <- createTreeWithMetadata sto gk meta lbs >>= orThrowPassIO
writeLogEntry ("tree" <+> pretty ts <+> pretty href)
debug $ "SENDING" <+> pretty href <+> pretty fn
ECCWrite bnum fh sn | bnum >= maxW -> do let payload = pure $ LBS.toStrict $ serialise (AnnotatedHashRef Nothing href)
loop (ECCFinalize True fh sn) tx <- mkRefLogUpdateFrom (coerce reflog) payload
ECCWrite bnum fh sn -> do r <- callRpcWaitMay @RpcRefLogPost (TimeoutSec 2) reflogAPI tx
atomically (readTBQueue sourceQ) >>= \case >>= orThrowUser "rpc timeout"
Nothing -> loop (ECCFinalize False fh sn)
Just s -> do
lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat
sz_ <- newTVarIO 0 rm fn
next
sn1 <- writeCompressedChunkZstd (write sz_ fh) sn (Just lbs) link hbs2
sz <- readTVarIO sz_ <&> fromIntegral l <- lift (async (segmentWriter env bytes_ sourceQ hbs2Q) >>= \x -> link x >> pure x)
atomically $ modifyTVar bytes_ (+ fromIntegral sz)
loop (ECCWrite (bnum + sz) fh sn1)
ECCFinalize again fh sn -> do
void $ writeCompressedChunkZstd (write bytes_ fh) sn Nothing
hClose fh
when again $ loop ECCInit
link l
let chunkSize = if total > tn*2 then total `div` tn else total let chunkSize = if total > tn*2 then total `div` tn else total
let commitz = chunksOf chunkSize r let commitz = chunksOf chunkSize r
progress_ <- newTVarIO 0 progress_ <- newTVarIO 0
-- (pool, gitCatBatchQ) <- lift $ limitedResourceWorkerRequestQ tn startGitCat stopProcess
-- link pool
gitCatBatchQ <- contWorkerPool 16 do gitCatBatchQ <- contWorkerPool 16 do
che <- ContT withGitCat che <- ContT withGitCat
pure $ gitReadObjectMaybe che pure $ gitReadObjectMaybe che
@ -1216,20 +1221,69 @@ theDict = do
next (t1,b) next (t1,b)
mapM_ link workers mapM_ link workers
mapM_ wait workers mapM_ wait workers
atomically $ writeTBQueue sourceQ Nothing atomically do
writeTBQueue sourceQ Nothing
debug "writing refs" mapM_ wait [hbs2,l]
wait l where
writeLogEntry e = do
path <- getConfigPath <&> (</> "log")
touch path
liftIO (IO.appendFile path (show $ e <> line))
segmentWriter env bytes_ sourceQ hbs2Q = flip runReaderT env do
maxW <- getPackedSegmetSize
level <- getCompressionLevel
lift $ flip fix ECCInit $ \loop -> \case
ECCInit -> do
zstd <- ZstdS.compress level
fn <- emptySystemTempFile "hbs2-git-export"
logFile <- IO.openBinaryFile fn WriteMode
debug $ red "NEW FILE" <+> pretty fn
loop $ ECCWrite 0 fn logFile zstd
ECCWrite bnum fn fh sn | bnum >= maxW -> do
loop (ECCFinalize True fn fh sn)
ECCWrite bnum fn fh sn -> do
atomically (readTBQueue sourceQ) >>= \case
Nothing -> loop (ECCFinalize False fn fh sn)
Just s -> do
lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat
sz_ <- newTVarIO 0
sn1 <- writeCompressedChunkZstd (write sz_ fh) sn (Just lbs)
sz <- readTVarIO sz_ <&> fromIntegral
atomically $ modifyTVar bytes_ (+ fromIntegral sz)
loop (ECCWrite (bnum + sz) fn fh sn1)
ECCFinalize again fn fh sn -> do
void $ writeCompressedChunkZstd (write bytes_ fh) sn Nothing
hClose fh
atomically $ writeTBQueue hbs2Q (Just fn)
debug $ "POST SEGMENT" <+> pretty fn
when again $ loop ECCInit
atomically $ writeTBQueue hbs2Q Nothing
where
write sz_ fh ss = do
LBS.hPutStr fh ss
atomically $ modifyTVar sz_ (+ LBS.length ss)
contWorkerPool :: (MonadUnliftIO m) contWorkerPool :: (MonadUnliftIO m)
=> Int => Int
-> ContT () m (a -> m b) -> ContT () m (a -> m b)
-> ContT () m (a -> m b) -> ContT () m (a -> m b)
contWorkerPool n w = (fmap . fmap) join $ contWorkerPool' n w contWorkerPool n w = fmap join <$> contWorkerPool' n w
-- | здесь: a -> m (m b) -- | здесь: a -> m (m b)
-- первое m - чтобы задать вопрос -- первое m - чтобы задать вопрос