From 39c0ca71ccd1624b8466f67e71bafbac9751a294 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 15 Jul 2023 11:39:46 +0300 Subject: [PATCH] wip, polling function --- hbs2-core/lib/HBS2/Clock.hs | 5 ++-- hbs2-peer/app/PeerTypes.hs | 53 ++++++++++++++++++++++++++++++++++++- hbs2-peer/app/RefChan.hs | 39 ++++----------------------- hbs2-peer/hbs2-peer.cabal | 1 + 4 files changed, 60 insertions(+), 38 deletions(-) diff --git a/hbs2-core/lib/HBS2/Clock.hs b/hbs2-core/lib/HBS2/Clock.hs index 3c124ff5..c9947d24 100644 --- a/hbs2-core/lib/HBS2/Clock.hs +++ b/hbs2-core/lib/HBS2/Clock.hs @@ -41,16 +41,15 @@ class IsTimeout a => MonadPause a m where instance (IsTimeout a, MonadIO m) => MonadPause a m where pause x = liftIO $ threadDelay (toMicroSeconds x) -instance Pretty (Fixed E9) where +instance HasResolution a => Pretty (Fixed a) where pretty = pretty . show - newtype instance Timeout 'MilliSeconds = TimeoutMSec (Fixed E9) deriving newtype (Eq,Ord,Num,Real,Fractional,Show,Pretty) newtype instance Timeout 'Seconds = - TimeoutSec (Fixed E9) + TimeoutSec (Fixed E12) deriving newtype (Eq,Ord,Num,Real,Fractional,Show,Pretty) newtype instance Timeout 'Minutes = diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 4a1c4310..57e67cde 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -43,7 +43,10 @@ import Data.IntMap (IntMap) import Data.IntSet (IntSet) import Data.Text qualified as Text import Data.Text.Encoding qualified as TE - +import Data.Time.Clock (NominalDiffTime) +import Data.Heap qualified as Heap +import Data.Heap (Heap,Entry(..)) +import Data.Time.Clock data PeerInfo e = PeerInfo @@ -402,3 +405,51 @@ mkPeerMeta conf = do , mTcpPort <&> \p -> ("listen-tcp", TE.encodeUtf8 . Text.pack . show $ p) ] + +data Polling = + Polling + { waitBefore :: NominalDiffTime + , waitOnEmpty :: NominalDiffTime + } + +polling :: forall a m . (MonadIO m, Hashable a) + => Polling + -> m [(a, NominalDiffTime)] + -> (a -> m ()) + -> m () + +polling o listEntries action = do + + pause (TimeoutSec (nominalDiffTimeToSeconds (waitBefore o))) + + now0 <- getTimeCoarse + refs0 <- listEntries <&> fmap (set _2 now0) <&> HashMap.fromList + + fix (\next mon -> do + now <- getTimeCoarse + refs <- listEntries <&> HashMap.fromList + let mon' = mon `HashMap.union` + HashMap.fromList [ (e, now + fromNanoSecs (round (realToFrac (nominalDiffTimeToSeconds t) * 1e9))) + | (e, t) <- HashMap.toList refs + ] + + let q = Heap.fromList [ Entry t e + | (e, t) <- HashMap.toList mon' + ] + + case Heap.uncons q of + Just (Entry t r, _) | t <= now -> do + action r + next (HashMap.delete r mon') + + Just (Entry t _, _) | otherwise -> do + pause @'Seconds $ fromInteger $ round $ realToFrac (toNanoSecs (t - now)) / 1e9 + next mon' + + Nothing -> do + pause (TimeoutSec (nominalDiffTimeToSeconds (waitOnEmpty o))) + next mon' + + ) refs0 + + diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 2b361dcf..7c4de984 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -173,42 +173,12 @@ refChanWorker env brains = do where refChanHeadPoll = do - pause @'Seconds 2 - now0 <- getTimeCoarse - refs0 <- listPolledRefs @e brains "refchan" <&> fmap (set _2 now0) <&> HashMap.fromList - - -- debug $ "POLL SHIT!" <+> pretty (fmap AsBase58 (HashMap.keys refs0)) - - fix (\next mon -> do - now <- getTimeCoarse - refs <- listPolledRefs @e brains "refchan" <&> HashMap.fromList - let mon' = mon `HashMap.union` - HashMap.fromList [ (e, now + fromNanoSecs (floor (1e9 * 60 * realToFrac t))) - | (e, t) <- HashMap.toList refs - ] - - let q = Heap.fromList [ Entry t e - | (e, t) <- HashMap.toList mon' - ] - - case Heap.uncons q of - Just (Entry t (r :: RefChanId e), rest) | t <= now -> do - debug $ "POLLING REFCHAN" <+> pretty (AsBase58 r) - broadCastMessage (RefChanGetHead @e r) - -- TODO: send-poll-request - next (HashMap.delete r mon') - - Just (Entry t (r :: RefChanId e), _) | otherwise -> do - pause @'Seconds $ fromInteger $ floor $ realToFrac (toNanoSecs (t - now)) / 1e9 - next mon' - - Nothing -> do - pause @'Seconds 5 - next mon' - - ) refs0 + let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) ) + polling (Polling 2 5) listRefs $ \ref -> do + debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref) + broadCastMessage (RefChanGetHead @e ref) monitorDownloads = forever do pause @'Seconds 2 @@ -216,6 +186,7 @@ refChanWorker env brains = do now <- getTimeCoarse + -- FIXME: change-to-polling-functions -- FIXME: consider-timeouts-or-leak-is-possible rest <- forM all $ \(r,item@(chan,t)) -> do here <- checkDownloaded r diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index ed846145..f088af5c 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -51,6 +51,7 @@ common common-deps , sqlite-simple , temporary , text + , time , timeit , transformers , uniplate