wip, renamed some stuff

This commit is contained in:
Dmitry Zuikov 2023-07-18 10:55:46 +03:00
parent 758efdd455
commit 697c79133e
3 changed files with 24 additions and 22 deletions

View File

@ -178,11 +178,12 @@ deriving stock instance ForRefChans L4Proto
instance Expires (SessionKey L4Proto (RefChanHeadBlock L4Proto)) where instance Expires (SessionKey L4Proto (RefChanHeadBlock L4Proto)) where
expiresIn = const (Just defCookieTimeoutSec) expiresIn = const (Just defCookieTimeoutSec)
-- FIXME: rename
data RefChanHeadAdapter e m = data RefChanAdapter e m =
RefChanHeadAdapter RefChanAdapter
{ refChanHeadOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m () { refChanOnHead :: RefChanId e -> RefChanHeadBlockTran e -> m ()
, refChanHeadSubscribed :: RefChanId e -> m Bool , refChanSubscribed :: RefChanId e -> m Bool
, refChanWriteTran :: RefChanId e -> RefChanUpdate e -> m ()
} }
refChanHeadProto :: forall e s m . ( MonadIO m refChanHeadProto :: forall e s m . ( MonadIO m
@ -202,7 +203,7 @@ refChanHeadProto :: forall e s m . ( MonadIO m
, s ~ Encryption e , s ~ Encryption e
) )
=> Bool => Bool
-> RefChanHeadAdapter e m -> RefChanAdapter e m
-> RefChanHead e -> RefChanHead e
-> m () -> m ()
@ -220,13 +221,13 @@ refChanHeadProto self adapter msg = do
case msg of case msg of
RefChanHead chan pkt -> do RefChanHead chan pkt -> do
guard =<< lift (refChanHeadSubscribed adapter chan) guard =<< lift (refChanSubscribed adapter chan)
trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan) trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan)
-- TODO: notify-others-for-new-head -- TODO: notify-others-for-new-head
-- нужно ли уведомить остальных, что голова поменялась? -- нужно ли уведомить остальных, что голова поменялась?
-- всех, от кого мы еще не получали данное сообщение -- всех, от кого мы еще не получали данное сообщение
-- откуда мы знаем, от кого мы получали данное сообщение? -- откуда мы знаем, от кого мы получали данное сообщение?
lift $ refChanHeadOnHead adapter chan pkt lift $ refChanOnHead adapter chan pkt
RefChanGetHead chan -> deferred proto do RefChanGetHead chan -> deferred proto do
trace $ "RefChanGetHead" <+> pretty self <+> pretty (AsBase58 chan) trace $ "RefChanGetHead" <+> pretty self <+> pretty (AsBase58 chan)
@ -264,7 +265,7 @@ refChanUpdateProto :: forall e s m . ( MonadIO m
) )
=> Bool => Bool
-> PeerCredentials s -> PeerCredentials s
-> RefChanHeadAdapter e m -> RefChanAdapter e m
-> RefChanUpdate e -> RefChanUpdate e
-> m () -> m ()
@ -296,7 +297,7 @@ refChanUpdateProto self pc adapter msg = do
case msg of case msg of
Propose chan box -> do Propose chan box -> do
guard =<< lift (refChanHeadSubscribed adapter chan) guard =<< lift (refChanSubscribed adapter chan)
debug "RefChanUpdate/Propose" debug "RefChanUpdate/Propose"
deferred proto do deferred proto do
@ -359,7 +360,7 @@ refChanUpdateProto self pc adapter msg = do
pure () pure ()
Accept chan box -> deferred proto do Accept chan box -> deferred proto do
guard =<< lift (refChanHeadSubscribed adapter chan) guard =<< lift (refChanSubscribed adapter chan)
debug "RefChanUpdate/ACCEPT" debug "RefChanUpdate/ACCEPT"

View File

