diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index eeec45be..5a1c4743 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -33,6 +33,7 @@ import HBS2.Git3.Types import HBS2.Git3.State.Direct import HBS2.Git3.Config.Local import HBS2.Git3.Git +import HBS2.Git3.Export import Data.Config.Suckless.Script import Data.Config.Suckless.Script.File @@ -262,59 +263,6 @@ instance (Ord k, Hashable k) => Cached (CacheFixedHPSQ k v) k v where pure v -data HCC = - HCC { hccHeight :: Int - , hccRest :: [GitHash] - , hccResult :: HashPSQ GitHash Int (HashSet GitHash) - } - -readCommitChainHPSQ :: ( HBS2GitPerks m - , MonadUnliftIO m - , MonadReader Git3Env m - , HasStorage m - ) - => (GitHash -> m Bool) - -> Maybe GitRef - -> GitHash - -> (GitHash -> m ()) - -> m (HashPSQ GitHash Int (HashSet GitHash)) - -readCommitChainHPSQ filt _ h0 action = flip runContT pure $ callCC \_ -> do - theReader <- ContT $ withGitCat - void $ ContT $ bracket (pure theReader) dontHandle -- stopProcess - flip fix (HCC 0 [h0] HPSQ.empty) $ \next -> \case - - HCC _ [] result -> pure result - - HCC n ( h : hs ) result | HPSQ.member h result -> do - next ( HCC n hs result ) - - HCC n ( h : hs ) result -> do - - done <- not <$> lift (filt h) - - if done then next ( HCC n hs result ) else do - - co <- gitReadObjectMaybe theReader h - >>= orThrow(GitReadError $ show $ pretty "object not found" <+> pretty h) - - parents <- gitReadCommitParents (Just h) (snd co) - - lift $ action h - next $ HCC (n-1) ( parents <> hs ) (snd $ HPSQ.alter (addParents () n parents) h result ) - - - where - addParents :: a - -> Int - -> [GitHash] - -> Maybe (Int, HashSet GitHash) - -> (a, Maybe (Int, HashSet GitHash)) - - addParents a n p = \case - Nothing -> (a, Just (n, HS.fromList p)) - Just (l,s) -> (a, Just (min l n, s <> HS.fromList p)) - readIndexFromFile :: forall m . MonadIO m => FilePath @@ -340,67 +288,6 @@ readIndexFromFile fname = do pure $ HS.fromList r --- FIXME: move-to-suckless-script -splitOpts :: [(Id,Int)] - -> [Syntax C] - -> ([Syntax C], [Syntax C]) - -splitOpts def opts' = flip fix (mempty, opts) $ \go -> \case - (acc, []) -> acc - ( (o,a), r@(StringLike x) : rs ) -> do - case HM.lookup (fromString x) omap of - Nothing -> go ((o, a <> [r]), rs) - Just n -> do - let (w, rest) = L.splitAt n rs - let result = mkList @C ( r : w ) - go ( (o <> [result], a), rest ) - ( (o,a), r : rs ) -> do - go ((o, a <> [r]), rs) - - where - omap = HM.fromList [ (p, x) | (p,x) <- def ] - opts = opts' - -data ECC = - ECCInit - | ECCWrite Int FilePath Handle Result - | ECCFinalize Int Bool FilePath Handle Result - - - -mergeSortedFiles :: forall m . MonadUnliftIO m - => (ByteString -> ByteString) - -> FilePath - -> FilePath - -> FilePath - -> m () - -mergeSortedFiles getKey file1 file2 outFile = do - l1 <- parseFile file1 - l2 <- parseFile file2 - - UIO.withBinaryFileAtomic outFile WriteMode $ \hOut -> - mergeEntries l1 l2 getKey (\s -> writeSection s (liftIO . LBS.hPutStr hOut)) - - mapM_ rm [file1, file2] - - where - parseFile :: FilePath -> m [ByteString] - parseFile path = do - lbs <- liftIO $ LBS.readFile path - S.toList_ $ runConsumeLBS lbs $ readSections $ \_ sdata -> lift $ S.yield sdata - - mergeEntries :: [ByteString] - -> [ByteString] - -> (ByteString -> ByteString) - -> (ByteString -> m ()) -> m () - - mergeEntries [] ys _ write = mapM_ write ys - mergeEntries xs [] _ write = mapM_ write xs - mergeEntries (x:xs) (y:ys) extractKey write - | extractKey x <= extractKey y = write x >> mergeEntries xs (y:ys) extractKey write - | otherwise = write y >> mergeEntries (x:xs) ys extractKey write - theDict :: forall m . ( HBS2GitPerks m @@ -517,15 +404,6 @@ theDict = do r <- callRpcWaitRetry @RpcPoke (TimeoutSec 0.5) 2 peer () >>= orThrowUser "hbs2-peer not found" notice $ pretty r - entry $ bindMatch "test:git:read-commits" $ nil_ $ \syn -> do - let hdr = headDef "HEAD" [ w | StringLike w <- syn ] :: String - - commits <- gitRunCommand [qc|git rev-list -100000 {hdr}|] - >>= orThrowPassIO - <&> mapMaybe (fromStringMay @GitHash . LBS8.unpack) . LBS8.lines - - liftIO $ print $ pretty $ length commits - entry $ bindMatch "test:git:hash:blob" $ nil_ $ const $ liftIO do co <- LBS.hGetContents stdin print $ pretty $ gitHashBlobPure co @@ -1327,285 +1205,7 @@ theDict = do notice $ pretty tree none - entry $ bindMatch "reflog:export" $ nil_ $ \syn -> lift $ connectedDo do - let (opts, argz) = splitOpts [("--dry",0),("--ref",1)] syn - - let dry = or [ True | ListVal [StringLike "--dry"] <- opts ] - - let hd = headDef "HEAD" [ x | StringLike x <- argz] - h <- gitRevParseThrow hd - - let refs = [ gitNormaliseRef (fromString x) - | ListVal [StringLike "--ref", StringLike x] <- opts - ] - - updateReflogIndex - - idx <- openIndex - - _already <- newTVarIO ( mempty :: HashSet GitHash ) - _exported <- newTVarIO 0 - - enumEntries idx $ \bs -> do - atomically $ modifyTVar _already (HS.insert (coerce $ BS.take 20 bs)) - - level <- getCompressionLevel - segment <- getPackedSegmetSize - env <- ask - - let - notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool - notWrittenYet x = do - already <- readTVarIO _already <&> HS.member x - pure (not already) -- && not alsoInIdx) - - hpsq <- readCommitChainHPSQ notWrittenYet Nothing h (\c -> debug $ "commit" <+> pretty c) - - let r = HPSQ.toList hpsq - & sortBy (comparing (view _2)) - & fmap (view _1) - - let total = HPSQ.size hpsq - bytes_ <- newTVarIO 0 - - debug $ "TOTAL" <+> pretty total - - liftIO $ flip runContT pure do - - tn <- getNumCapabilities - - sourceQ <- newTBQueueIO (fromIntegral tn * 1024) - hbs2Q <- newTBQueueIO @_ @(Maybe (FilePath, Int)) 100 - - hbs2 <- liftIO $ async $ void $ withGit3Env env do - sto <- getStorage - reflogAPI <- getClientAPI @RefLogAPI @UNIX - - reflog <- getGitRemoteKey - >>= orThrowUser "reflog not set" - - lift $ fix \next -> atomically (readTBQueue hbs2Q) >>= \case - Nothing -> none - Just (fn,_) -> void $ flip runContT pure do - ContT $ bracket none (const $ rm fn) - lift do - ts <- liftIO getPOSIXTime <&> round - lbs <- LBS.readFile fn - let meta = mempty - let gk = Nothing - - exported <- readTVarIO _exported - debug $ red "EXPORTED" <+> pretty exported - - when (not dry && exported > 0) do - href <- createTreeWithMetadata sto gk meta lbs >>= orThrowPassIO - writeLogEntry ("tree" <+> pretty ts <+> pretty href) - debug $ "SENDING" <+> pretty href <+> pretty fn - - let payload = pure $ LBS.toStrict $ serialise (AnnotatedHashRef Nothing href) - tx <- mkRefLogUpdateFrom (coerce reflog) payload - - callRpcWaitMay @RpcRefLogPost (TimeoutSec 2) reflogAPI tx - >>= orThrowUser "rpc timeout" - - rm fn - next - - link hbs2 - - l <- lift (async (segmentWriter env bytes_ sourceQ hbs2Q) >>= \x -> link x >> pure x) - - let chunkSize = if total > tn*2 then total `div` tn else total - let commitz = chunksOf chunkSize r - - progress_ <- newTVarIO 0 - - gitCatBatchQ <- contWorkerPool 16 do - che <- ContT withGitCat - pure $ gitReadObjectMaybe che - - -- void $ ContT $ bracket (pure pool) cancel - - let lastCommit = last r - - workers <- lift $ forM (zip [0..] commitz) $ \(i,chunk) -> async $ flip runContT pure do - - -- let gitCatBatchQ commit = gitReadObjectMaybe theReader commit - - for_ chunk \commit -> do - - atomically $ modifyTVar progress_ succ - - (_,self) <- lift (gitCatBatchQ commit) - >>= orThrow (GitReadError (show $ pretty commit)) - - tree <- gitReadCommitTree self - - hashes <- gitReadTreeObjectsOnly commit - <&> ([commit,tree]<>) - >>= filterM notWrittenYet - - for_ hashes $ \gh -> do - atomically do - modifyTVar _already (HS.insert gh) - modifyTVar _exported succ - - -- debug $ "object" <+> pretty gh - (_t,lbs) <- lift (gitCatBatchQ gh) - >>= orThrow (GitReadError (show $ pretty gh)) - - let e = [ Builder.byteString (coerce gh) - , Builder.char8 (headDef 'B' $ show $ pretty $ Short _t) - , Builder.lazyByteString lbs - ] & Builder.toLazyByteString . mconcat - - atomically do - writeTBQueue sourceQ (Just e) - - when (commit == lastCommit) do - - ts <- liftIO $ getPOSIXTime <&> round - - let brefs = [ LBS8.pack (show $ pretty ts <+> pretty commit <+> pretty x) - | x <- refs - ] & LBS8.unlines - - let sha1 = gitHashBlobPure brefs - - debug $ green "THIS IS THE LAST COMMIT BLOCK" <+> pretty commit <+> "ADDING REF INFO" <+> pretty sha1 - - let e = [ Builder.byteString (coerce sha1) - , Builder.char8 'R' - , Builder.lazyByteString brefs - ] & Builder.toLazyByteString . mconcat - - atomically do - writeTBQueue sourceQ (Just e) - - t0 <- getTimeCoarse - ContT $ withAsync $ do - - liftIO $ hPrint stderr $ - "segment" <+> pretty segment <> comma - <> "compression level" <+> pretty level - - flip fix (t0,0) $ \next (tPrev,bytesPrev) -> do - - pause @'Seconds 1 - - p <- readTVarIO progress_ - b <- readTVarIO bytes_ - - let pp = fromIntegral p / (fromIntegral total :: Double) * 100 - & realToFrac @_ @(Fixed E2) - - t1 <- getTimeCoarse - - let dt = realToFrac @_ @Double (t1 - tPrev) * 1e-9 - & realToFrac @_ @(Fixed E2) - - let tspent = realToFrac (t1 - t0) * 1e-9 & realToFrac @_ @(Fixed E2) - - let mbytes = realToFrac b / 1024/1024 & realToFrac @_ @(Fixed E2) - - let dbdt = mbytes / tspent - - liftIO $ IO.hPutStr stderr $ show $ - " \r" - <+> pretty tspent <> "s" - <+> pretty mbytes <> "mb" - <+> pretty dbdt <> "mbs" - <+> pretty pp <> "%" - - next (t1,b) - - - mapM_ link workers - mapM_ wait workers - - atomically do - writeTBQueue sourceQ Nothing - - mapM_ wait [hbs2,l] - - where - - writeLogEntry e = do - path <- getConfigPath <&> ( "log") - touch path - liftIO (IO.appendFile path (show $ e <> line)) - - segmentWriter env bytes_ sourceQ hbs2Q = flip runReaderT env do - maxW <- getPackedSegmetSize - level <- getCompressionLevel - lift $ flip fix ECCInit $ \loop -> \case - ECCInit -> do - zstd <- ZstdS.compress level - fn <- emptySystemTempFile "hbs2-git-export" - logFile <- IO.openBinaryFile fn WriteMode - debug $ red "NEW FILE" <+> pretty fn - loop $ ECCWrite 0 fn logFile zstd - - ECCWrite bnum fn fh sn | bnum >= maxW -> do - loop (ECCFinalize bnum True fn fh sn) - - ECCWrite bnum fn fh sn -> do - atomically (readTBQueue sourceQ) >>= \case - Nothing -> loop (ECCFinalize bnum False fn fh sn) - Just s -> do - lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat - - sz_ <- newTVarIO 0 - - sn1 <- writeCompressedChunkZstd (write sz_ fh) sn (Just lbs) - - sz <- readTVarIO sz_ <&> fromIntegral - atomically $ modifyTVar bytes_ (+ fromIntegral sz) - - loop (ECCWrite (bnum + sz) fn fh sn1) - - ECCFinalize bnum again fn fh sn -> do - void $ writeCompressedChunkZstd (write bytes_ fh) sn Nothing - hClose fh - atomically $ writeTBQueue hbs2Q (Just (fn, bnum)) - notice $ "SEGMENT" <+> pretty bnum <+> pretty fn - when again $ loop ECCInit - atomically $ writeTBQueue hbs2Q Nothing - - where - write sz_ fh ss = do - LBS.hPutStr fh ss - atomically $ modifyTVar sz_ (+ LBS.length ss) - -contWorkerPool :: (MonadUnliftIO m) - => Int - -> ContT () m (a -> m b) - -> ContT () m (a -> m b) -contWorkerPool n w = fmap join <$> contWorkerPool' n w - --- | здесь: a -> m (m b) --- первое m - чтобы задать вопрос --- второе m - чтобы получить ответ -contWorkerPool' :: (MonadUnliftIO m) - => Int - -> ContT () m (a -> m b) - -> ContT () m (a -> m (m b)) -contWorkerPool' n contWorker = do - inQ <- newTQueueIO - -- запускаем воркеров - replicateM_ n do - (link <=< ContT . withAsync) do - runContT contWorker \w -> do - (fix . (>>)) do - (a, reply) <- atomically $ readTQueue inQ - reply =<< tryAny (w a) - -- возвращаем функцию, с помощью которой отправлять воркерам запрос - -- и получать ответ - pure \a -> do - tmv <- newEmptyTMVarIO - atomically $ writeTQueue inQ (a, atomically . STM.putTMVar tmv) - pure do - either throwIO pure =<< atomically (readTMVar tmv) + exportEntries "reflog:" limitedResourceWorkerRequestQ :: MonadUnliftIO m diff --git a/hbs2-git3/hbs2-git3.cabal b/hbs2-git3/hbs2-git3.cabal index 2c6b267a..3812bc93 100644 --- a/hbs2-git3/hbs2-git3.cabal +++ b/hbs2-git3/hbs2-git3.cabal @@ -123,6 +123,7 @@ library exposed-modules: HBS2.Git3.Types HBS2.Git3.Prelude + HBS2.Git3.Export HBS2.Git3.State.Types HBS2.Git3.State.Direct HBS2.Git3.State.Index diff --git a/hbs2-git3/lib/HBS2/Git3/Export.hs b/hbs2-git3/lib/HBS2/Git3/Export.hs new file mode 100644 index 00000000..becae3ae --- /dev/null +++ b/hbs2-git3/lib/HBS2/Git3/Export.hs @@ -0,0 +1,364 @@ +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} + +module HBS2.Git3.Export (exportEntries) where + +import HBS2.Git3.Prelude +import HBS2.Git3.State.Index +import HBS2.Git3.Git +import HBS2.Data.Detect + +import HBS2.Data.Log.Structured + +import HBS2.CLI.Run.Internal.Merkle (createTreeWithMetadata) +import HBS2.CLI.Run.RefLog (mkRefLogUpdateFrom) + +import HBS2.System.Dir + +import HBS2.Git3.Config.Local + +import Data.Config.Suckless.Script + +import Codec.Compression.Zstd.Streaming qualified as ZstdS +import Codec.Compression.Zstd.Streaming (Result(..)) +import Data.ByteString.Builder as Builder +import Data.ByteString.Lazy.Char8 qualified as LBS8 +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS +import Data.Fixed +import Data.HashPSQ qualified as HPSQ +import Data.HashPSQ (HashPSQ) +import Data.HashSet (HashSet) +import Data.HashSet qualified as HS +import Data.List (sortBy) +import Data.List.Split (chunksOf) +import Data.Ord (comparing) +import Lens.Micro.Platform +import Streaming.Prelude qualified as S +import System.IO (hPrint) +import System.IO qualified as IO +import System.IO.Temp as Temp +import UnliftIO.Concurrent + +data ExportException = + ExportWriteTimeout + | ExportRefLogNotSet + deriving stock (Show,Typeable) + +instance Exception ExportException + +data ECC = + ECCInit + | ECCWrite Int FilePath Handle Result + | ECCFinalize Int Bool FilePath Handle Result + +exportEntries :: forall m . (HBS2GitPerks m) => Id -> MakeDictM C (Git3 m) () +exportEntries prefix = do + entry $ bindMatch (prefix <> "export") $ nil_ $ \syn -> lift $ connectedDo do + let (opts, argz) = splitOpts [("--dry",0),("--ref",1)] syn + + let dry = or [ True | ListVal [StringLike "--dry"] <- opts ] + + let hd = headDef "HEAD" [ x | StringLike x <- argz] + h <- gitRevParseThrow hd + + let refs = [ gitNormaliseRef (fromString x) + | ListVal [StringLike "--ref", StringLike x] <- opts + ] + + tn <- getNumCapabilities + + updateReflogIndex + + idx <- openIndex + + _already <- newTVarIO ( mempty :: HashSet GitHash ) + _exported <- newTVarIO 0 + + enumEntries idx $ \bs -> do + atomically $ modifyTVar _already (HS.insert (coerce $ BS.take 20 bs)) + + level <- getCompressionLevel + segment <- getPackedSegmetSize + env <- ask + sto <- getStorage + + let + notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool + notWrittenYet x = do + already <- readTVarIO _already <&> HS.member x + pure (not already) -- && not alsoInIdx) + + hpsq <- readCommitChainHPSQ notWrittenYet Nothing h (\c -> debug $ "commit" <+> pretty c) + + txCheckQ <- newTVarIO ( mempty :: HashSet HashRef ) + + let r = HPSQ.toList hpsq + & sortBy (comparing (view _2)) + & fmap (view _1) + + let total = HPSQ.size hpsq + bytes_ <- newTVarIO 0 + + debug $ "TOTAL" <+> pretty total + + liftIO $ flip runContT pure do + + sourceQ <- newTBQueueIO (fromIntegral tn * 1024) + hbs2Q <- newTBQueueIO @_ @(Maybe (FilePath, Int)) 100 + + hbs2 <- liftIO $ async $ void $ withGit3Env env do + sto <- getStorage + reflogAPI <- getClientAPI @RefLogAPI @UNIX + + reflog <- getGitRemoteKey + >>= orThrowUser "reflog not set" + + lift $ fix \next -> atomically (readTBQueue hbs2Q) >>= \case + Nothing -> none + Just (fn,_) -> void $ flip runContT pure do + ContT $ bracket none (const $ rm fn) + lift do + now <- getTimeCoarse + ts <- liftIO getPOSIXTime <&> round + lbs <- LBS.readFile fn + let meta = mempty + let gk = Nothing + + exported <- readTVarIO _exported + debug $ red "EXPORTED" <+> pretty exported + + when (not dry && exported > 0) do + href <- createTreeWithMetadata sto gk meta lbs >>= orThrowPassIO + writeLogEntry ("tree" <+> pretty ts <+> pretty href) + debug $ "SENDING" <+> pretty href <+> pretty fn + + let payload = pure $ LBS.toStrict $ serialise (AnnotatedHashRef Nothing href) + tx <- mkRefLogUpdateFrom (coerce reflog) payload + + let txh = hashObject @HbSync (serialise tx) & HashRef + + atomically (modifyTVar txCheckQ (HS.insert txh)) + + callRpcWaitMay @RpcRefLogPost (TimeoutSec 2) reflogAPI tx + >>= orThrowUser "rpc timeout" + + rm fn + next + + link hbs2 + + l <- lift (async (segmentWriter env bytes_ sourceQ hbs2Q) >>= \x -> link x >> pure x) + + let chunkSize = if total > tn*2 then total `div` tn else total + let commitz = chunksOf chunkSize r + + progress_ <- newTVarIO 0 + + gitCatBatchQ <- contWorkerPool tn do + che <- ContT withGitCat + pure $ gitReadObjectMaybe che + + -- void $ ContT $ bracket (pure pool) cancel + + let lastCommit = last r + + workers <- lift $ forM (zip [0..] commitz) $ \(i,chunk) -> async $ flip runContT pure do + + -- let gitCatBatchQ commit = gitReadObjectMaybe theReader commit + + for_ chunk \commit -> do + + atomically $ modifyTVar progress_ succ + + (_,self) <- lift (gitCatBatchQ commit) + >>= orThrow (GitReadError (show $ pretty commit)) + + tree <- gitReadCommitTree self + + hashes <- gitReadTreeObjectsOnly commit + <&> ([commit,tree]<>) + >>= filterM notWrittenYet + + for_ hashes $ \gh -> do + atomically do + modifyTVar _already (HS.insert gh) + modifyTVar _exported succ + + -- debug $ "object" <+> pretty gh + (_t,lbs) <- lift (gitCatBatchQ gh) + >>= orThrow (GitReadError (show $ pretty gh)) + + let e = [ Builder.byteString (coerce gh) + , Builder.char8 (headDef 'B' $ show $ pretty $ Short _t) + , Builder.lazyByteString lbs + ] & Builder.toLazyByteString . mconcat + + atomically do + writeTBQueue sourceQ (Just e) + + when (commit == lastCommit) do + + ts <- liftIO $ getPOSIXTime <&> round + + let brefs = [ LBS8.pack (show $ pretty ts <+> pretty commit <+> pretty x) + | x <- refs + ] & LBS8.unlines + + let sha1 = gitHashBlobPure brefs + + debug $ green "THIS IS THE LAST COMMIT BLOCK" <+> pretty commit <+> "ADDING REF INFO" <+> pretty sha1 + + let e = [ Builder.byteString (coerce sha1) + , Builder.char8 'R' + , Builder.lazyByteString brefs + ] & Builder.toLazyByteString . mconcat + + atomically do + writeTBQueue sourceQ (Just e) + + t0 <- getTimeCoarse + ContT $ withAsync $ do + + liftIO $ hPrint stderr $ + "segment" <+> pretty segment <> comma + <> "compression level" <+> pretty level + + flip fix (t0,0) $ \next (tPrev,bytesPrev) -> do + + pause @'Seconds 1 + + p <- readTVarIO progress_ + b <- readTVarIO bytes_ + + let pp = fromIntegral p / (fromIntegral total :: Double) * 100 + & realToFrac @_ @(Fixed E2) + + t1 <- getTimeCoarse + + let dt = realToFrac @_ @Double (t1 - tPrev) * 1e-9 + & realToFrac @_ @(Fixed E2) + + let tspent = realToFrac (t1 - t0) * 1e-9 & realToFrac @_ @(Fixed E2) + + let mbytes = realToFrac b / 1024/1024 & realToFrac @_ @(Fixed E2) + + let dbdt = mbytes / tspent + + liftIO $ IO.hPutStr stderr $ show $ + " \r" + <+> pretty tspent <> "s" + <+> pretty mbytes <> "mb" + <+> pretty dbdt <> "mbs" + <+> pretty pp <> "%" + + next (t1,b) + + mapM_ link workers + mapM_ wait workers + + atomically do + writeTBQueue sourceQ Nothing + + mapM_ wait [hbs2,l] + + txh <- liftIO $ withGit3Env env (postCheckPoint 30.0 =<< readTVarIO txCheckQ) + + notice $ "checkpoint" <+> pretty txh + + where + + writeLogEntry e = do + path <- getConfigPath <&> ( "log") + touch path + liftIO (IO.appendFile path (show $ e <> line)) + + segmentWriter env bytes_ sourceQ hbs2Q = flip runReaderT env do + maxW <- getPackedSegmetSize + level <- getCompressionLevel + lift $ flip fix ECCInit $ \loop -> \case + ECCInit -> do + zstd <- ZstdS.compress level + fn <- emptySystemTempFile "hbs2-git-export" + logFile <- IO.openBinaryFile fn WriteMode + debug $ red "NEW FILE" <+> pretty fn + loop $ ECCWrite 0 fn logFile zstd + + ECCWrite bnum fn fh sn | bnum >= maxW -> do + loop (ECCFinalize bnum True fn fh sn) + + ECCWrite bnum fn fh sn -> do + atomically (readTBQueue sourceQ) >>= \case + Nothing -> loop (ECCFinalize bnum False fn fh sn) + Just s -> do + lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat + + sz_ <- newTVarIO 0 + + sn1 <- writeCompressedChunkZstd (write sz_ fh) sn (Just lbs) + + sz <- readTVarIO sz_ <&> fromIntegral + atomically $ modifyTVar bytes_ (+ fromIntegral sz) + + loop (ECCWrite (bnum + sz) fn fh sn1) + + ECCFinalize bnum again fn fh sn -> do + void $ writeCompressedChunkZstd (write bytes_ fh) sn Nothing + hClose fh + atomically $ writeTBQueue hbs2Q (Just (fn, bnum)) + notice $ "SEGMENT" <+> pretty bnum <+> pretty fn + when again $ loop ECCInit + atomically $ writeTBQueue hbs2Q Nothing + + where + write sz_ fh ss = do + LBS.hPutStr fh ss + atomically $ modifyTVar sz_ (+ LBS.length ss) + + -- checks if all transactions written to reflog + -- post tx with current reflog value + postCheckPoint :: forall m1 . ( MonadUnliftIO m1 + , HasStorage m1 + , HasClientAPI RefLogAPI UNIX m1 + , HasGitRemoteKey m1 + ) + => Timeout 'Seconds + -> HashSet HashRef + -> m1 HashRef + + postCheckPoint t txq = perform >>= either (const $ throwIO ExportWriteTimeout) pure + where + perform = race (pause t) do + notice "wait reflog write to complete" + sto <- getStorage + api <- getClientAPI @RefLogAPI @UNIX + reflog <- getGitRemoteKey >>= orThrow ExportRefLogNotSet + + cp <- flip fix txq $ \next q -> do + + let wnext w = pause @'Seconds 0.85 >> next w + + rv <- runMaybeT do + lift (callRpcWaitRetry @RpcRefLogGet (TimeoutSec 1) 2 api reflog) + >>= toMPlus + >>= toMPlus + + maybe1 rv (wnext q) $ \x -> do + rset <- HS.fromList <$> readLogThrow (getBlock sto) x + + let diff = txq `HS.difference` rset + + if not (HS.null diff) then do + debug "again" + wnext diff + else + pure x + + let payload = pure $ LBS.toStrict $ serialise (AnnotatedHashRef Nothing cp) + tx <- mkRefLogUpdateFrom (coerce reflog) payload + + callRpcWaitMay @RpcRefLogPost (TimeoutSec 2) api tx + >>= orThrow ExportWriteTimeout + + pure $ HashRef (hashObject @HbSync (serialise tx)) + diff --git a/hbs2-git3/lib/HBS2/Git3/Git.hs b/hbs2-git3/lib/HBS2/Git3/Git.hs index 8c2f690e..9ae716f6 100644 --- a/hbs2-git3/lib/HBS2/Git3/Git.hs +++ b/hbs2-git3/lib/HBS2/Git3/Git.hs @@ -4,7 +4,7 @@ module HBS2.Git3.Git , module HBS2.Git.Local.CLI ) where -import HBS2.Prelude.Plated +import HBS2.Git3.Prelude import HBS2.OrDie import HBS2.Git3.Types @@ -13,17 +13,21 @@ import HBS2.Git.Local.CLI import Data.Config.Suckless.Script +import Control.Monad.Trans.Maybe import Crypto.Hash (hashlazy) import Crypto.Hash qualified as Crypton -import Control.Monad.Trans.Maybe import Data.ByteArray qualified as BA import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 import Data.ByteString.Lazy ( ByteString ) -import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.ByteString.Lazy qualified as LBS +import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.Either +import Data.HashSet qualified as HS +import Data.HashSet (HashSet) import Data.HashMap.Strict qualified as HM +import Data.HashPSQ (HashPSQ) +import Data.HashPSQ qualified as HPSQ import Data.List (sortOn) import Data.Maybe import Data.Word @@ -264,3 +268,59 @@ gitHashBlobPure body = do let preamble = [qc|{pretty Blob} {pretty $ LBS.length body}|] <> "\x00" :: LBS8.ByteString GitHash $ BS.pack $ BA.unpack $ hashlazy @Crypton.SHA1 (preamble <> body) + + +data HCC = + HCC { hccHeight :: Int + , hccRest :: [GitHash] + , hccResult :: HashPSQ GitHash Int (HashSet GitHash) + } + +readCommitChainHPSQ :: ( HBS2GitPerks m + , MonadUnliftIO m + , MonadReader Git3Env m + , HasStorage m + ) + => (GitHash -> m Bool) + -> Maybe GitRef + -> GitHash + -> (GitHash -> m ()) + -> m (HashPSQ GitHash Int (HashSet GitHash)) + +readCommitChainHPSQ filt _ h0 action = flip runContT pure $ callCC \_ -> do + theReader <- ContT $ withGitCat + void $ ContT $ bracket (pure theReader) dontHandle -- stopProcess + flip fix (HCC 0 [h0] HPSQ.empty) $ \next -> \case + + HCC _ [] result -> pure result + + HCC n ( h : hs ) result | HPSQ.member h result -> do + next ( HCC n hs result ) + + HCC n ( h : hs ) result -> do + + done <- not <$> lift (filt h) + + if done then next ( HCC n hs result ) else do + + co <- gitReadObjectMaybe theReader h + >>= orThrow(GitReadError $ show $ pretty "object not found" <+> pretty h) + + parents <- gitReadCommitParents (Just h) (snd co) + + lift $ action h + next $ HCC (n-1) ( parents <> hs ) (snd $ HPSQ.alter (addParents () n parents) h result ) + + + where + addParents :: a + -> Int + -> [GitHash] + -> Maybe (Int, HashSet GitHash) + -> (a, Maybe (Int, HashSet GitHash)) + + addParents a n p = \case + Nothing -> (a, Just (n, HS.fromList p)) + Just (l,s) -> (a, Just (min l n, s <> HS.fromList p)) + + diff --git a/hbs2-git3/lib/HBS2/Git3/Types.hs b/hbs2-git3/lib/HBS2/Git3/Types.hs index fee71c67..e2c28b49 100644 --- a/hbs2-git3/lib/HBS2/Git3/Types.hs +++ b/hbs2-git3/lib/HBS2/Git3/Types.hs @@ -6,6 +6,10 @@ module HBS2.Git3.Types import HBS2.Prelude.Plated import HBS2.Net.Auth.Credentials import HBS2.Git.Local as Exported +import UnliftIO +import Control.Monad.Trans.Cont +import Control.Concurrent.STM qualified as STM + type GitRemoteKey = PubKey 'Sign 'HBS2Basic @@ -39,3 +43,34 @@ data SegmentObjectType = | RefObject +contWorkerPool :: (MonadUnliftIO m) + => Int + -> ContT () m (a -> m b) + -> ContT () m (a -> m b) +contWorkerPool n w = fmap join <$> contWorkerPool' n w + +-- | здесь: a -> m (m b) +-- первое m - чтобы задать вопрос +-- второе m - чтобы получить ответ +contWorkerPool' :: (MonadUnliftIO m) + => Int + -> ContT () m (a -> m b) + -> ContT () m (a -> m (m b)) +contWorkerPool' n contWorker = do + inQ <- newTQueueIO + -- запускаем воркеров + replicateM_ n do + (link <=< ContT . withAsync) do + runContT contWorker \w -> do + (fix . (>>)) do + (a, reply) <- atomically $ readTQueue inQ + reply =<< tryAny (w a) + -- возвращаем функцию, с помощью которой отправлять воркерам запрос + -- и получать ответ + pure \a -> do + tmv <- newEmptyTMVarIO + atomically $ writeTQueue inQ (a, atomically . STM.putTMVar tmv) + pure do + either throwIO pure =<< atomically (readTMVar tmv) + + diff --git a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script.hs b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script.hs index cbf1d511..577aac37 100644 --- a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script.hs +++ b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Script.hs @@ -15,6 +15,7 @@ import Prettyprinter import Prettyprinter.Render.Terminal import Data.List qualified as List import Data.Text qualified as Text +import Data.String import UnliftIO @@ -48,3 +49,26 @@ helpEntry what = do pattern HelpEntryBound :: forall {c}. Id -> [Syntax c] pattern HelpEntryBound what <- [ListVal (SymbolVal "builtin:lambda" : SymbolVal what : _ )] + +-- FIXME: move-to-suckless-script +splitOpts :: [(Id,Int)] + -> [Syntax C] + -> ([Syntax C], [Syntax C]) + +splitOpts def opts' = flip fix (mempty, opts) $ \go -> \case + (acc, []) -> acc + ( (o,a), r@(StringLike x) : rs ) -> do + case HM.lookup (fromString x) omap of + Nothing -> go ((o, a <> [r]), rs) + Just n -> do + let (w, rest) = List.splitAt n rs + let result = mkList @C ( r : w ) + go ( (o <> [result], a), rest ) + ( (o,a), r : rs ) -> do + go ((o, a <> [r]), rs) + + where + omap = HM.fromList [ (p, x) | (p,x) <- def ] + opts = opts' + + diff --git a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Syntax.hs b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Syntax.hs index 9067aefe..1bf9c763 100644 --- a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Syntax.hs +++ b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/Syntax.hs @@ -139,7 +139,7 @@ class IsLiteral a where newtype Id = Id Text - deriving newtype (IsString,Pretty) + deriving newtype (IsString,Pretty,Semigroup,Monoid) deriving stock (Data,Generic,Show,Eq,Ord) type ForOpaque a = (Typeable a, Eq a)