mirror of https://github.com/voidlizard/hbs2
git-scan-for-missed-blocks-first
This commit is contained in:
parent
246518cd34
commit
7dde27345f
|
@ -7,6 +7,7 @@ import HBS2.OrDie
|
||||||
import HBS2.System.Logger.Simple
|
import HBS2.System.Logger.Simple
|
||||||
import HBS2.Merkle
|
import HBS2.Merkle
|
||||||
import HBS2.Hash
|
import HBS2.Hash
|
||||||
|
import HBS2.Storage
|
||||||
import HBS2.Storage.Operations.Class
|
import HBS2.Storage.Operations.Class
|
||||||
import HBS2.Storage.Operations.ByteString(TreeKey(..))
|
import HBS2.Storage.Operations.ByteString(TreeKey(..))
|
||||||
import HBS2.Net.Auth.GroupKeySymm
|
import HBS2.Net.Auth.GroupKeySymm
|
||||||
|
@ -45,6 +46,7 @@ import Data.HashMap.Strict qualified as HashMap
|
||||||
import Data.Text qualified as Text
|
import Data.Text qualified as Text
|
||||||
import Data.Either
|
import Data.Either
|
||||||
|
|
||||||
|
import Streaming.Prelude qualified as S
|
||||||
import Streaming.ByteString qualified as SB
|
import Streaming.ByteString qualified as SB
|
||||||
import Streaming.Zip qualified as SZip
|
import Streaming.Zip qualified as SZip
|
||||||
|
|
||||||
|
@ -61,12 +63,40 @@ makeLenses 'RunImportOpts
|
||||||
isRunImportDry :: RunImportOpts -> Bool
|
isRunImportDry :: RunImportOpts -> Bool
|
||||||
isRunImportDry o = view runImportDry o == Just True
|
isRunImportDry o = view runImportDry o == Just True
|
||||||
|
|
||||||
|
|
||||||
|
findMissedBlocks :: (MonadIO m, HasStorage m) => HashRef -> m [HashRef]
|
||||||
|
findMissedBlocks href = do
|
||||||
|
|
||||||
|
sto <- getStorage
|
||||||
|
|
||||||
|
S.toList_ $
|
||||||
|
|
||||||
|
walkMerkle (fromHashRef href) (lift . getBlock sto) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do
|
||||||
|
case hr of
|
||||||
|
Left hx -> S.yield (HashRef hx)
|
||||||
|
Right (hrr :: [HashRef]) -> do
|
||||||
|
forM_ hrr $ \hx -> runMaybeT do
|
||||||
|
blk <- lift $ getBlock sto (fromHashRef hx)
|
||||||
|
|
||||||
|
unless (isJust blk) do
|
||||||
|
lift $ S.yield hx
|
||||||
|
|
||||||
|
maybe1 blk none $ \bs -> do
|
||||||
|
let w = tryDetect (fromHashRef hx) bs
|
||||||
|
r <- case w of
|
||||||
|
Merkle{} -> lift $ lift $ findMissedBlocks hx
|
||||||
|
MerkleAnn{} -> lift $ lift $ findMissedBlocks hx
|
||||||
|
_ -> pure mempty
|
||||||
|
|
||||||
|
lift $ mapM_ S.yield r
|
||||||
|
|
||||||
walkHashes :: (MonadIO m, HasStorage m) => TQueue HashRef -> Hash HbSync -> m ()
|
walkHashes :: (MonadIO m, HasStorage m) => TQueue HashRef -> Hash HbSync -> m ()
|
||||||
walkHashes q h = walkMerkle h (readBlock . HashRef) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do
|
walkHashes q h = walkMerkle h (readBlock . HashRef) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do
|
||||||
case hr of
|
case hr of
|
||||||
Left hx -> die $ show $ pretty "missed block:" <+> pretty hx
|
Left hx -> die $ show $ pretty "missed block:" <+> pretty hx
|
||||||
Right (hrr :: [HashRef]) -> do
|
Right (hrr :: [HashRef]) -> do
|
||||||
forM_ hrr $ liftIO . atomically . Q.writeTQueue q
|
forM_ hrr $ \hx -> do
|
||||||
|
liftIO $ atomically $ Q.writeTQueue q hx
|
||||||
|
|
||||||
blockSource :: (MonadIO m, HasStorage m) => HashRef -> SB.ByteStream m Integer
|
blockSource :: (MonadIO m, HasStorage m) => HashRef -> SB.ByteStream m Integer
|
||||||
blockSource h = do
|
blockSource h = do
|
||||||
|
@ -157,7 +187,19 @@ importRefLogNew opts ref = runResourceT do
|
||||||
lift $ walkHashes logQ (fromHashRef logRoot)
|
lift $ walkHashes logQ (fromHashRef logRoot)
|
||||||
|
|
||||||
let notSkip n = force || not (Set.member n trans)
|
let notSkip n = force || not (Set.member n trans)
|
||||||
entries <- liftIO $ atomically $ flushTQueue logQ <&> filter notSkip
|
|
||||||
|
entries' <- liftIO $ atomically $ flushTQueue logQ <&> filter notSkip
|
||||||
|
|
||||||
|
pMiss <- newProgressMonitor [qc|scan for missed blocks|] (length entries')
|
||||||
|
|
||||||
|
-- TODO: might-be-slow
|
||||||
|
entries <- S.toList_ $ forM_ entries' $ \e -> do
|
||||||
|
updateProgress pMiss 1
|
||||||
|
missed <- lift $ findMissedBlocks e
|
||||||
|
if null missed then do
|
||||||
|
S.yield e
|
||||||
|
else do
|
||||||
|
debug $ "missed blocks in tree" <+> pretty e
|
||||||
|
|
||||||
pCommit <- liftIO $ startGitHashObject Commit
|
pCommit <- liftIO $ startGitHashObject Commit
|
||||||
pTree <- liftIO $ startGitHashObject Tree
|
pTree <- liftIO $ startGitHashObject Tree
|
||||||
|
@ -185,7 +227,7 @@ importRefLogNew opts ref = runResourceT do
|
||||||
|
|
||||||
-- guard =<< withDB db (not <$> stateGetProcessed kDone)
|
-- guard =<< withDB db (not <$> stateGetProcessed kDone)
|
||||||
|
|
||||||
rd <- toMPlus =<< parseRef e
|
rd <- toMPlus =<< parseTx e
|
||||||
let (SequentialRef _ (AnnotatedHashRef ann' h)) = rd
|
let (SequentialRef _ (AnnotatedHashRef ann' h)) = rd
|
||||||
forM_ ann' (withDB db . importKeysAnnotations ref e)
|
forM_ ann' (withDB db . importKeysAnnotations ref e)
|
||||||
|
|
||||||
|
@ -206,7 +248,7 @@ importRefLogNew opts ref = runResourceT do
|
||||||
|
|
||||||
runMaybeT $ do
|
runMaybeT $ do
|
||||||
|
|
||||||
refData <- toMPlus =<< parseRef e
|
refData <- toMPlus =<< parseTx e
|
||||||
-- NOTE: good-place-to-process-hash-log-update-first
|
-- NOTE: good-place-to-process-hash-log-update-first
|
||||||
let (SequentialRef _ (AnnotatedHashRef ann' h)) = refData
|
let (SequentialRef _ (AnnotatedHashRef ann' h)) = refData
|
||||||
|
|
||||||
|
@ -368,7 +410,7 @@ importRefLogNew opts ref = runResourceT do
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
parseRef e = runMaybeT do
|
parseTx e = runMaybeT do
|
||||||
bs <- MaybeT $ readBlock e
|
bs <- MaybeT $ readBlock e
|
||||||
refupd <- toMPlus $ deserialiseOrFail @(RefLogUpdate HBS2L4Proto) bs
|
refupd <- toMPlus $ deserialiseOrFail @(RefLogUpdate HBS2L4Proto) bs
|
||||||
toMPlus $ deserialiseOrFail (LBS.fromStrict $ view refLogUpdData refupd)
|
toMPlus $ deserialiseOrFail (LBS.fromStrict $ view refLogUpdData refupd)
|
||||||
|
|
Loading…
Reference in New Issue