@ -562,9 +562,10 @@ runPeer opts = U.handle (\e -> myException e
rce <- refChanWorkerEnv conf denv rce <- refChanWorkerEnv conf denv
let refChanHeadAdapter = RefChanHeadAdapter let refChanAdapter = RefChanAdapter
{ refChanHeadOnHead = refChanOnHead rce { refChanOnHead = refChanOnHeadFn rce
, refChanHeadSubscribed = isPolledRef @e brains , refChanSubscribed = isPolledRef @e brains
, refChanWriteTran = \_ _ -> pure ()
} }
let pexFilt pips = do let pexFilt pips = do
@ -890,8 +891,8 @@ runPeer opts = U.handle (\e -> myException e
, makeResponse (refLogUpdateProto reflogAdapter) , makeResponse (refLogUpdateProto reflogAdapter)
, makeResponse (refLogRequestProto reflogReqAdapter) , makeResponse (refLogRequestProto reflogReqAdapter)
, makeResponse (peerMetaProto (mkPeerMeta conf)) , makeResponse (peerMetaProto (mkPeerMeta conf))
, makeResponse (refChanHeadProto False refChanHeadAdapter) , makeResponse (refChanHeadProto False refChanAdapter)
, makeResponse (refChanUpdateProto False pc refChanHeadAdapter) , makeResponse (refChanUpdateProto False pc refChanAdapter)
] ]
void $ liftIO $ waitAnyCancel workers void $ liftIO $ waitAnyCancel workers
@ -993,7 +994,7 @@ runPeer opts = U.handle (\e -> myException e
Right (SignedBox k _ _) -> do Right (SignedBox k _ _) -> do
let msg = RefChanHead k (RefChanHeadBlockTran (HashRef h)) let msg = RefChanHead k (RefChanHeadBlockTran (HashRef h))
refChanNotifyOnUpdated rce k refChanNotifyOnUpdated rce k
runResponseM me $ refChanHeadProto @e True refChanHeadAdapter msg runResponseM me $ refChanHeadProto @e True refChanAdapter msg
let refChanHeadGetAction puk = do let refChanHeadGetAction puk = do
trace $ "refChanHeadGetAction" <+> pretty (AsBase58 puk) trace $ "refChanHeadGetAction" <+> pretty (AsBase58 puk)
@ -1022,7 +1023,7 @@ runPeer opts = U.handle (\e -> myException e
-- FIXME: remove-this-debug-stuff -- FIXME: remove-this-debug-stuff
-- или оставить? нода будет сама себе -- или оставить? нода будет сама себе
-- консенсус слать тогда. может, и оставить -- консенсус слать тогда. может, и оставить
lift $ runResponseM me $ refChanUpdateProto @e True pc refChanHeadAdapter (Propose @e puk proposed) lift $ runResponseM me $ refChanUpdateProto @e True pc refChanAdapter (Propose @e puk proposed)
let arpc = RpcAdapter pokeAction let arpc = RpcAdapter pokeAction
dieAction dieAction

View File

@ -4,7 +4,7 @@ module RefChan (
RefChanWorkerEnv(..) RefChanWorkerEnv(..)
, refChanWorkerEnvHeadQ , refChanWorkerEnvHeadQ
, refChanWorkerEnvDownload , refChanWorkerEnvDownload
, refChanOnHead , refChanOnHeadFn
, refChanWorker , refChanWorker
, refChanWorkerEnv , refChanWorkerEnv
, refChanNotifyOnUpdated , refChanNotifyOnUpdated
@ -79,8 +79,8 @@ refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO
<*> newTVarIO mempty <*> newTVarIO mempty
refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn :: MonadIO m => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHead env chan tran = do refChanOnHeadFn env chan tran = do
atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran) atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran)
-- FIXME: leak-when-block-never-really-updated -- FIXME: leak-when-block-never-really-updated
@ -171,7 +171,7 @@ refChanWorker env brains = do
rest <- forM all $ \(r,item@(chan,t)) -> do rest <- forM all $ \(r,item@(chan,t)) -> do
here <- checkDownloaded r here <- checkDownloaded r
if here then do if here then do
refChanOnHead env chan (RefChanHeadBlockTran r) refChanOnHeadFn env chan (RefChanHeadBlockTran r)
pure mempty pure mempty
else do else do
-- FIXME: fix-timeout-hardcode -- FIXME: fix-timeout-hardcode