From 885682eb8a39f9ad42f890ea17a64f36582445c3 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 24 Mar 2023 09:28:55 +0300 Subject: [PATCH] ASAP-start-only-one-instance-for-link-monitor --- .fixme/log | 4 ++- hbs2-peer/app/RefLog.hs | 78 +++++++++++++++++++++++------------------ 2 files changed, 46 insertions(+), 36 deletions(-) diff --git a/.fixme/log b/.fixme/log index 7a187eb7..d5e12a03 100644 --- a/.fixme/log +++ b/.fixme/log @@ -302,4 +302,6 @@ fixme-del "F8cffipg87" (fixme-set "workflow" "test" "RsTry2C5Gk") (fixme-set "workflow" "done" "4QshZka3se") (fixme-set "workflow" "done" "8BrTPZMcQ8") -(fixme-set "workflow" "done" "7YxXxvxtmx") \ No newline at end of file +(fixme-set "workflow" "done" "7YxXxvxtmx") +(fixme-set "workflow" "test" "9NTjeVtQwV") +(fixme-set "workflow" "done" "8TFq4jSHUM") \ No newline at end of file diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index 797b554d..03c9d1f6 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -28,7 +28,6 @@ import Data.Functor import Data.Function(fix) import Data.Maybe import Data.Foldable(for_) -import Data.List qualified as List import Data.Text qualified as Text import Control.Concurrent.STM import Control.Monad @@ -37,6 +36,7 @@ import Data.ByteString qualified as BS import Data.HashMap.Strict qualified as HashMap import Codec.Serialise import Data.HashSet qualified as HashSet +import Data.HashSet (HashSet) import Control.Concurrent.Async import Control.Monad.Trans.Maybe import Lens.Micro.Platform @@ -130,10 +130,14 @@ reflogWorker conf adapter = do q <- liftIO newTQueueIO - let reflogUpdate reflog ha tran = do + let reflogTimeout puk h = do + -- FIXME: fix-time-hardcode-again + pause @'Seconds 1200 + err $ "reflog dowload timeout" <+> pretty (AsBase58 puk) <+> pretty h + + let reflogUpdate reflog _ tran = do signed <- verifyRefLogUpdate tran when signed do - -- trace $ "GOT PRETTY VALID REFLOG UPDATE TRANSACTION" <+> pretty ha liftIO $ atomically $ writeTQueue q (reflog, [tran]) @@ -147,6 +151,7 @@ reflogWorker conf adapter = do AnnRef ref -> do liftIO $ reflogDownload adapter ref + -- TODO: support-other-data-structures _ -> pure () subscribe @e RefLogUpdateEvKey $ \(RefLogUpdateEvData (reflog,v)) -> do @@ -154,36 +159,43 @@ reflogWorker conf adapter = do liftIO $ reflogUpdate reflog Nothing v liftIO $ atomically $ writeTQueue q (reflog, [v]) + + reflogMon <- liftIO $ newTVarIO (mempty :: HashSet (Hash HbSync)) + subscribe @e RefLogReqAnswerKey $ \(RefLogReqAnswerData reflog h) -> do - trace $ "reflog worker. GOT REFLOG ANSWER" <+> pretty (AsBase58 reflog) <+> pretty h -- TODO: ASAP-only-process-link-if-we-subscribed -- TODO: ASAP-start-only-one-instance-for-link-monitor - -- TODO: periodically-check-if-reflog-is-done - -- TODO: ASAP-when-done-delete-monitor -- TODO: ASAP-dont-do-if-already-done - void $ liftIO $ race (pause @'Seconds 3600) do - -- FIXME: log-this-situation - -- FIXME: fix-time-hardcode-again - reflogDownload adapter h - fix \next -> do - missed <- missedEntries sto h - if missed /= 0 then do - pause @'Seconds 1 - trace $ "reflogWorker: missed refs for" <+> pretty h <+> pretty missed - next - else do - trace $ "block" <+> pretty h <+> "is downloaded" - hashes <- readHashesFromBlock sto (Just h) - for_ hashes $ \ha -> runMaybeT do - bss <- liftIO $ getBlock sto (fromHashRef ha) - when (isNothing bss) do - liftIO $ reflogDownload adapter (fromHashRef ha) + here <- liftIO $ readTVarIO reflogMon <&> HashSet.member h + unless here do + void $ liftIO $ async $ do + timeout <- async (reflogTimeout reflog h) + work <- async $ do + trace $ "reflog worker. GOT REFLOG ANSWER" <+> pretty (AsBase58 reflog) <+> pretty h + reflogDownload adapter h + fix \next -> do + missed <- missedEntries sto h + if missed /= 0 then do + pause @'Seconds 1 + trace $ "reflogWorker: missed refs for" <+> pretty h <+> pretty missed + next + else do + trace $ "block" <+> pretty h <+> "is downloaded" + hashes <- readHashesFromBlock sto (Just h) + for_ hashes $ \ha -> runMaybeT do + bss <- liftIO $ getBlock sto (fromHashRef ha) - bs <- MaybeT $ pure bss + when (isNothing bss) do + liftIO $ reflogDownload adapter (fromHashRef ha) - tran <- MaybeT $ pure $ deserialiseOrFail @(RefLogUpdate e) bs & either (const Nothing) Just - liftIO $ reflogUpdate reflog (Just ha) tran + bs <- MaybeT $ pure bss + + tran <- MaybeT $ pure $ deserialiseOrFail @(RefLogUpdate e) bs & either (const Nothing) Just + liftIO $ reflogUpdate reflog (Just ha) tran + + void $ waitAnyCatchCancel [timeout,work] + atomically $ modifyTVar' reflogMon (HashSet.delete h) let (PeerConfig syn) = conf @@ -217,16 +229,17 @@ reflogWorker conf adapter = do pause (fromIntegral i :: Timeout 'Minutes) w1 <- liftIO $ async $ forever $ do - el0 <- liftIO $ atomically $ readTQueue q + + -- TODO: reflog-process-period-to-config + pause @'Seconds 10 + els <- liftIO $ atomically $ flushTQueue q - let byRef = HashMap.fromListWith (<>) (el0 : els) + let byRef = HashMap.fromListWith (<>) els for_ (HashMap.toList byRef) $ \(r,x) -> do let reflogkey = RefLogKey r - -- trace $ "UPDATE REFLOG" <+> pretty (hashObject @HbSync reflogkey) <+> pretty (fmap AsBase58 x) h' <- liftIO $! getRef sto (RefLogKey r) - -- trace $ "UPDATE REGLOG OKAY" <+> pretty (isJust h') hashes <- liftIO $ readHashesFromBlock sto h' @@ -250,11 +263,6 @@ reflogWorker conf adapter = do trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty newRoot - -- TODO: read-reflog-value - -- TODO: read-reflog-hashes - -- TODO: store-all-values - -- TODO: get all hashes - trace "I'm a reflog update worker" pollers <- liftIO $ wait pollers'