mirror of https://github.com/voidlizard/hbs2
wip, debug
This commit is contained in:
parent
8303a347b6
commit
4fb13f6d17
|
@ -116,14 +116,7 @@ importGitRefLog :: forall m . ( HBS2GitPerks m
|
||||||
|
|
||||||
importGitRefLog = do
|
importGitRefLog = do
|
||||||
|
|
||||||
fix \next -> do
|
updateReflogIndex
|
||||||
updateReflogIndex `catch` \case
|
|
||||||
MissedBlockError -> do
|
|
||||||
pause @'Seconds 2.0
|
|
||||||
warn "missed block on import"
|
|
||||||
next
|
|
||||||
|
|
||||||
e -> throwIO e
|
|
||||||
|
|
||||||
packs <- gitDir
|
packs <- gitDir
|
||||||
>>= orThrowUser "git directory not found"
|
>>= orThrowUser "git directory not found"
|
||||||
|
|
|
@ -16,6 +16,7 @@ import HBS2.CLI.Run.MetaData (getTreeContents)
|
||||||
|
|
||||||
import Data.Config.Suckless
|
import Data.Config.Suckless
|
||||||
|
|
||||||
|
import HBS2.Storage.Operations.Missed
|
||||||
import HBS2.Defaults as Exported
|
import HBS2.Defaults as Exported
|
||||||
import HBS2.OrDie as Exported
|
import HBS2.OrDie as Exported
|
||||||
import HBS2.Data.Types.Refs as Exported
|
import HBS2.Data.Types.Refs as Exported
|
||||||
|
@ -38,6 +39,7 @@ import HBS2.Storage as Exported
|
||||||
import HBS2.Storage.Operations.Class as Exported
|
import HBS2.Storage.Operations.Class as Exported
|
||||||
import HBS2.System.Logger.Simple.ANSI as Exported
|
import HBS2.System.Logger.Simple.ANSI as Exported
|
||||||
|
|
||||||
|
import Data.List qualified as L
|
||||||
import Data.Text.Encoding qualified as TE
|
import Data.Text.Encoding qualified as TE
|
||||||
import Data.Text.Encoding.Error qualified as TE
|
import Data.Text.Encoding.Error qualified as TE
|
||||||
import Data.ByteString.Lazy qualified as LBS
|
import Data.ByteString.Lazy qualified as LBS
|
||||||
|
@ -375,6 +377,12 @@ data ReflogWaitTimeout =
|
||||||
|
|
||||||
instance Exception ReflogWaitTimeout
|
instance Exception ReflogWaitTimeout
|
||||||
|
|
||||||
|
|
||||||
|
data CWRepo =
|
||||||
|
CWaitLWW
|
||||||
|
| CCheckManifest (LWWRef HBS2Basic)
|
||||||
|
| CAborted
|
||||||
|
|
||||||
waitRepo :: forall m . HBS2GitPerks m => Git3 m ()
|
waitRepo :: forall m . HBS2GitPerks m => Git3 m ()
|
||||||
waitRepo = do
|
waitRepo = do
|
||||||
repoKey <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
repoKey <- getGitRepoKey >>= orThrow GitRepoRefNotSet
|
||||||
|
@ -382,67 +390,84 @@ waitRepo = do
|
||||||
lwwAPI <- getClientAPI @LWWRefAPI @UNIX
|
lwwAPI <- getClientAPI @LWWRefAPI @UNIX
|
||||||
peerAPI <- getClientAPI @PeerAPI @UNIX
|
peerAPI <- getClientAPI @PeerAPI @UNIX
|
||||||
reflogAPI <- getClientAPI @RefLogAPI @UNIX
|
reflogAPI <- getClientAPI @RefLogAPI @UNIX
|
||||||
|
sto <- getStorage
|
||||||
|
|
||||||
env <- ask
|
env <- ask
|
||||||
|
|
||||||
callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (repoKey, "lwwref", 31)
|
callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (repoKey, "lwwref", 31)
|
||||||
>>= orThrowUser "rpc timeout while subscribing to reflog"
|
>>= orThrow RpcTimeout
|
||||||
|
|
||||||
callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey)
|
refLog1_ <- newEmptyTMVarIO
|
||||||
>>= orThrowUser "rpc timeout while subscribing to LWWRef"
|
refLog2_ <- newEmptyTMVarIO
|
||||||
|
|
||||||
let maxTimeout = ceiling 30e9 -- Максимальное время ожидания (30 секунд)
|
void $ flip runContT pure do
|
||||||
startTime <- getTimeCoarse
|
|
||||||
|
|
||||||
flip runContT pure do
|
void $ ContT $ withAsync $ forever do
|
||||||
|
void $ callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey)
|
||||||
|
pause @'Seconds 10
|
||||||
|
|
||||||
let periodicFetch reflog = forever $ do
|
p1 <- ContT $ withAsync $ do
|
||||||
callRpcWaitMay @RpcRefLogFetch (TimeoutSec 1) reflogAPI reflog
|
r <- atomically $ takeTMVar refLog1_
|
||||||
>>= orThrowUser "rpc timeout while fetching reflog"
|
forever do
|
||||||
pause @'Seconds 10 -- Засыпаем на 10 секунд
|
notice "FETCH REFLOG!"
|
||||||
|
void $ callRpcWaitMay @RpcRefLogFetch (TimeoutSec 1) reflogAPI r
|
||||||
|
pause @'Seconds 10
|
||||||
|
|
||||||
let waitForReflog till reflog = do
|
p2 <- ContT $ withAsync $ do
|
||||||
now <- getTimeCoarse
|
r <- atomically $ takeTMVar refLog2_
|
||||||
|
void $ fix \again -> do
|
||||||
|
notice "AGAIN!"
|
||||||
|
rv <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI r
|
||||||
|
>>= \case
|
||||||
|
Nothing -> pause @'Seconds 3 >> again
|
||||||
|
Just Nothing -> pause @'Seconds 1.24 >> again
|
||||||
|
Just (Just x) -> pure x
|
||||||
|
|
||||||
if now > till
|
missed <- findMissedBlocks sto rv
|
||||||
then throwIO ReflogWaitTimeout
|
|
||||||
else do
|
|
||||||
mhead <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI (coerce reflog)
|
|
||||||
case mhead of
|
|
||||||
Just headVal -> do
|
|
||||||
debug $ "waitRepo: Reflog data arrived" <+> pretty headVal
|
|
||||||
|
|
||||||
Nothing -> pause @'Seconds 1 >> waitForReflog till reflog
|
if L.null missed then do
|
||||||
|
pure rv
|
||||||
|
else do
|
||||||
|
notice "missed blocks in reflog"
|
||||||
|
pause @'Seconds 5
|
||||||
|
again
|
||||||
|
|
||||||
let waitForLWWRef till = liftIO $ withGit3Env env do
|
liftIO $ withGit3Env env do
|
||||||
now <- getTimeCoarse
|
|
||||||
|
|
||||||
if now > till
|
flip fix CWaitLWW $ \next -> \case
|
||||||
then throwIO RpcTimeout
|
CWaitLWW -> do
|
||||||
else do
|
notice $ "wait" <+> pretty (AsBase58 repoKey)
|
||||||
rv <- getRepoRefMaybe
|
getRepoRefMaybe >>= \case
|
||||||
maybe1 rv (pause @'Seconds 1 >> waitForLWWRef till) $ \LWWRef{..} -> do
|
Nothing -> do
|
||||||
debug $ "waitRepo: LWWRef arrived" <+> pretty lwwValue
|
pause @'Seconds 1
|
||||||
|
next CWaitLWW
|
||||||
|
|
||||||
-- Парсим манифест репозитория
|
Just v -> next $ CCheckManifest v
|
||||||
repo <- getRepoManifest
|
|
||||||
|
|
||||||
-- Достаём `reflog`
|
CCheckManifest LWWRef{} -> do
|
||||||
reflog <- [ x | ListVal [SymbolVal "reflog", SignPubKeyLike x] <- repo ]
|
notice "check manifest"
|
||||||
& headMay
|
r <- try @_ @HBS2GitExcepion getRepoManifest
|
||||||
& orThrowUser "malformed repo manifest"
|
case r of
|
||||||
|
Left GitRepoRefEmpty -> next CWaitLWW
|
||||||
|
|
||||||
-- Подписываемся на `reflog`
|
Left e -> next CAborted
|
||||||
callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (reflog, "reflog", 17)
|
|
||||||
>>= orThrowUser "rpc timeout while subscribing to reflog"
|
|
||||||
|
|
||||||
debug $ "waitRepo: Subscribed to reflog" <+> pretty (AsBase58 reflog)
|
Right mf -> do
|
||||||
|
let reflog = lastMay [ x | ListVal [SymbolVal "reflog", SignPubKeyLike x] <- mf ]
|
||||||
|
case reflog of
|
||||||
|
Nothing -> next CAborted
|
||||||
|
Just rf -> do
|
||||||
|
|
||||||
-- Запускаем асинхронную задачу для периодического вызова RpcRefLogFetch
|
callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (rf, "reflog", 17)
|
||||||
withAsync (periodicFetch reflog) $ \_ -> do
|
>>= orThrow RpcTimeout
|
||||||
-- Ждём появления значений в `reflog`
|
|
||||||
waitForReflog till reflog
|
|
||||||
|
|
||||||
liftIO $ withGit3Env env $ waitForLWWRef (startTime + fromNanoSecs maxTimeout)
|
atomically do
|
||||||
|
writeTMVar refLog1_ rf
|
||||||
|
writeTMVar refLog2_ rf
|
||||||
|
|
||||||
|
CAborted -> err "waitRepo aborted" >> none
|
||||||
|
|
||||||
|
|
||||||
|
waitAnyCatchCancel [p1,p2]
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue