From deeea557607983b62d7ba772d8af75fe04277727 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 11 Feb 2025 19:53:45 +0300 Subject: [PATCH] wip --- hbs2-git3/app/GitRemoteHelper.hs | 46 ++++--- hbs2-git3/lib/HBS2/Git3/Import.hs | 199 +++++++++++++++++------------- hbs2-git3/lib/HBS2/Git3/Run.hs | 2 +- hbs2-git3/lib/HBS2/Git3/State.hs | 74 +++++------ hbs2-peer/app/PeerMain.hs | 2 +- 5 files changed, 177 insertions(+), 146 deletions(-) diff --git a/hbs2-git3/app/GitRemoteHelper.hs b/hbs2-git3/app/GitRemoteHelper.hs index 59db3456..bb27c86c 100644 --- a/hbs2-git3/app/GitRemoteHelper.hs +++ b/hbs2-git3/app/GitRemoteHelper.hs @@ -137,31 +137,32 @@ main = flip runContT pure do setupLogger - origStderr <- liftIO $ dup stdError - (readEnd, writeEnd) <- liftIO createPipe - liftIO $ dupTo writeEnd stdError - liftIO $ closeFd writeEnd + -- origStderr <- liftIO $ dup stdError + -- (readEnd, writeEnd) <- liftIO createPipe + -- liftIO $ dupTo writeEnd stdError + -- liftIO $ closeFd writeEnd - rStderr <- liftIO $ fdToHandle readEnd - origHandle <- liftIO $ fdToHandle origStderr + -- rStderr <- liftIO $ fdToHandle readEnd + -- origHandle <- liftIO $ fdToHandle origStderr - liftIO $ hSetBuffering origHandle NoBuffering + -- liftIO $ hSetBuffering origHandle NoBuffering -- liftIO $ IO.hPutStr origHandle "\n" - ContT $ withAsync $ liftIO $ forever do + -- ContT $ withAsync $ liftIO $ forever do -- pause @'Seconds 0.25 - wut <- IO.hGetContents rStderr <&> lines - for_ wut $ \s -> do - IO.hPutStr origHandle (replicate 100 ' ') - IO.hPutStr origHandle "\r" - IO.hPutStr origHandle s - IO.hPutStr origHandle "\r" - pause @'Seconds 0.05 + -- wut <- IO.hGetContents rStderr <&> lines + -- for_ wut $ \s -> do + -- IO.hPutStrLn rStderr s + -- IO.hPutStr origHandle (replicate 100 ' ') + -- IO.hPutStr origHandle "\r" + -- IO.hPutStr origHandle s + -- IO.hPutStr origHandle "\r" + -- pause @'Seconds 0.05 - ContT $ bracket none $ const do - IO.hPutStr origHandle (replicate 100 ' ') - IO.hPutStr origHandle "\r" - silence + -- ContT $ bracket none $ const do + -- IO.hPutStr origHandle (replicate 100 ' ') + -- IO.hPutStr origHandle "\r" + -- silence lift $ void $ installHandler sigPIPE Ignore Nothing env <- nullGit3Env @@ -177,6 +178,7 @@ main = flip runContT pure do -- d_ <- asks gitRuntimeDict -- atomically $ writeTVar d_ (Just (RuntimeDict fuck)) + -- conf <- readLocalConf @@ -191,7 +193,13 @@ main = flip runContT pure do _ -> pure Nothing recover $ connectedDo $ withStateDo do + + waitRepo Nothing =<< getGitRepoKeyThrow + + notice "WAIT-FOR-REPO-DONE" + void $ run dict conf + for_ url updateRepoKey flip fix Plain $ \next -> \case diff --git a/hbs2-git3/lib/HBS2/Git3/Import.hs b/hbs2-git3/lib/HBS2/Git3/Import.hs index a284a0b1..d0bb0575 100644 --- a/hbs2-git3/lib/HBS2/Git3/Import.hs +++ b/hbs2-git3/lib/HBS2/Git3/Import.hs @@ -18,6 +18,7 @@ import HBS2.System.Dir import Data.Config.Suckless.Almost.RPC import Data.Config.Suckless.Script +import Control.Applicative import Codec.Compression.Zlib qualified as Zlib import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.ByteString.Lazy qualified as LBS @@ -116,8 +117,8 @@ writeAsGitPack dir href = do data ImportStage = ImportStart - | ImportWIP Int (Maybe HashRef) - | ImportWait (Maybe Int) ImportStage + | ImportWIP (Timeout 'Seconds) Int (Maybe HashRef) + | ImportWait (Timeout 'Seconds) (Maybe Int) ImportStage | ImportDone (Maybe HashRef) {- HLINT ignore "Functor law" -} @@ -126,130 +127,158 @@ importGitRefLog :: forall m . ( HBS2GitPerks m ) => Git3 m (Maybe HashRef) -importGitRefLog = withStateDo $ ask >>= \case - Git3Disconnected{} -> throwIO Git3PeerNotConnected - env@Git3Connected{..} -> do - +importGitRefLog = do packs <- gitDir >>= orThrow NoGitDir <&> ( "objects/pack") mkdir packs - sto <- getStorage + doImport packs `catch` (\( e :: OperationError) -> err (viaShow e) >> pause @'Seconds 1 >> doImport packs) - already_ <- newTVarIO (mempty :: HashSet HashRef) + where + doImport packs = withStateDo $ ask >>= \case + Git3Disconnected{} -> throwIO Git3PeerNotConnected + Git3Connected{..} -> flip runContT pure do - flip fix ImportStart $ \again -> \case - ImportDone x -> do - notice "import done" - updateReflogIndex - for_ x updateImportedCheckpoint + sto <- getStorage - refs <- importedRefs + already_ <- newTVarIO (mempty :: HashSet HashRef) - if not (null refs && isJust x) then do - pure x - else do - notice $ "no refs arrived - go again" - again ImportStart + oldRvl <- gitRefLogVal & readTVarIO + reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet + newRvl_ <- newTVarIO Nothing - ImportWait d next -> do + void $ ContT $ withAsync $ forever do + void $ lift (callRpcWaitMay @RpcRefLogFetch (TimeoutSec 2) reflogAPI reflog) - pause @'Seconds 1.15 + lift (callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) reflogAPI reflog) + >>= \case + Just (Just x) | Just x /= oldRvl -> atomically (writeTVar newRvl_ (Just x)) + _ -> none - down <- callRpcWaitRetry @RpcGetProbes (TimeoutSec 1) 3 peerAPI () - >>= orThrow RpcTimeout - <&> maybe 0 fromIntegral . headMay . mapMaybe \case - ProbeSnapshotElement "Download.wip" n -> Just n - _ -> Nothing + pause @'Seconds 10 - notice $ "wait some time..." <+> parens (pretty down) + lift $ flip fix ImportStart $ \again -> \case + ImportDone x -> do + notice "import done" - case d of - Just n | down /= n || down == 0 -> again next + newRlv <- readTVarIO newRvl_ + let doAgain = newRlv /= oldRvl - _ -> pause @'Seconds 2.85 >> again (ImportWait (Just down) next) + updateReflogIndex + for_ x updateImportedCheckpoint - ImportStart -> do + refs <- importedRefs - rvl <- readTVarIO gitRefLogVal + if not (null refs && isJust x) || doAgain then do + pure x + else do + atomically do + writeTVar newRvl_ Nothing + writeTVar gitRefLogVal (newRlv <|> oldRvl) - importGroupKeys + notice $ "import: go again" + again ImportStart - prev <- importedCheckpoint + ImportWait sec d next -> do - if | isNothing prev -> again $ ImportWIP 0 prev + pause sec - | prev /= rvl -> do - again $ ImportWIP 0 prev + down <- callRpcWaitRetry @RpcGetProbes (TimeoutSec 1) 3 peerAPI () + >>= orThrow RpcTimeout + <&> maybe 0 fromIntegral . headMay . mapMaybe \case + ProbeSnapshotElement "Download.wip" n -> Just n + _ -> Nothing - | otherwise -> again $ ImportDone prev + notice $ "wait some time..." <+> parens (pretty down) - ImportWIP attempt prev -> do + case d of + Just n | down /= n || down == 0 -> again next - notice $ "download wip" <+> pretty attempt + _ -> pause @'Seconds 2.85 >> again (ImportWait (sec*1.10) (Just down) next) - r <- try @_ @OperationError $ do + ImportStart -> do - excl <- maybe1 prev (pure mempty) $ \p -> do - txListAll (Just p) <&> HS.fromList . fmap fst + rvl <- readTVarIO gitRefLogVal - rv <- refLogRef + importGroupKeys - hxs <- txList ( pure . not . flip HS.member excl ) rv + prev <- importedCheckpoint - cp' <- flip fix (fmap snd hxs, Nothing) $ \next -> \case - ([], r) -> pure (gitTxTree <$> r) - (TxSegment{}:xs, l) -> next (xs, l) - (cp@(TxCheckpoint n tree) : xs, l) -> do + if | isNothing prev -> again $ ImportWIP 1.0 0 prev - -- full <- findMissedBlocks sto tree <&> L.null - missed_ <- newTVarIO 0 - deepScan ScanDeep (\_ -> atomically $ modifyTVar missed_ succ) - (coerce tree) - (getBlock sto) - (const none) + | prev /= rvl -> do + again $ ImportWIP 1.0 0 prev - full <- readTVarIO missed_ <&> (==0) + | otherwise -> again $ ImportDone prev - if full && Just n > (getGitTxRank <$> l) then do - next (xs, Just cp) - else do - next (xs, l) + ImportWIP w attempt prev -> do - case cp' of - Nothing -> do - notice "no checkpoints found" - pure Nothing + notice $ "download wip" <+> pretty attempt - Just cp -> do + r <- try @_ @OperationError $ do - notice $ "found checkpoint" <+> pretty cp - txs <- txList ( pure . not . flip HS.member excl ) (Just cp) + excl <- maybe1 prev (pure mempty) $ \p -> do + txListAll (Just p) <&> HS.fromList . fmap fst - forConcurrently_ txs $ \case - (_, TxCheckpoint{}) -> none - (h, TxSegment tree) -> do - new <- readTVarIO already_ <&> not . HS.member tree + rv <- refLogRef - when new do - s <- writeAsGitPack packs tree + hxs <- txList ( pure . not . flip HS.member excl ) rv - for_ s $ \file -> do - gitRunCommand [qc|git index-pack {file}|] - >>= orThrowPassIO + cp' <- flip fix (fmap snd hxs, Nothing) $ \next -> \case + ([], r) -> pure (gitTxTree <$> r) + (TxSegment{}:xs, l) -> next (xs, l) + (cp@(TxCheckpoint n tree) : xs, l) -> do - atomically $ modifyTVar already_ (HS.insert tree) - notice $ "imported" <+> pretty h + missed <- findMissedBlocks sto tree - pure (Just cp) + let full = L.null missed - case r of - Right cp -> again $ ImportDone cp - Left (MissedBlockError2 _) -> notice "missed blocks" >> again (ImportWait Nothing (ImportWIP (succ attempt) prev)) - Left MissedBlockError -> notice "missed blocks" >> again (ImportWait Nothing (ImportWIP (succ attempt) prev)) - Left e -> err (viaShow e) >> throwIO e + if full && Just n > (getGitTxRank <$> l) then do + next (xs, Just cp) + else do + next (xs, l) + + case cp' of + Nothing -> do + notice "no checkpoints found" + pure Nothing + + Just cp -> do + + notice $ "found checkpoint" <+> pretty cp + txs <- txList ( pure . not . flip HS.member excl ) (Just cp) + + forConcurrently_ txs $ \case + (_, TxCheckpoint{}) -> none + (h, TxSegment tree) -> do + new <- readTVarIO already_ <&> not . HS.member tree + + when new do + s <- writeAsGitPack packs tree + + for_ s $ \file -> do + gitRunCommand [qc|git index-pack {file}|] + >>= orThrowPassIO + + atomically $ modifyTVar already_ (HS.insert tree) + notice $ "imported" <+> pretty h + + pure (Just cp) + + case r of + Right cp -> again $ ImportDone cp + + Left (MissedBlockError2 _) -> do + notice "missed blocks" + again (ImportWait w Nothing (ImportWIP (w*1.15) (succ attempt) prev)) + + Left MissedBlockError -> do + notice "missed blocks" + again (ImportWait w Nothing (ImportWIP (w*1.15) (succ attempt) prev)) + + Left e -> err (viaShow e) >> throwIO e groupKeysFile :: (MonadIO m) => Git3 m FilePath @@ -271,7 +300,7 @@ importGroupKeys :: forall m . ( HBS2GitPerks m importGroupKeys = do - debug $ "importGroupKeys" + notice $ "importGroupKeys" sto <- getStorage already <- readGroupKeyFile diff --git a/hbs2-git3/lib/HBS2/Git3/Run.hs b/hbs2-git3/lib/HBS2/Git3/Run.hs index 257277ba..8c028737 100644 --- a/hbs2-git3/lib/HBS2/Git3/Run.hs +++ b/hbs2-git3/lib/HBS2/Git3/Run.hs @@ -566,7 +566,7 @@ compression ; prints compression level entry $ bindMatch "repo:relay-only" $ nil_ $ \case [ SignPubKeyLike repo ] -> lift $ connectedDo do setGitRepoKey repo - waitRepo (Just 2) =<< getGitRepoKeyThrow + waitRepo (Just 3) =<< getGitRepoKeyThrow _ -> throwIO (BadFormException @C nil) diff --git a/hbs2-git3/lib/HBS2/Git3/State.hs b/hbs2-git3/lib/HBS2/Git3/State.hs index 443fd002..be829d67 100644 --- a/hbs2-git3/lib/HBS2/Git3/State.hs +++ b/hbs2-git3/lib/HBS2/Git3/State.hs @@ -256,8 +256,6 @@ waitRepo timeout repoKey = do when (rlv && rlog) $ done () - reflog_ <- newEmptyTMVarIO - let wait w what x = pause @'Seconds w >> what x callCC \forPeer -> do @@ -271,67 +269,63 @@ waitRepo timeout repoKey = do void (callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey)) pause @'Seconds 10 - pFetchRefLog <- ContT $ withAsync do - r <- atomically $ takeTMVar reflog_ - forever do - void (callRpcWaitMay @RpcRefLogFetch (TimeoutSec 1) reflogAPI r) - pause @'Seconds 10 - - lww <- flip fix () \next _ -> do + lww <- flip fix 2 \next i -> do notice $ "wait for" <+> pretty (AsBase58 repoKey) lift (callRpcWaitMay @RpcLWWRefGet (TimeoutSec 1) lwwAPI (LWWRefKey repoKey)) >>= \case Just (Just x) -> pure x - _ -> wait 2 next () + _ -> wait i next (i*1.05) setGitRepoKey repoKey notice $ "lwwref value" <+> pretty (lwwValue lww) - mf <- flip fix () $ \next _ -> do - notice $ "wait for manifest" - lift (try @_ @WalkMerkleError getRepoManifest) >>= \case - Left{} -> wait 1 next () + mf <- flip fix 3 $ \next i -> do + notice $ "wait for manifest" <+> pretty i + lift (try @_ @SomeException getRepoManifest) >>= \case + Left{} -> wait i next (i*1.10) Right x -> pure x reflog <- getRefLog mf & orThrow GitRepoManifestMalformed - - atomically $ writeTMVar reflog_ reflog - lift (callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (reflog, "reflog", 11)) >>= orThrow RpcTimeout - rv <- flip fix () \next _ -> do - notice $ "wait for data" <+> pretty (AsBase58 reflog) - lift (callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI reflog) - >>= \case - Just (Just x) -> pure x - _ -> wait 2 next () + let waiter = maybe (forever (pause @'Seconds 3600)) pause timeout - atomically $ writeTVar gitRefLogVal (Just rv) + ContT $ withAsync $ do + pause @'Seconds 1 + flip fix 2 $ \next i -> do + debug $ "fetch reflog" <+> pretty (AsBase58 reflog) + void $ lift (callRpcWaitMay @RpcRefLogFetch (TimeoutSec 2) reflogAPI reflog) + pause @'Seconds i + next (i*1.05) - okay <- newEmptyTMVarIO - flip fix () $ \next _ -> do - notice $ "wait for data (2)" <+> pretty (AsBase58 reflog) - -- missed <- findMissedBlocks sto rv - missed_ <- newTVarIO 0 - lift $ deepScan ScanDeep (\_ -> atomically $ modifyTVar missed_ succ) (coerce rv) (getBlock sto) (const none) - missed <- readTVarIO missed_ + void $ lift $ race waiter do - when (missed > 0) do - notice $ "still missed blocks:" <+> pretty missed - wait 5 next () + rv <- flip fix 1 \next i -> do + notice $ "wait for reflog" <+> pretty i <+> pretty (AsBase58 reflog) + lift (callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) reflogAPI reflog) + >>= \case + Just (Just x) -> pure x + Nothing -> debug "fucking RPC timeout!" >> wait i next (i*1.05) + _ -> wait i next (i*1.05) - atomically $ writeTMVar okay True + atomically $ writeTVar gitRefLogVal (Just rv) - pWait <- ContT $ withAsync $ race ( pause (fromMaybe 300 timeout) ) do - void $ atomically $ takeTMVar okay + cancel pFetch - waitAnyCatchCancel [pWait, pFetch, pFetchRefLog] + notice $ "reflog" <+> pretty (AsBase58 reflog) <+> pretty rv - lift $ updateRepoKey repoKey + flip fix 5 $ \next w -> do - debug $ "reflog" <+> pretty (AsBase58 reflog) <+> pretty rv + handle (\(e :: OperationError) -> pause @'Seconds w >> next (w*1.10)) do + missed <- findMissedBlocks sto rv + if L.null missed then do + updateRepoKey repoKey + else do + notice $ "wait reflog to sync in consistent state" <+> pretty w + pause @'Seconds w + next (w*1.01) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 67338576..75587ddc 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -832,7 +832,7 @@ runPeer opts = respawnOnError opts $ do stn <- getNumCapabilities <&> max 2 . div 4 - w <- replicateM 2 $ async $ liftIO $ simpleStorageWorker s + w <- replicateM stn $ async $ liftIO $ simpleStorageWorker s localMulticast <- liftIO $ (headMay <$> parseAddrUDP (fromString defLocalMulticast) <&> fmap (fromSockAddr @'UDP . addrAddress) )