diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 62a15fe8..a68f3c46 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -183,7 +183,7 @@ data RefChanAdapter e m = RefChanAdapter { refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () , refChanSubscribed :: RefChanId e -> m Bool - , refChanWriteTran :: RefChanId e -> RefChanUpdate e -> m () + , refChanWriteTran :: HashRef -> m () } refChanHeadProto :: forall e s m . ( MonadIO m @@ -362,7 +362,9 @@ refChanUpdateProto self pc adapter msg = do Accept chan box -> deferred proto do guard =<< lift (refChanSubscribed adapter chan) - debug "RefChanUpdate/ACCEPT" + let h0 = hashObject @HbSync (serialise msg) + + debug $ "RefChanUpdate/ACCEPT" <+> pretty h0 (peerKey, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box @@ -373,6 +375,8 @@ refChanUpdateProto self pc adapter msg = do lift $ gossip msg + lift $ refChanUpdateProto True pc adapter msg + tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just @@ -423,7 +427,14 @@ refChanUpdateProto self pc adapter msg = do trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList forM_ trans $ \t -> do - debug $ "ABOUT TO STORE TRAN:" <+> pretty t + lift $ refChanWriteTran adapter t + + + let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList + votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys + + when (pips `HashSet.isSubsetOf` votes) do + debug $ "CLOSING ROUND" <+> pretty hashRef pure () -- TODO: expire-round-if-all-confirmations diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index b33fd05c..9ad47fcb 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -565,7 +565,7 @@ runPeer opts = U.handle (\e -> myException e let refChanAdapter = RefChanAdapter { refChanOnHead = refChanOnHeadFn rce , refChanSubscribed = isPolledRef @e brains - , refChanWriteTran = \_ _ -> pure () + , refChanWriteTran = refChanWriteTranFn rce } let pexFilt pips = do diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index a7e4b022..776b2b0f 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -5,6 +5,7 @@ module RefChan ( , refChanWorkerEnvHeadQ , refChanWorkerEnvDownload , refChanOnHeadFn + , refChanWriteTranFn , refChanWorker , refChanWorkerEnv , refChanNotifyOnUpdated @@ -22,7 +23,6 @@ import HBS2.Data.Types.Refs import HBS2.Net.Auth.Credentials import HBS2.Net.Proto import HBS2.Net.Proto.RefChan -import HBS2.Net.Proto.Types import HBS2.Net.Proto.Definition() import HBS2.Storage @@ -37,6 +37,7 @@ import Control.Exception () import Control.Monad.Except (throwError, runExceptT) import Control.Monad.Reader import Control.Monad.Trans.Maybe +import Control.Concurrent.STM (flushTQueue) import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.HashMap.Strict (HashMap) @@ -65,6 +66,7 @@ data RefChanWorkerEnv e = , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) + , _refChanWorkerEnvWriteQ :: TQueue HashRef } makeLenses 'RefChanWorkerEnv @@ -77,12 +79,18 @@ refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e) refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO <*> newTVarIO mempty <*> newTVarIO mempty + <*> newTQueueIO refChanOnHeadFn :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn env chan tran = do atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) + +refChanWriteTranFn :: MonadIO m => RefChanWorkerEnv e -> HashRef -> m () +refChanWriteTranFn env href = do + atomically $ writeTQueue (view refChanWorkerEnvWriteQ env) href + -- FIXME: leak-when-block-never-really-updated refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m () refChanNotifyOnUpdated env chan = do @@ -144,14 +152,26 @@ refChanWorker env brains = do polls <- async refChanHeadPoll + wtrans <- async refChanWriter + forever do pause @'Seconds 10 debug "I'm refchan worker" - mapM_ waitCatch [hw,downloads,polls] + mapM_ waitCatch [hw,downloads,polls,wtrans] where + refChanWriter = forever do + pause @'Seconds 1 + _ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env) + + trans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env) + + forM_ trans $ \t -> do + debug $ "ABOUT TO WRITE TRANS" <+> pretty t + + refChanHeadPoll = do let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) )