wip, polling function

This commit is contained in:
Dmitry Zuikov 2023-07-15 11:39:46 +03:00
parent 51ac42f5c0
commit 39c0ca71cc
4 changed files with 60 additions and 38 deletions

View File

@ -41,16 +41,15 @@ class IsTimeout a => MonadPause a m where
instance (IsTimeout a, MonadIO m) => MonadPause a m where
pause x = liftIO $ threadDelay (toMicroSeconds x)
instance Pretty (Fixed E9) where
instance HasResolution a => Pretty (Fixed a) where
pretty = pretty . show
newtype instance Timeout 'MilliSeconds =
TimeoutMSec (Fixed E9)
deriving newtype (Eq,Ord,Num,Real,Fractional,Show,Pretty)
newtype instance Timeout 'Seconds =
TimeoutSec (Fixed E9)
TimeoutSec (Fixed E12)
deriving newtype (Eq,Ord,Num,Real,Fractional,Show,Pretty)
newtype instance Timeout 'Minutes =

View File

@ -43,7 +43,10 @@ import Data.IntMap (IntMap)
import Data.IntSet (IntSet)
import Data.Text qualified as Text
import Data.Text.Encoding qualified as TE
import Data.Time.Clock (NominalDiffTime)
import Data.Heap qualified as Heap
import Data.Heap (Heap,Entry(..))
import Data.Time.Clock
data PeerInfo e =
PeerInfo
@ -402,3 +405,51 @@ mkPeerMeta conf = do
, mTcpPort <&> \p -> ("listen-tcp", TE.encodeUtf8 . Text.pack . show $ p)
]
data Polling =
Polling
{ waitBefore :: NominalDiffTime
, waitOnEmpty :: NominalDiffTime
}
polling :: forall a m . (MonadIO m, Hashable a)
=> Polling
-> m [(a, NominalDiffTime)]
-> (a -> m ())
-> m ()
polling o listEntries action = do
pause (TimeoutSec (nominalDiffTimeToSeconds (waitBefore o)))
now0 <- getTimeCoarse
refs0 <- listEntries <&> fmap (set _2 now0) <&> HashMap.fromList
fix (\next mon -> do
now <- getTimeCoarse
refs <- listEntries <&> HashMap.fromList
let mon' = mon `HashMap.union`
HashMap.fromList [ (e, now + fromNanoSecs (round (realToFrac (nominalDiffTimeToSeconds t) * 1e9)))
| (e, t) <- HashMap.toList refs
]
let q = Heap.fromList [ Entry t e
| (e, t) <- HashMap.toList mon'
]
case Heap.uncons q of
Just (Entry t r, _) | t <= now -> do
action r
next (HashMap.delete r mon')
Just (Entry t _, _) | otherwise -> do
pause @'Seconds $ fromInteger $ round $ realToFrac (toNanoSecs (t - now)) / 1e9
next mon'
Nothing -> do
pause (TimeoutSec (nominalDiffTimeToSeconds (waitOnEmpty o)))
next mon'
) refs0

View File

@ -173,42 +173,12 @@ refChanWorker env brains = do
where
refChanHeadPoll = do
pause @'Seconds 2
now0 <- getTimeCoarse
refs0 <- listPolledRefs @e brains "refchan" <&> fmap (set _2 now0) <&> HashMap.fromList
-- debug $ "POLL SHIT!" <+> pretty (fmap AsBase58 (HashMap.keys refs0))
fix (\next mon -> do
now <- getTimeCoarse
refs <- listPolledRefs @e brains "refchan" <&> HashMap.fromList
let mon' = mon `HashMap.union`
HashMap.fromList [ (e, now + fromNanoSecs (floor (1e9 * 60 * realToFrac t)))
| (e, t) <- HashMap.toList refs
]
let q = Heap.fromList [ Entry t e
| (e, t) <- HashMap.toList mon'
]
case Heap.uncons q of
Just (Entry t (r :: RefChanId e), rest) | t <= now -> do
debug $ "POLLING REFCHAN" <+> pretty (AsBase58 r)
broadCastMessage (RefChanGetHead @e r)
-- TODO: send-poll-request
next (HashMap.delete r mon')
Just (Entry t (r :: RefChanId e), _) | otherwise -> do
pause @'Seconds $ fromInteger $ floor $ realToFrac (toNanoSecs (t - now)) / 1e9
next mon'
Nothing -> do
pause @'Seconds 5
next mon'
) refs0
let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) )
polling (Polling 2 5) listRefs $ \ref -> do
debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref)
broadCastMessage (RefChanGetHead @e ref)
monitorDownloads = forever do
pause @'Seconds 2
@ -216,6 +186,7 @@ refChanWorker env brains = do
now <- getTimeCoarse
-- FIXME: change-to-polling-functions
-- FIXME: consider-timeouts-or-leak-is-possible
rest <- forM all $ \(r,item@(chan,t)) -> do
here <- checkDownloaded r

View File

@ -51,6 +51,7 @@ common common-deps
, sqlite-simple
, temporary
, text
, time
, timeit
, transformers
, uniplate