ASAP-start-only-one-instance-for-link-monitor

This commit is contained in:
Dmitry Zuikov 2023-03-24 09:28:55 +03:00
parent b3ec2418a7
commit 885682eb8a
2 changed files with 46 additions and 36 deletions

View File

@ -302,4 +302,6 @@ fixme-del "F8cffipg87"
(fixme-set "workflow" "test" "RsTry2C5Gk") (fixme-set "workflow" "test" "RsTry2C5Gk")
(fixme-set "workflow" "done" "4QshZka3se") (fixme-set "workflow" "done" "4QshZka3se")
(fixme-set "workflow" "done" "8BrTPZMcQ8") (fixme-set "workflow" "done" "8BrTPZMcQ8")
(fixme-set "workflow" "done" "7YxXxvxtmx") (fixme-set "workflow" "done" "7YxXxvxtmx")
(fixme-set "workflow" "test" "9NTjeVtQwV")
(fixme-set "workflow" "done" "8TFq4jSHUM")

View File

@ -28,7 +28,6 @@ import Data.Functor
import Data.Function(fix) import Data.Function(fix)
import Data.Maybe import Data.Maybe
import Data.Foldable(for_) import Data.Foldable(for_)
import Data.List qualified as List
import Data.Text qualified as Text import Data.Text qualified as Text
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Monad import Control.Monad
@ -37,6 +36,7 @@ import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Codec.Serialise import Codec.Serialise
import Data.HashSet qualified as HashSet import Data.HashSet qualified as HashSet
import Data.HashSet (HashSet)
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Lens.Micro.Platform import Lens.Micro.Platform
@ -130,10 +130,14 @@ reflogWorker conf adapter = do
q <- liftIO newTQueueIO q <- liftIO newTQueueIO
let reflogUpdate reflog ha tran = do let reflogTimeout puk h = do
-- FIXME: fix-time-hardcode-again
pause @'Seconds 1200
err $ "reflog dowload timeout" <+> pretty (AsBase58 puk) <+> pretty h
let reflogUpdate reflog _ tran = do
signed <- verifyRefLogUpdate tran signed <- verifyRefLogUpdate tran
when signed do when signed do
-- trace $ "GOT PRETTY VALID REFLOG UPDATE TRANSACTION" <+> pretty ha
liftIO $ atomically $ writeTQueue q (reflog, [tran]) liftIO $ atomically $ writeTQueue q (reflog, [tran])
@ -147,6 +151,7 @@ reflogWorker conf adapter = do
AnnRef ref -> do AnnRef ref -> do
liftIO $ reflogDownload adapter ref liftIO $ reflogDownload adapter ref
-- TODO: support-other-data-structures
_ -> pure () _ -> pure ()
subscribe @e RefLogUpdateEvKey $ \(RefLogUpdateEvData (reflog,v)) -> do subscribe @e RefLogUpdateEvKey $ \(RefLogUpdateEvData (reflog,v)) -> do
@ -154,36 +159,43 @@ reflogWorker conf adapter = do
liftIO $ reflogUpdate reflog Nothing v liftIO $ reflogUpdate reflog Nothing v
liftIO $ atomically $ writeTQueue q (reflog, [v]) liftIO $ atomically $ writeTQueue q (reflog, [v])
reflogMon <- liftIO $ newTVarIO (mempty :: HashSet (Hash HbSync))
subscribe @e RefLogReqAnswerKey $ \(RefLogReqAnswerData reflog h) -> do subscribe @e RefLogReqAnswerKey $ \(RefLogReqAnswerData reflog h) -> do
trace $ "reflog worker. GOT REFLOG ANSWER" <+> pretty (AsBase58 reflog) <+> pretty h
-- TODO: ASAP-only-process-link-if-we-subscribed -- TODO: ASAP-only-process-link-if-we-subscribed
-- TODO: ASAP-start-only-one-instance-for-link-monitor -- TODO: ASAP-start-only-one-instance-for-link-monitor
-- TODO: periodically-check-if-reflog-is-done
-- TODO: ASAP-when-done-delete-monitor
-- TODO: ASAP-dont-do-if-already-done -- TODO: ASAP-dont-do-if-already-done
void $ liftIO $ race (pause @'Seconds 3600) do
-- FIXME: log-this-situation
-- FIXME: fix-time-hardcode-again
reflogDownload adapter h
fix \next -> do
missed <- missedEntries sto h
if missed /= 0 then do
pause @'Seconds 1
trace $ "reflogWorker: missed refs for" <+> pretty h <+> pretty missed
next
else do
trace $ "block" <+> pretty h <+> "is downloaded"
hashes <- readHashesFromBlock sto (Just h)
for_ hashes $ \ha -> runMaybeT do
bss <- liftIO $ getBlock sto (fromHashRef ha)
when (isNothing bss) do here <- liftIO $ readTVarIO reflogMon <&> HashSet.member h
liftIO $ reflogDownload adapter (fromHashRef ha) unless here do
void $ liftIO $ async $ do
timeout <- async (reflogTimeout reflog h)
work <- async $ do
trace $ "reflog worker. GOT REFLOG ANSWER" <+> pretty (AsBase58 reflog) <+> pretty h
reflogDownload adapter h
fix \next -> do
missed <- missedEntries sto h
if missed /= 0 then do
pause @'Seconds 1
trace $ "reflogWorker: missed refs for" <+> pretty h <+> pretty missed
next
else do
trace $ "block" <+> pretty h <+> "is downloaded"
hashes <- readHashesFromBlock sto (Just h)
for_ hashes $ \ha -> runMaybeT do
bss <- liftIO $ getBlock sto (fromHashRef ha)
bs <- MaybeT $ pure bss when (isNothing bss) do
liftIO $ reflogDownload adapter (fromHashRef ha)
tran <- MaybeT $ pure $ deserialiseOrFail @(RefLogUpdate e) bs & either (const Nothing) Just bs <- MaybeT $ pure bss
liftIO $ reflogUpdate reflog (Just ha) tran
tran <- MaybeT $ pure $ deserialiseOrFail @(RefLogUpdate e) bs & either (const Nothing) Just
liftIO $ reflogUpdate reflog (Just ha) tran
void $ waitAnyCatchCancel [timeout,work]
atomically $ modifyTVar' reflogMon (HashSet.delete h)
let (PeerConfig syn) = conf let (PeerConfig syn) = conf
@ -217,16 +229,17 @@ reflogWorker conf adapter = do
pause (fromIntegral i :: Timeout 'Minutes) pause (fromIntegral i :: Timeout 'Minutes)
w1 <- liftIO $ async $ forever $ do w1 <- liftIO $ async $ forever $ do
el0 <- liftIO $ atomically $ readTQueue q
-- TODO: reflog-process-period-to-config
pause @'Seconds 10
els <- liftIO $ atomically $ flushTQueue q els <- liftIO $ atomically $ flushTQueue q
let byRef = HashMap.fromListWith (<>) (el0 : els) let byRef = HashMap.fromListWith (<>) els
for_ (HashMap.toList byRef) $ \(r,x) -> do for_ (HashMap.toList byRef) $ \(r,x) -> do
let reflogkey = RefLogKey r let reflogkey = RefLogKey r
-- trace $ "UPDATE REFLOG" <+> pretty (hashObject @HbSync reflogkey) <+> pretty (fmap AsBase58 x)
h' <- liftIO $! getRef sto (RefLogKey r) h' <- liftIO $! getRef sto (RefLogKey r)
-- trace $ "UPDATE REGLOG OKAY" <+> pretty (isJust h')
hashes <- liftIO $ readHashesFromBlock sto h' hashes <- liftIO $ readHashesFromBlock sto h'
@ -250,11 +263,6 @@ reflogWorker conf adapter = do
trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty newRoot trace $ "new reflog value" <+> pretty (AsBase58 r) <+> pretty newRoot
-- TODO: read-reflog-value
-- TODO: read-reflog-hashes
-- TODO: store-all-values
-- TODO: get all hashes
trace "I'm a reflog update worker" trace "I'm a reflog update worker"
pollers <- liftIO $ wait pollers' pollers <- liftIO $ wait pollers'