-- Unified git diff for hbs2-git/lib/HBS2Git/Import.hs (index 404673f1..8a201e20).
-- NOTE(review): in this copy the patch's line breaks and indentation have been
-- collapsed into spaces, so hunk lines run together; restore it from the
-- original commit before trying to apply it or read the Haskell layout.
--
-- What the hunks change, as far as the visible text shows:
--   * Imports: adds HBS2.Storage and Streaming.Prelude (qualified as S).
--   * New findMissedBlocks :: HashRef -> m [HashRef]. It takes the storage
--     from getStorage, runs walkMerkle from the given root with getBlock as
--     the block reader, and gathers everything yielded into a list with
--     S.toList_. A Left hash reported by walkMerkle is yielded as a HashRef.
--     For a Right list, each hash is looked up with getBlock: a Nothing
--     result yields that hash; a block that tryDetect classifies as Merkle
--     or MerkleAnn is searched recursively and its results are yielded too;
--     any other block contributes nothing.
--   * walkHashes: the forM_ body becomes an explicit lambda; it still writes
--     each hash to the TQueue inside atomically.
--   * importRefLogNew: the flushed, notSkip-filtered queue contents are now
--     bound to entries'. newProgressMonitor is called with the text
--     "scan for missed blocks" and length entries', updateProgress is called
--     with 1 per entry, and findMissedBlocks runs on every entry. Entries
--     with an empty result are kept as `entries`; the others are dropped
--     after a debug message. The author's TODO marks this scan as possibly
--     slow.
--   * The local helper parseRef is renamed to parseTx, at its definition and
--     at both call sites shown.
-- NOTE(review): findMissedBlocks fetches each listed block itself and recurses
-- on tree blocks; whether walkMerkle also descends into those subtrees (which
-- would repeat work) cannot be determined from this diff - confirm.
diff --git a/hbs2-git/lib/HBS2Git/Import.hs b/hbs2-git/lib/HBS2Git/Import.hs index 404673f1..8a201e20 100644 --- a/hbs2-git/lib/HBS2Git/Import.hs +++ b/hbs2-git/lib/HBS2Git/Import.hs @@ -7,6 +7,7 @@ import HBS2.OrDie import HBS2.System.Logger.Simple import HBS2.Merkle import HBS2.Hash +import HBS2.Storage import HBS2.Storage.Operations.Class import HBS2.Storage.Operations.ByteString(TreeKey(..)) import HBS2.Net.Auth.GroupKeySymm @@ -45,6 +46,7 @@ import Data.HashMap.Strict qualified as HashMap import Data.Text qualified as Text import Data.Either +import Streaming.Prelude qualified as S import Streaming.ByteString qualified as SB import Streaming.Zip qualified as SZip @@ -61,12 +63,40 @@ makeLenses 'RunImportOpts isRunImportDry :: RunImportOpts -> Bool isRunImportDry o = view runImportDry o == Just True + +findMissedBlocks :: (MonadIO m, HasStorage m) => HashRef -> m [HashRef] +findMissedBlocks href = do + + sto <- getStorage + + S.toList_ $ + + walkMerkle (fromHashRef href) (lift . getBlock sto) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do + case hr of + Left hx -> S.yield (HashRef hx) + Right (hrr :: [HashRef]) -> do + forM_ hrr $ \hx -> runMaybeT do + blk <- lift $ getBlock sto (fromHashRef hx) + + unless (isJust blk) do + lift $ S.yield hx + + maybe1 blk none $ \bs -> do + let w = tryDetect (fromHashRef hx) bs + r <- case w of + Merkle{} -> lift $ lift $ findMissedBlocks hx + MerkleAnn{} -> lift $ lift $ findMissedBlocks hx + _ -> pure mempty + + lift $ mapM_ S.yield r + walkHashes :: (MonadIO m, HasStorage m) => TQueue HashRef -> Hash HbSync -> m () walkHashes q h = walkMerkle h (readBlock . HashRef) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do case hr of Left hx -> die $ show $ pretty "missed block:" <+> pretty hx Right (hrr :: [HashRef]) -> do - forM_ hrr $ liftIO . atomically . 
Q.writeTQueue q + forM_ hrr $ \hx -> do + liftIO $ atomically $ Q.writeTQueue q hx blockSource :: (MonadIO m, HasStorage m) => HashRef -> SB.ByteStream m Integer blockSource h = do @@ -157,7 +187,19 @@ importRefLogNew opts ref = runResourceT do lift $ walkHashes logQ (fromHashRef logRoot) let notSkip n = force || not (Set.member n trans) - entries <- liftIO $ atomically $ flushTQueue logQ <&> filter notSkip + + entries' <- liftIO $ atomically $ flushTQueue logQ <&> filter notSkip + + pMiss <- newProgressMonitor [qc|scan for missed blocks|] (length entries') + + -- TODO: might-be-slow + entries <- S.toList_ $ forM_ entries' $ \e -> do + updateProgress pMiss 1 + missed <- lift $ findMissedBlocks e + if null missed then do + S.yield e + else do + debug $ "missed blocks in tree" <+> pretty e pCommit <- liftIO $ startGitHashObject Commit pTree <- liftIO $ startGitHashObject Tree @@ -185,7 +227,7 @@ importRefLogNew opts ref = runResourceT do -- guard =<< withDB db (not <$> stateGetProcessed kDone) - rd <- toMPlus =<< parseRef e + rd <- toMPlus =<< parseTx e let (SequentialRef _ (AnnotatedHashRef ann' h)) = rd forM_ ann' (withDB db . importKeysAnnotations ref e) @@ -206,7 +248,7 @@ importRefLogNew opts ref = runResourceT do runMaybeT $ do - refData <- toMPlus =<< parseRef e + refData <- toMPlus =<< parseTx e -- NOTE: good-place-to-process-hash-log-update-first let (SequentialRef _ (AnnotatedHashRef ann' h)) = refData @@ -368,7 +410,7 @@ importRefLogNew opts ref = runResourceT do where - parseRef e = runMaybeT do + parseTx e = runMaybeT do bs <- MaybeT $ readBlock e refupd <- toMPlus $ deserialiseOrFail @(RefLogUpdate HBS2L4Proto) bs toMPlus $ deserialiseOrFail (LBS.fromStrict $ view refLogUpdData refupd)