module HBS2.Git.Client.Import where import HBS2.Git.Client.Prelude hiding (info) import HBS2.Git.Client.App.Types import HBS2.Git.Client.Config import HBS2.Git.Client.State import HBS2.Git.Client.RefLog import HBS2.Git.Client.Progress import HBS2.Git.Data.RefLog import HBS2.Git.Data.Tx import Data.ByteString.Lazy qualified as LBS import Text.InterpolatedString.Perl6 (qc) import Streaming.Prelude qualified as S import System.IO (hPrint) import System.Environment import System.Exit data ImportRefLogNotFound = ImportRefLogNotFound deriving stock (Typeable,Show) instance Exception ImportRefLogNotFound data ImportTxError = ImportTxReadError HashRef | ImportOpError OperationError | ImportUnbundleError HashRef | ImportMissed HashRef deriving stock (Typeable) instance Show ImportTxError where show (ImportTxReadError h) = [qc|ImportTxError {pretty h}|] show (ImportOpError o) = show o show (ImportUnbundleError h) = [qc|ImportUnbundleError {pretty h}|] show (ImportMissed h) = [qc|ImportMissed {pretty h}|] instance Exception ImportTxError data IState = IWaitRefLog Int | IScanRefLog HashRef | IApplyTx HashRef | IExit importRepoWait :: (GitPerks m, MonadReader GitEnv m) => RefLogId -> m () importRepoWait puk = do env <- ask subscribeRefLog puk ip <- asks _progress flip fix (IWaitRefLog 20) $ \next -> \case IWaitRefLog w | w <= 0 -> do throwIO ImportRefLogNotFound IWaitRefLog w -> do onProgress ip (ImportRefLogStart puk) try @_ @SomeException (getRefLogMerkle puk) >>= \case Left _ -> do onProgress ip (ImportRefLogDone puk Nothing) pause @'Seconds 2 next (IWaitRefLog (pred w)) Right Nothing -> do onProgress ip (ImportRefLogDone puk Nothing) pause @'Seconds 2 next (IWaitRefLog (pred w)) Right (Just h) -> do onProgress ip (ImportRefLogDone puk (Just h)) next (IScanRefLog h) IScanRefLog h -> do scanRefLog puk h withState (selectMaxSeqTxNotDone puk) >>= \case Just tx -> next (IApplyTx tx) Nothing -> do hasAnyTx <- withState existsAnyTxDone if hasAnyTx then -- existing repo, is' a fetch next IExit else do void $ race (pause @'Seconds 10) do forever do onProgress ip (ImportWaitTx h) pause @'Seconds 0.25 next (IScanRefLog h) IApplyTx h -> do onProgress ip (ImportApplyTx h) r <- runExceptT (applyTx h) case r of Left MissedBlockError -> do next =<< repeatOrExit Left IncompleteData -> do next =<< repeatOrExit Left e -> do err (line <> red (viaShow e)) throwIO (userError "tx apply / state read error") Right{} -> next IExit IExit -> do onProgress ip (ImportSetQuiet True) onProgress ip ImportAllDone where repeatOrExit = do hasAnyTx <- withState existsAnyTxDone if hasAnyTx then do pure IExit else do pause @'Seconds 2 pure (IWaitRefLog 5) scanRefLog :: (GitPerks m, MonadReader GitEnv m) => RefLogId -> HashRef -> m () scanRefLog puk rv = do sto <- asks _storage ip <- asks _progress env <- ask txs <- S.toList_ $ do walkMerkle @[HashRef] (fromHashRef rv) (getBlock sto) $ \case Left he -> do err $ red "missed block" <+> pretty he Right hxs -> do for_ hxs $ \htx -> do here <- lift (withState (existsTx htx)) unless here (S.yield htx) tx <- liftIO $ S.toList_ $ do for_ txs $ \tx -> do onProgress ip (ImportScanTx tx) runExceptT (readTx sto tx <&> (tx,)) >>= either (const none) S.yield withState $ transactional do for_ tx $ \(th,(n,rhh,rh,bundleh)) -> do -- notice $ red "TX" <+> pretty th <+> pretty n insertTx puk th n rhh bundleh applyTx :: (GitPerks m, MonadReader GitEnv m, MonadError OperationError m) => HashRef -> m () applyTx h = do sto <- asks _storage (n,rhh,r,bunh) <- readTx sto h bundles <- readBundleRefs sto bunh >>= orThrowError IncompleteData trace $ red "applyTx" <+> pretty h <+> pretty h <+> pretty bundles withState $ transactional do applyBundles r bundles app <- lift $ asks (view gitApplyHeads) when app do lift $ applyHeads r insertTxDone h where applyHeads rh = do let refs = _repoHeadRefs rh withGitFastImport $ \ps -> do let psin = getStdin ps for_ refs $ \(r,v) -> do unless (r == GitRef "HEAD") do liftIO $ hPrint psin $ "reset" <+> pretty r <> line <> "from" <+> pretty v <> line hClose psin code <- waitExitCode ps trace $ red "git fast-import status" <+> viaShow code pure () applyBundles r bundles = do env <- lift ask sto <- lift $ asks _storage ip <- lift $ asks _progress -- withState $ do for_ (zip [0..] bundles) $ \(n,bu) -> do insertTxBundle h n bu here <- existsBundleDone bu unless here do BundleWithMeta meta bytes <- lift (runExceptT $ readBundle sto r bu) >>= orThrow (ImportUnbundleError bu) (_,_,idx,lbs) <- unpackPackMay bytes & orThrow (ImportUnbundleError bu) trace $ red "reading bundle" <+> pretty bu -- <+> pretty (LBS.length lbs) for_ idx $ \i -> do insertBundleObject bu i let chunks = LBS.toChunks lbs void $ liftIO $ withGitEnv env $ withGitUnpack $ \p -> do let pstdin = getStdin p for_ (zip [1..] chunks) $ \(i,chu) -> do onProgress ip (ImportReadBundleChunk meta (Progress i Nothing)) liftIO $ LBS.hPutStr pstdin (LBS.fromStrict chu) hFlush pstdin >> hClose pstdin code <- waitExitCode p trace $ "unpack objects done:" <+> viaShow code insertBundleDone bu withGitFastImport :: (MonadUnliftIO m, MonadReader GitEnv m) => (Process Handle Handle () -> m a) -> m () withGitFastImport action = do fp <- asks _gitPath let cmd = "git" let args = ["--git-dir", fp, "fast-import"] -- let config = setStdin createPipe $ setStdout createPipe $ setStderr closed $ proc cmd args trc <- asks traceEnabled >>= \case True -> pure id False -> pure $ setStdout closed . setStderr closed let pconfig = setStdin createPipe $ setStdout createPipe $ trc $ proc cmd args p <- startProcess pconfig void $ action p stopProcess p withGitUnpack :: (MonadUnliftIO m, MonadReader GitEnv m) => (Process Handle Handle () -> m a) -> m a withGitUnpack action = do fp <- asks _gitPath let cmd = "git" let args = ["--git-dir", fp, "unpack-objects", "-q"] trc <- asks traceEnabled >>= \case True -> pure id False -> pure $ setStdout closed . setStderr closed let pconfig = setStdin createPipe $ setStdout createPipe $ trc $ proc cmd args p <- startProcess pconfig action p gitPrune :: (MonadUnliftIO m, MonadReader GitEnv m) => m () gitPrune = do fp <- asks _gitPath let cmd = [qc|git --git-dir={fp} prune|] runProcess_ (shell cmd & setStderr closed & setStdout closed) pure ()