This commit is contained in:
Dmitry Zuikov 2023-07-18 11:33:03 +03:00
parent 697c79133e
commit 609551b43e
3 changed files with 37 additions and 6 deletions

View File

@ -183,7 +183,7 @@ data RefChanAdapter e m =
RefChanAdapter RefChanAdapter
{ refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () { refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m ()
, refChanSubscribed :: RefChanId e -> m Bool , refChanSubscribed :: RefChanId e -> m Bool
, refChanWriteTran :: RefChanId e -> RefChanUpdate e -> m () , refChanWriteTran :: HashRef -> m ()
} }
refChanHeadProto :: forall e s m . ( MonadIO m refChanHeadProto :: forall e s m . ( MonadIO m
@ -362,7 +362,9 @@ refChanUpdateProto self pc adapter msg = do
Accept chan box -> deferred proto do Accept chan box -> deferred proto do
guard =<< lift (refChanSubscribed adapter chan) guard =<< lift (refChanSubscribed adapter chan)
debug "RefChanUpdate/ACCEPT" let h0 = hashObject @HbSync (serialise msg)
debug $ "RefChanUpdate/ACCEPT" <+> pretty h0
(peerKey, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box (peerKey, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
@ -373,6 +375,8 @@ refChanUpdateProto self pc adapter msg = do
lift $ gossip msg lift $ gossip msg
lift $ refChanUpdateProto True pc adapter msg
tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef)
tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just
@ -423,7 +427,14 @@ refChanUpdateProto self pc adapter msg = do
trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList
forM_ trans $ \t -> do forM_ trans $ \t -> do
debug $ "ABOUT TO STORE TRAN:" <+> pretty t lift $ refChanWriteTran adapter t
let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList
votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys
when (pips `HashSet.isSubsetOf` votes) do
debug $ "CLOSING ROUND" <+> pretty hashRef
pure () pure ()
-- TODO: expire-round-if-all-confirmations -- TODO: expire-round-if-all-confirmations

View File

@ -565,7 +565,7 @@ runPeer opts = U.handle (\e -> myException e
let refChanAdapter = RefChanAdapter let refChanAdapter = RefChanAdapter
{ refChanOnHead = refChanOnHeadFn rce { refChanOnHead = refChanOnHeadFn rce
, refChanSubscribed = isPolledRef @e brains , refChanSubscribed = isPolledRef @e brains
, refChanWriteTran = \_ _ -> pure () , refChanWriteTran = refChanWriteTranFn rce
} }
let pexFilt pips = do let pexFilt pips = do

View File

@ -5,6 +5,7 @@ module RefChan (
, refChanWorkerEnvHeadQ , refChanWorkerEnvHeadQ
, refChanWorkerEnvDownload , refChanWorkerEnvDownload
, refChanOnHeadFn , refChanOnHeadFn
, refChanWriteTranFn
, refChanWorker , refChanWorker
, refChanWorkerEnv , refChanWorkerEnv
, refChanNotifyOnUpdated , refChanNotifyOnUpdated
@ -22,7 +23,6 @@ import HBS2.Data.Types.Refs
import HBS2.Net.Auth.Credentials import HBS2.Net.Auth.Credentials
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.Types
import HBS2.Net.Proto.Definition() import HBS2.Net.Proto.Definition()
import HBS2.Storage import HBS2.Storage
@ -37,6 +37,7 @@ import Control.Exception ()
import Control.Monad.Except (throwError, runExceptT) import Control.Monad.Except (throwError, runExceptT)
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Concurrent.STM (flushTQueue)
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
@ -65,6 +66,7 @@ data RefChanWorkerEnv e =
, _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e) , _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e)
, _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec)) , _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, TimeSpec))
, _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ()) , _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ())
, _refChanWorkerEnvWriteQ :: TQueue HashRef
} }
makeLenses 'RefChanWorkerEnv makeLenses 'RefChanWorkerEnv
@ -77,12 +79,18 @@ refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTQueueIO
refChanOnHeadFn :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHeadFn env chan tran = do refChanOnHeadFn env chan tran = do
atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran)
refChanWriteTranFn :: MonadIO m => RefChanWorkerEnv e -> HashRef -> m ()
refChanWriteTranFn env href = do
atomically $ writeTQueue (view refChanWorkerEnvWriteQ env) href
-- FIXME: leak-when-block-never-really-updated -- FIXME: leak-when-block-never-really-updated
refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m () refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m ()
refChanNotifyOnUpdated env chan = do refChanNotifyOnUpdated env chan = do
@ -144,14 +152,26 @@ refChanWorker env brains = do
polls <- async refChanHeadPoll polls <- async refChanHeadPoll
wtrans <- async refChanWriter
forever do forever do
pause @'Seconds 10 pause @'Seconds 10
debug "I'm refchan worker" debug "I'm refchan worker"
mapM_ waitCatch [hw,downloads,polls] mapM_ waitCatch [hw,downloads,polls,wtrans]
where where
refChanWriter = forever do
pause @'Seconds 1
_ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env)
trans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env)
forM_ trans $ \t -> do
debug $ "ABOUT TO WRITE TRANS" <+> pretty t
refChanHeadPoll = do refChanHeadPoll = do
let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) ) let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) )