This commit is contained in:
voidlizard 2025-02-11 19:53:45 +03:00
parent 96b5b051b3
commit deeea55760
5 changed files with 177 additions and 146 deletions

View File

@ -137,31 +137,32 @@ main = flip runContT pure do
setupLogger
origStderr <- liftIO $ dup stdError
(readEnd, writeEnd) <- liftIO createPipe
liftIO $ dupTo writeEnd stdError
liftIO $ closeFd writeEnd
-- origStderr <- liftIO $ dup stdError
-- (readEnd, writeEnd) <- liftIO createPipe
-- liftIO $ dupTo writeEnd stdError
-- liftIO $ closeFd writeEnd
rStderr <- liftIO $ fdToHandle readEnd
origHandle <- liftIO $ fdToHandle origStderr
-- rStderr <- liftIO $ fdToHandle readEnd
-- origHandle <- liftIO $ fdToHandle origStderr
liftIO $ hSetBuffering origHandle NoBuffering
-- liftIO $ hSetBuffering origHandle NoBuffering
-- liftIO $ IO.hPutStr origHandle "\n"
ContT $ withAsync $ liftIO $ forever do
-- ContT $ withAsync $ liftIO $ forever do
-- pause @'Seconds 0.25
wut <- IO.hGetContents rStderr <&> lines
for_ wut $ \s -> do
IO.hPutStr origHandle (replicate 100 ' ')
IO.hPutStr origHandle "\r"
IO.hPutStr origHandle s
IO.hPutStr origHandle "\r"
pause @'Seconds 0.05
-- wut <- IO.hGetContents rStderr <&> lines
-- for_ wut $ \s -> do
-- IO.hPutStrLn rStderr s
-- IO.hPutStr origHandle (replicate 100 ' ')
-- IO.hPutStr origHandle "\r"
-- IO.hPutStr origHandle s
-- IO.hPutStr origHandle "\r"
-- pause @'Seconds 0.05
ContT $ bracket none $ const do
IO.hPutStr origHandle (replicate 100 ' ')
IO.hPutStr origHandle "\r"
silence
-- ContT $ bracket none $ const do
-- IO.hPutStr origHandle (replicate 100 ' ')
-- IO.hPutStr origHandle "\r"
-- silence
lift $ void $ installHandler sigPIPE Ignore Nothing
env <- nullGit3Env
@ -177,6 +178,7 @@ main = flip runContT pure do
-- d_ <- asks gitRuntimeDict
-- atomically $ writeTVar d_ (Just (RuntimeDict fuck))
--
conf <- readLocalConf
@ -191,7 +193,13 @@ main = flip runContT pure do
_ -> pure Nothing
recover $ connectedDo $ withStateDo do
waitRepo Nothing =<< getGitRepoKeyThrow
notice "WAIT-FOR-REPO-DONE"
void $ run dict conf
for_ url updateRepoKey
flip fix Plain $ \next -> \case

View File

