mirror of https://github.com/voidlizard/hbs2
wip, better wait for blocks
This commit is contained in:
parent
b263bfbdc9
commit
3d78882f62
|
@ -110,7 +110,7 @@ writeAsGitPack dir href = do
|
|||
data ImportStage =
|
||||
ImportStart
|
||||
| ImportWIP Int (Maybe HashRef)
|
||||
| ImportWait ImportStage
|
||||
| ImportWait (Maybe Int) ImportStage
|
||||
| ImportDone (Maybe HashRef)
|
||||
|
||||
importGitRefLog :: forall m . ( HBS2GitPerks m
|
||||
|
@ -134,13 +134,26 @@ importGitRefLog = withStateDo $ ask >>= \case
|
|||
|
||||
sto <- getStorage
|
||||
|
||||
flip fix ImportStart $ \again -> \case
|
||||
ImportDone x -> pure x
|
||||
already_ <- newTVarIO (mempty :: HashSet HashRef)
|
||||
|
||||
ImportWait next -> do
|
||||
notice "wait some time..."
|
||||
pause @'Seconds 3
|
||||
again next
|
||||
flip fix ImportStart $ \again -> \case
|
||||
ImportDone x -> do
|
||||
for_ x updateImportedCheckpoint
|
||||
updateReflogIndex
|
||||
pure x
|
||||
|
||||
ImportWait d next -> do
|
||||
|
||||
down <- callRpcWaitMay @RpcDownloadList (TimeoutSec 1) peerAPI ()
|
||||
>>= orThrow RpcTimeout
|
||||
<&> fromIntegral . L.length
|
||||
|
||||
notice $ "wait some time..." <+> parens (pretty down)
|
||||
|
||||
case d of
|
||||
Nothing -> again (ImportWait (Just down) next)
|
||||
Just n | down < n -> pause @'Seconds 3 >> again next
|
||||
| otherwise -> pause @'Seconds 3 >> again next
|
||||
|
||||
ImportStart -> do
|
||||
|
||||
|
@ -156,60 +169,62 @@ importGitRefLog = withStateDo $ ask >>= \case
|
|||
|
||||
ImportWIP attempt prev -> do
|
||||
|
||||
updateReflogIndex
|
||||
r <- try @_ @OperationError $ do
|
||||
|
||||
excl <- maybe1 prev (pure mempty) $ \p -> do
|
||||
txListAll (Just p) <&> HS.fromList . fmap fst
|
||||
excl <- maybe1 prev (pure mempty) $ \p -> do
|
||||
txListAll (Just p) <&> HS.fromList . fmap fst
|
||||
|
||||
rv <- refLogRef
|
||||
rv <- refLogRef
|
||||
|
||||
hxs <- txList ( pure . not . flip HS.member excl ) rv
|
||||
hxs <- txList ( pure . not . flip HS.member excl ) rv
|
||||
|
||||
cp' <- flip fix (fmap snd hxs, Nothing) $ \next -> \case
|
||||
([], r) -> pure (gitTxTree <$> r)
|
||||
(TxSegment{}:xs, l) -> next (xs, l)
|
||||
(cp@(TxCheckpoint n tree) : xs, l) -> do
|
||||
cp' <- flip fix (fmap snd hxs, Nothing) $ \next -> \case
|
||||
([], r) -> pure (gitTxTree <$> r)
|
||||
(TxSegment{}:xs, l) -> next (xs, l)
|
||||
(cp@(TxCheckpoint n tree) : xs, l) -> do
|
||||
|
||||
-- full <- findMissedBlocks sto tree <&> L.null
|
||||
missed_ <- newTVarIO 0
|
||||
deepScan ScanDeep (\_ -> atomically $ modifyTVar missed_ succ)
|
||||
(coerce tree)
|
||||
(getBlock sto)
|
||||
(const none)
|
||||
-- full <- findMissedBlocks sto tree <&> L.null
|
||||
missed_ <- newTVarIO 0
|
||||
deepScan ScanDeep (\_ -> atomically $ modifyTVar missed_ succ)
|
||||
(coerce tree)
|
||||
(getBlock sto)
|
||||
(const none)
|
||||
|
||||
full <- readTVarIO missed_ <&> (==0)
|
||||
full <- readTVarIO missed_ <&> (==0)
|
||||
|
||||
if full && Just n > (getGitTxRank <$> l) then do
|
||||
next (xs, Just cp)
|
||||
else do
|
||||
next (xs, l)
|
||||
if full && Just n > (getGitTxRank <$> l) then do
|
||||
next (xs, Just cp)
|
||||
else do
|
||||
next (xs, l)
|
||||
|
||||
case cp' of
|
||||
Nothing -> again $ ImportDone Nothing
|
||||
Just cp -> do
|
||||
case cp' of
|
||||
Nothing -> pure Nothing
|
||||
Just cp -> do
|
||||
|
||||
notice $ "found checkpoint" <+> pretty cp
|
||||
txs <- txList ( pure . not . flip HS.member excl ) (Just cp)
|
||||
notice $ "found checkpoint" <+> pretty cp
|
||||
txs <- txList ( pure . not . flip HS.member excl ) (Just cp)
|
||||
|
||||
r <- liftIO $ try @_ @SomeException $ withGit3Env env do
|
||||
forConcurrently_ txs $ \case
|
||||
(_, TxCheckpoint{}) -> none
|
||||
(h, TxSegment tree) -> do
|
||||
s <- writeAsGitPack packs tree
|
||||
new <- readTVarIO already_ <&> not . HS.member tree
|
||||
|
||||
for_ s $ \file -> do
|
||||
gitRunCommand [qc|git index-pack {file}|]
|
||||
>>= orThrowPassIO
|
||||
when new do
|
||||
s <- writeAsGitPack packs tree
|
||||
|
||||
notice $ "imported" <+> pretty h
|
||||
for_ s $ \file -> do
|
||||
gitRunCommand [qc|git index-pack {file}|]
|
||||
>>= orThrowPassIO
|
||||
|
||||
updateImportedCheckpoint cp
|
||||
atomically $ modifyTVar already_ (HS.insert tree)
|
||||
notice $ "imported" <+> pretty h
|
||||
|
||||
pure (Just cp)
|
||||
|
||||
case r of
|
||||
Right cp -> again $ ImportDone cp
|
||||
Left (MissedBlockError2 _) -> again $ ImportWait Nothing (ImportWIP (succ attempt) prev)
|
||||
Left MissedBlockError -> again $ ImportWait Nothing (ImportWIP (succ attempt) prev)
|
||||
Left e -> throwIO e
|
||||
|
||||
case r of
|
||||
Right _ -> again $ ImportDone (Just cp)
|
||||
Left e -> do
|
||||
case (fromException e :: Maybe OperationError) of
|
||||
Just (MissedBlockError2 _) -> again $ ImportWait (ImportWIP (succ attempt) prev)
|
||||
Just MissedBlockError -> again $ ImportWait (ImportWIP (succ attempt) prev)
|
||||
_ -> throwIO e
|
||||
|
||||
|
|
Loading…
Reference in New Issue