diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 34b73f45..80414ab3 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -137,6 +137,7 @@ refChanWorker :: forall e s m . ( MonadIO m , Pretty (AsBase58 (PubKey 'Sign s)) , ForRefChans e , EventListener e (RefChanRound e) m + , Sessions e (RefChanRound e) m , m ~ PeerM e IO ) => RefChanWorkerEnv e @@ -147,9 +148,6 @@ refChanWorker env brains = do penv <- ask - subscribe @e RefChanRoundEventKey $ \(RefChanRoundEvent rcrk) -> do - debug $ "ON ROUND STARTED" <+> pretty rcrk - pure () -- FIXME: resume-on-exception hw <- async (refChanHeadMon penv) @@ -170,9 +168,36 @@ refChanWorker env brains = do where - cleanupRounds = forever do - pause @'Seconds 20 - pure () + cleanupRounds = do + + rounds <- newTVarIO HashSet.empty + + subscribe @e RefChanRoundEventKey $ \(RefChanRoundEvent rcrk) -> do + atomically $ modifyTVar rounds (HashSet.insert rcrk) + debug $ "ON ROUND STARTED" <+> pretty rcrk + + forever do + pause @'Seconds 30 + + now <- getTimeCoarse + xs <- readTVarIO rounds <&> HashSet.toList + + forM_ xs $ \x -> do + + void $ runMaybeT do + se <- MaybeT $ find @e x id + + closed <- readTVarIO (view refChanRoundClosed se) + let ts = view refChanRoundTS se + + -- FIXME: use-wait-from-head + let expired = toNanoSecs (now - ts) > toNanoSecs ( round (60*1e9) ) + + when (closed || expired) do + lift $ expire x + atomically $ modifyTVar rounds (HashSet.delete x) + debug $ "CLEANUP ROUND" <+> pretty x + refChanWriter = forever do pause @'Seconds 1