diff --git a/hbs2-git3/lib/HBS2/Git3/Import.hs b/hbs2-git3/lib/HBS2/Git3/Import.hs index 21ae145e..3f7c01f1 100644 --- a/hbs2-git3/lib/HBS2/Git3/Import.hs +++ b/hbs2-git3/lib/HBS2/Git3/Import.hs @@ -116,14 +116,7 @@ importGitRefLog :: forall m . ( HBS2GitPerks m importGitRefLog = do - fix \next -> do - updateReflogIndex `catch` \case - MissedBlockError -> do - pause @'Seconds 2.0 - warn "missed block on import" - next - - e -> throwIO e + updateReflogIndex packs <- gitDir >>= orThrowUser "git directory not found" diff --git a/hbs2-git3/lib/HBS2/Git3/State/Internal/Types.hs b/hbs2-git3/lib/HBS2/Git3/State/Internal/Types.hs index c3d1bcad..ec547437 100644 --- a/hbs2-git3/lib/HBS2/Git3/State/Internal/Types.hs +++ b/hbs2-git3/lib/HBS2/Git3/State/Internal/Types.hs @@ -16,6 +16,7 @@ import HBS2.CLI.Run.MetaData (getTreeContents) import Data.Config.Suckless +import HBS2.Storage.Operations.Missed import HBS2.Defaults as Exported import HBS2.OrDie as Exported import HBS2.Data.Types.Refs as Exported @@ -38,6 +39,7 @@ import HBS2.Storage as Exported import HBS2.Storage.Operations.Class as Exported import HBS2.System.Logger.Simple.ANSI as Exported +import Data.List qualified as L import Data.Text.Encoding qualified as TE import Data.Text.Encoding.Error qualified as TE import Data.ByteString.Lazy qualified as LBS @@ -375,6 +377,12 @@ data ReflogWaitTimeout = instance Exception ReflogWaitTimeout + +data CWRepo = + CWaitLWW + | CCheckManifest (LWWRef HBS2Basic) + | CAborted + waitRepo :: forall m . HBS2GitPerks m => Git3 m () waitRepo = do repoKey <- getGitRepoKey >>= orThrow GitRepoRefNotSet @@ -382,67 +390,84 @@ waitRepo = do lwwAPI <- getClientAPI @LWWRefAPI @UNIX peerAPI <- getClientAPI @PeerAPI @UNIX reflogAPI <- getClientAPI @RefLogAPI @UNIX + sto <- getStorage env <- ask callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (repoKey, "lwwref", 31) - >>= orThrowUser "rpc timeout while subscribing to reflog" + >>= orThrow RpcTimeout - callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey) - >>= orThrowUser "rpc timeout while subscribing to LWWRef" + refLog1_ <- newEmptyTMVarIO + refLog2_ <- newEmptyTMVarIO - let maxTimeout = ceiling 30e9 -- Максимальное время ожидания (30 секунд) - startTime <- getTimeCoarse + void $ flip runContT pure do - flip runContT pure do + void $ ContT $ withAsync $ forever do + void $ callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey) + pause @'Seconds 10 - let periodicFetch reflog = forever $ do - callRpcWaitMay @RpcRefLogFetch (TimeoutSec 1) reflogAPI reflog - >>= orThrowUser "rpc timeout while fetching reflog" - pause @'Seconds 10 -- Засыпаем на 10 секунд + p1 <- ContT $ withAsync $ do + r <- atomically $ takeTMVar refLog1_ + forever do + notice "FETCH REFLOG!" + void $ callRpcWaitMay @RpcRefLogFetch (TimeoutSec 1) reflogAPI r + pause @'Seconds 10 - let waitForReflog till reflog = do - now <- getTimeCoarse + p2 <- ContT $ withAsync $ do + r <- atomically $ takeTMVar refLog2_ + void $ fix \again -> do + notice "AGAIN!" + rv <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI r + >>= \case + Nothing -> pause @'Seconds 3 >> again + Just Nothing -> pause @'Seconds 1.24 >> again + Just (Just x) -> pure x - if now > till - then throwIO ReflogWaitTimeout - else do - mhead <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI (coerce reflog) - case mhead of - Just headVal -> do - debug $ "waitRepo: Reflog data arrived" <+> pretty headVal + missed <- findMissedBlocks sto rv - Nothing -> pause @'Seconds 1 >> waitForReflog till reflog + if L.null missed then do + pure rv + else do + notice "missed blocks in reflog" + pause @'Seconds 5 + again - let waitForLWWRef till = liftIO $ withGit3Env env do - now <- getTimeCoarse + liftIO $ withGit3Env env do - if now > till - then throwIO RpcTimeout - else do - rv <- getRepoRefMaybe - maybe1 rv (pause @'Seconds 1 >> waitForLWWRef till) $ \LWWRef{..} -> do - debug $ "waitRepo: LWWRef arrived" <+> pretty lwwValue + flip fix CWaitLWW $ \next -> \case + CWaitLWW -> do + notice $ "wait" <+> pretty (AsBase58 repoKey) + getRepoRefMaybe >>= \case + Nothing -> do + pause @'Seconds 1 + next CWaitLWW - -- Парсим манифест репозитория - repo <- getRepoManifest + Just v -> next $ CCheckManifest v - -- Достаём `reflog` - reflog <- [ x | ListVal [SymbolVal "reflog", SignPubKeyLike x] <- repo ] - & headMay - & orThrowUser "malformed repo manifest" + CCheckManifest LWWRef{} -> do + notice "check manifest" + r <- try @_ @HBS2GitExcepion getRepoManifest + case r of + Left GitRepoRefEmpty -> next CWaitLWW - -- Подписываемся на `reflog` - callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (reflog, "reflog", 17) - >>= orThrowUser "rpc timeout while subscribing to reflog" + Left e -> next CAborted - debug $ "waitRepo: Subscribed to reflog" <+> pretty (AsBase58 reflog) + Right mf -> do + let reflog = lastMay [ x | ListVal [SymbolVal "reflog", SignPubKeyLike x] <- mf ] + case reflog of + Nothing -> next CAborted + Just rf -> do - -- Запускаем асинхронную задачу для периодического вызова RpcRefLogFetch - withAsync (periodicFetch reflog) $ \_ -> do - -- Ждём появления значений в `reflog` - waitForReflog till reflog + callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (rf, "reflog", 17) + >>= orThrow RpcTimeout - liftIO $ withGit3Env env $ waitForLWWRef (startTime + fromNanoSecs maxTimeout) + atomically do + writeTMVar refLog1_ rf + writeTMVar refLog2_ rf + + CAborted -> err "waitRepo aborted" >> none + + + waitAnyCatchCancel [p1,p2]