@ -18,6 +18,7 @@ import HBS2.System.Dir
import Data.Config.Suckless.Almost.RPC
import Data.Config.Suckless.Script
import Control.Applicative
import Codec.Compression.Zlib qualified as Zlib
import Data.ByteString.Lazy.Char8 qualified as LBS8
import Data.ByteString.Lazy qualified as LBS
@ -116,8 +117,8 @@ writeAsGitPack dir href = do
data ImportStage =
ImportStart
| ImportWIP Int (Maybe HashRef)
| ImportWait (Maybe Int) ImportStage
| ImportWIP (Timeout 'Seconds) Int (Maybe HashRef)
| ImportWait (Timeout 'Seconds) (Maybe Int) ImportStage
| ImportDone (Maybe HashRef)
{- HLINT ignore "Functor law" -}
@ -126,130 +127,158 @@ importGitRefLog :: forall m . ( HBS2GitPerks m
)
=> Git3 m (Maybe HashRef)
importGitRefLog = withStateDo $ ask >>= \case
Git3Disconnected{} -> throwIO Git3PeerNotConnected
env@Git3Connected{..} -> do
importGitRefLog = do
packs <- gitDir
>>= orThrow NoGitDir
<&> (</> "objects/pack")
mkdir packs
sto <- getStorage
doImport packs `catch` (\( e :: OperationError) -> err (viaShow e) >> pause @'Seconds 1 >> doImport packs)
already_ <- newTVarIO (mempty :: HashSet HashRef)
where
doImport packs = withStateDo $ ask >>= \case
Git3Disconnected{} -> throwIO Git3PeerNotConnected
Git3Connected{..} -> flip runContT pure do
flip fix ImportStart $ \again -> \case
ImportDone x -> do
notice "import done"
updateReflogIndex
for_ x updateImportedCheckpoint
sto <- getStorage
refs <- importedRefs
already_ <- newTVarIO (mempty :: HashSet HashRef)
if not (null refs && isJust x) then do
pure x
else do
notice $ "no refs arrived - go again"
again ImportStart
oldRvl <- gitRefLogVal & readTVarIO
reflog <- getGitRemoteKey >>= orThrow Git3ReflogNotSet
newRvl_ <- newTVarIO Nothing
ImportWait d next -> do
void $ ContT $ withAsync $ forever do
void $ lift (callRpcWaitMay @RpcRefLogFetch (TimeoutSec 2) reflogAPI reflog)
pause @'Seconds 1.15
lift (callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) reflogAPI reflog)
>>= \case
Just (Just x) | Just x /= oldRvl -> atomically (writeTVar newRvl_ (Just x))
_ -> none
down <- callRpcWaitRetry @RpcGetProbes (TimeoutSec 1) 3 peerAPI ()
>>= orThrow RpcTimeout
<&> maybe 0 fromIntegral . headMay . mapMaybe \case
ProbeSnapshotElement "Download.wip" n -> Just n
_ -> Nothing
pause @'Seconds 10
notice $ "wait some time..." <+> parens (pretty down)
lift $ flip fix ImportStart $ \again -> \case
ImportDone x -> do
notice "import done"
case d of
Just n | down /= n || down == 0 -> again next
newRlv <- readTVarIO newRvl_
let doAgain = newRlv /= oldRvl
_ -> pause @'Seconds 2.85 >> again (ImportWait (Just down) next)
updateReflogIndex
for_ x updateImportedCheckpoint
ImportStart -> do
refs <- importedRefs
rvl <- readTVarIO gitRefLogVal
if not (null refs && isJust x) || doAgain then do
pure x
else do
atomically do
writeTVar newRvl_ Nothing
writeTVar gitRefLogVal (newRlv <|> oldRvl)
importGroupKeys
notice $ "import: go again"
again ImportStart
prev <- importedCheckpoint
ImportWait sec d next -> do
if | isNothing prev -> again $ ImportWIP 0 prev
pause sec
| prev /= rvl -> do
again $ ImportWIP 0 prev
down <- callRpcWaitRetry @RpcGetProbes (TimeoutSec 1) 3 peerAPI ()
>>= orThrow RpcTimeout
<&> maybe 0 fromIntegral . headMay . mapMaybe \case
ProbeSnapshotElement "Download.wip" n -> Just n
_ -> Nothing
| otherwise -> again $ ImportDone prev
notice $ "wait some time..." <+> parens (pretty down)
ImportWIP attempt prev -> do
case d of
Just n | down /= n || down == 0 -> again next
notice $ "download wip" <+> pretty attempt
_ -> pause @'Seconds 2.85 >> again (ImportWait (sec*1.10) (Just down) next)
r <- try @_ @OperationError $ do
ImportStart -> do
excl <- maybe1 prev (pure mempty) $ \p -> do
txListAll (Just p) <&> HS.fromList . fmap fst
rvl <- readTVarIO gitRefLogVal
rv <- refLogRef
importGroupKeys
hxs <- txList ( pure . not . flip HS.member excl ) rv
prev <- importedCheckpoint
cp' <- flip fix (fmap snd hxs, Nothing) $ \next -> \case
([], r) -> pure (gitTxTree <$> r)
(TxSegment{}:xs, l) -> next (xs, l)
(cp@(TxCheckpoint n tree) : xs, l) -> do
if | isNothing prev -> again $ ImportWIP 1.0 0 prev
-- full <- findMissedBlocks sto tree <&> L.null
missed_ <- newTVarIO 0
deepScan ScanDeep (\_ -> atomically $ modifyTVar missed_ succ)
(coerce tree)
(getBlock sto)
(const none)
| prev /= rvl -> do
again $ ImportWIP 1.0 0 prev
full <- readTVarIO missed_ <&> (==0)
| otherwise -> again $ ImportDone prev
if full && Just n > (getGitTxRank <$> l) then do
next (xs, Just cp)
else do
next (xs, l)
ImportWIP w attempt prev -> do
case cp' of
Nothing -> do
notice "no checkpoints found"
pure Nothing
notice $ "download wip" <+> pretty attempt
Just cp -> do
r <- try @_ @OperationError $ do
notice $ "found checkpoint" <+> pretty cp
txs <- txList ( pure . not . flip HS.member excl ) (Just cp)
excl <- maybe1 prev (pure mempty) $ \p -> do
txListAll (Just p) <&> HS.fromList . fmap fst
forConcurrently_ txs $ \case
(_, TxCheckpoint{}) -> none
(h, TxSegment tree) -> do
new <- readTVarIO already_ <&> not . HS.member tree
rv <- refLogRef
when new do
s <- writeAsGitPack packs tree
hxs <- txList ( pure . not . flip HS.member excl ) rv
for_ s $ \file -> do
gitRunCommand [qc|git index-pack {file}|]
>>= orThrowPassIO
cp' <- flip fix (fmap snd hxs, Nothing) $ \next -> \case
([], r) -> pure (gitTxTree <$> r)
(TxSegment{}:xs, l) -> next (xs, l)
(cp@(TxCheckpoint n tree) : xs, l) -> do
atomically $ modifyTVar already_ (HS.insert tree)
notice $ "imported" <+> pretty h
missed <- findMissedBlocks sto tree
pure (Just cp)
let full = L.null missed
case r of
Right cp -> again $ ImportDone cp
Left (MissedBlockError2 _) -> notice "missed blocks" >> again (ImportWait Nothing (ImportWIP (succ attempt) prev))
Left MissedBlockError -> notice "missed blocks" >> again (ImportWait Nothing (ImportWIP (succ attempt) prev))
Left e -> err (viaShow e) >> throwIO e
if full && Just n > (getGitTxRank <$> l) then do
next (xs, Just cp)
else do
next (xs, l)
case cp' of
Nothing -> do
notice "no checkpoints found"
pure Nothing
Just cp -> do
notice $ "found checkpoint" <+> pretty cp
txs <- txList ( pure . not . flip HS.member excl ) (Just cp)
forConcurrently_ txs $ \case
(_, TxCheckpoint{}) -> none
(h, TxSegment tree) -> do
new <- readTVarIO already_ <&> not . HS.member tree
when new do
s <- writeAsGitPack packs tree
for_ s $ \file -> do
gitRunCommand [qc|git index-pack {file}|]
>>= orThrowPassIO
atomically $ modifyTVar already_ (HS.insert tree)
notice $ "imported" <+> pretty h
pure (Just cp)
case r of
Right cp -> again $ ImportDone cp
Left (MissedBlockError2 _) -> do
notice "missed blocks"
again (ImportWait w Nothing (ImportWIP (w*1.15) (succ attempt) prev))
Left MissedBlockError -> do
notice "missed blocks"
again (ImportWait w Nothing (ImportWIP (w*1.15) (succ attempt) prev))
Left e -> err (viaShow e) >> throwIO e
groupKeysFile :: (MonadIO m) => Git3 m FilePath
@ -271,7 +300,7 @@ importGroupKeys :: forall m . ( HBS2GitPerks m
importGroupKeys = do
debug $ "importGroupKeys"
notice $ "importGroupKeys"
sto <- getStorage
already <- readGroupKeyFile

View File

@ -566,7 +566,7 @@ compression ; prints compression level
entry $ bindMatch "repo:relay-only" $ nil_ $ \case
[ SignPubKeyLike repo ] -> lift $ connectedDo do
setGitRepoKey repo
waitRepo (Just 2) =<< getGitRepoKeyThrow
waitRepo (Just 3) =<< getGitRepoKeyThrow
_ -> throwIO (BadFormException @C nil)

View File

@ -256,8 +256,6 @@ waitRepo timeout repoKey = do
when (rlv && rlog) $ done ()
reflog_ <- newEmptyTMVarIO
let wait w what x = pause @'Seconds w >> what x
callCC \forPeer -> do
@ -271,67 +269,63 @@ waitRepo timeout repoKey = do
void (callRpcWaitMay @RpcLWWRefFetch (TimeoutSec 1) lwwAPI (LWWRefKey repoKey))
pause @'Seconds 10
pFetchRefLog <- ContT $ withAsync do
r <- atomically $ takeTMVar reflog_
forever do
void (callRpcWaitMay @RpcRefLogFetch (TimeoutSec 1) reflogAPI r)
pause @'Seconds 10
lww <- flip fix () \next _ -> do
lww <- flip fix 2 \next i -> do
notice $ "wait for" <+> pretty (AsBase58 repoKey)
lift (callRpcWaitMay @RpcLWWRefGet (TimeoutSec 1) lwwAPI (LWWRefKey repoKey))
>>= \case
Just (Just x) -> pure x
_ -> wait 2 next ()
_ -> wait i next (i*1.05)
setGitRepoKey repoKey
notice $ "lwwref value" <+> pretty (lwwValue lww)
mf <- flip fix () $ \next _ -> do
notice $ "wait for manifest"
lift (try @_ @WalkMerkleError getRepoManifest) >>= \case
Left{} -> wait 1 next ()
mf <- flip fix 3 $ \next i -> do
notice $ "wait for manifest" <+> pretty i
lift (try @_ @SomeException getRepoManifest) >>= \case
Left{} -> wait i next (i*1.10)
Right x -> pure x
reflog <- getRefLog mf & orThrow GitRepoManifestMalformed
atomically $ writeTMVar reflog_ reflog
lift (callRpcWaitMay @RpcPollAdd (TimeoutSec 1) peerAPI (reflog, "reflog", 11))
>>= orThrow RpcTimeout
rv <- flip fix () \next _ -> do
notice $ "wait for data" <+> pretty (AsBase58 reflog)
lift (callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) reflogAPI reflog)
>>= \case
Just (Just x) -> pure x
_ -> wait 2 next ()
let waiter = maybe (forever (pause @'Seconds 3600)) pause timeout
atomically $ writeTVar gitRefLogVal (Just rv)
ContT $ withAsync $ do
pause @'Seconds 1
flip fix 2 $ \next i -> do
debug $ "fetch reflog" <+> pretty (AsBase58 reflog)
void $ lift (callRpcWaitMay @RpcRefLogFetch (TimeoutSec 2) reflogAPI reflog)
pause @'Seconds i
next (i*1.05)
okay <- newEmptyTMVarIO
flip fix () $ \next _ -> do
notice $ "wait for data (2)" <+> pretty (AsBase58 reflog)
-- missed <- findMissedBlocks sto rv
missed_ <- newTVarIO 0
lift $ deepScan ScanDeep (\_ -> atomically $ modifyTVar missed_ succ) (coerce rv) (getBlock sto) (const none)
missed <- readTVarIO missed_
void $ lift $ race waiter do
when (missed > 0) do
notice $ "still missed blocks:" <+> pretty missed
wait 5 next ()
rv <- flip fix 1 \next i -> do
notice $ "wait for reflog" <+> pretty i <+> pretty (AsBase58 reflog)
lift (callRpcWaitMay @RpcRefLogGet (TimeoutSec 2) reflogAPI reflog)
>>= \case
Just (Just x) -> pure x
Nothing -> debug "fucking RPC timeout!" >> wait i next (i*1.05)
_ -> wait i next (i*1.05)
atomically $ writeTMVar okay True
atomically $ writeTVar gitRefLogVal (Just rv)
pWait <- ContT $ withAsync $ race ( pause (fromMaybe 300 timeout) ) do
void $ atomically $ takeTMVar okay
cancel pFetch
waitAnyCatchCancel [pWait, pFetch, pFetchRefLog]
notice $ "reflog" <+> pretty (AsBase58 reflog) <+> pretty rv
lift $ updateRepoKey repoKey
flip fix 5 $ \next w -> do
debug $ "reflog" <+> pretty (AsBase58 reflog) <+> pretty rv
handle (\(e :: OperationError) -> pause @'Seconds w >> next (w*1.10)) do
missed <- findMissedBlocks sto rv
if L.null missed then do
updateRepoKey repoKey
else do
notice $ "wait reflog to sync in consistent state" <+> pretty w
pause @'Seconds w
next (w*1.01)

View File

@ -832,7 +832,7 @@ runPeer opts = respawnOnError opts $ do
stn <- getNumCapabilities <&> max 2 . div 4
w <- replicateM 2 $ async $ liftIO $ simpleStorageWorker s
w <- replicateM stn $ async $ liftIO $ simpleStorageWorker s
localMulticast <- liftIO $ (headMay <$> parseAddrUDP (fromString defLocalMulticast)
<&> fmap (fromSockAddr @'UDP . addrAddress) )