From e71ff57773cfc0ac19bcb502721aaf85047c2b80 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 15 Jul 2023 10:24:32 +0300 Subject: [PATCH] wip, tryin to fetch refchan head --- hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 7 +++- hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 5 ++- hbs2-peer/app/PeerMain.hs | 2 +- hbs2-peer/app/RefChan.hs | 46 ++++++++++++++++++++-- hbs2-peer/hbs2-peer.cabal | 1 + 5 files changed, 54 insertions(+), 7 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 5daddf1e..fb02e33e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -104,7 +104,8 @@ instance HasProtocol L4Proto (RefLogUpdate L4Proto) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise - requestPeriodLim = ReqLimPerMessage 600 + -- TODO: find-out-optimal-max-safe-frequency + requestPeriodLim = ReqLimPerMessage 60 instance HasProtocol L4Proto (RefLogRequest L4Proto) where type instance ProtocolId (RefLogRequest L4Proto) = 8 @@ -126,7 +127,9 @@ instance HasProtocol L4Proto (RefChanHead L4Proto) where type instance Encoded L4Proto = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise - -- requestPeriodLim = ReqLimPerMessage 600 + + -- TODO: find-out-optimal-max-frequency + requestPeriodLim = ReqLimPerMessage 60 instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 5cc7394a..8199653c 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -150,7 +150,10 @@ refChanHeadProto self adapter msg = do RefChanHead chan pkt -> do guard =<< lift (refChanHeadSubscribed adapter chan) trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan) - -- FIXME: check-chan-is-listened + -- TODO: notify-others-for-new-head + -- нужно ли уведомить остальных, что голова поменялась? + -- всех, от кого мы еще не получали данное сообщение + -- откуда мы знаем, от кого мы получали данное сообщение? lift $ refChanHeadOnHead adapter chan pkt RefChanGetHead chan -> deferred proto do diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index ade69bf1..1a53ef5c 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -787,7 +787,7 @@ runPeer opts = U.handle (\e -> myException e peerThread "reflogWorker" (reflogWorker @e conf rwa) - peerThread "refChanWorker" (refChanWorker @e rce) + peerThread "refChanWorker" (refChanWorker @e rce (SomeBrains brains)) peerThread "ping pong" $ forever $ do cmd <- liftIO $ atomically $ readTQueue rpcQ diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index fac4ccad..eacc7a9f 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -31,6 +31,7 @@ import HBS2.System.Logger.Simple import PeerTypes import PeerConfig import BlockDownload +import Brains import Control.Exception () import Control.Monad.Except (throwError, runExceptT) @@ -40,9 +41,13 @@ import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet import Data.List qualified as List import Data.Maybe import Lens.Micro.Platform +import Data.Heap qualified as Heap +import Data.Heap (Heap,Entry(..)) import UnliftIO import Streaming.Prelude qualified as S @@ -145,9 +150,10 @@ refChanWorker :: forall e s m . ( MonadIO m , m ~ PeerM e IO ) => RefChanWorkerEnv e + -> SomeBrains e -> m () -refChanWorker env = do +refChanWorker env brains = do penv <- ask @@ -156,14 +162,49 @@ refChanWorker env = do downloads <- async monitorDownloads + polls <- async refChanHeadPoll + forever do pause @'Seconds 10 debug "I'm refchan worker" - mapM_ waitCatch [hw,downloads] + mapM_ waitCatch [hw,downloads,polls] where + refChanHeadPoll = do + pause @'Seconds 2 + + fix (\next mon -> do + now <- getTimeCoarse + refs <- listPolledRefs @e brains "refchan" <&> HashMap.fromList + let mon' = mon `HashMap.union` + HashMap.fromList [ (e, now + fromNanoSecs (floor (1e9 * 60 * realToFrac t))) + | (e, t) <- HashMap.toList refs + ] + + let q = Heap.fromList [ Entry t e + | (e, t) <- HashMap.toList mon' + ] + + case Heap.uncons q of + Just (Entry t (r :: RefChanId e), rest) | t <= now -> do + debug $ "POLLING REFCHAN" <+> pretty (AsBase58 r) + broadCastMessage (RefChanGetHead @e r) + -- TODO: send-poll-request + next (HashMap.delete r mon') + + Just (Entry t (r :: RefChanId e), _) | otherwise -> do + pause @'Seconds $ fromInteger $ floor $ realToFrac (toNanoSecs (t - now)) / 1e9 + next mon' + + Nothing -> do + pause @'Seconds 5 + next mon' + + ) mempty + + monitorDownloads = forever do pause @'Seconds 2 all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList @@ -237,7 +278,6 @@ refChanWorker env = do when notify do debug $ "NOTIFY-ALL-HEAD-UPDATED" <+> pretty (AsBase58 pk) <+> pretty hr broadCastMessage (RefChanHead @e pk (RefChanHeadBlockTran hr)) - pure () else do debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0) diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 92da2b00..ed846145 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -66,6 +66,7 @@ common common-deps , wai-extra , unliftio , unix + , heaps common shared-properties ghc-options: