diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 009d4c08..974f2adc 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -5,6 +5,8 @@ {-# LANGUAGE ImplicitParams #-} module HBS2.Net.Proto.RefChan where +-- import HBS2.Actors.Peer.Types +import HBS2.Data.Types.Peer import HBS2.Prelude.Plated import HBS2.Hash import HBS2.Data.Detect @@ -151,6 +153,9 @@ data RefChanHead e = | RefChanGetHead (RefChanId e) deriving stock (Generic) +instance Show (RefChanHead e) where + show _ = "RefChanHead" + instance ForRefChans e => Serialise (RefChanHead e) @@ -221,6 +226,9 @@ data RefChanUpdate e = | Accept (RefChanId e) (SignedBox (AcceptTran e) e) -- подписано ключом пира deriving stock (Generic) +instance Show (RefChanUpdate e) where + show _ = "RefChanUpdate" + instance ForRefChans e => Serialise (RefChanUpdate e) data RefChanRequest e = @@ -228,6 +236,9 @@ data RefChanRequest e = | RefChanResponse (RefChanId e) HashRef deriving stock (Generic,Typeable) +instance Show (RefChanRequest e) where + show _ = "RefChanRequest" + instance ForRefChans e => Serialise (RefChanRequest e) data instance EventKey e (RefChanRequest e) = @@ -469,85 +480,89 @@ refChanUpdateProto self pc adapter msg = do -- -- рассылаем ли себе? что бы был хоть один accept lift $ refChanUpdateProto True pc adapter accept - Accept chan box -> deferred proto do + Accept chan box -> undefined + -- TODO: fix refchain + -- deferred proto do - debug $ "RefChanUpdate/ACCEPT" <+> pretty h0 +-- debug $ "RefChanUpdate/ACCEPT" <+> pretty h0 - (peerKey, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box +-- (peerKey, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box - let refchanKey = RefChanHeadKey @s chan - h <- MaybeT $ liftIO $ getRef sto refchanKey +-- let refchanKey = RefChanHeadKey @s chan +-- h <- MaybeT $ liftIO $ getRef sto refchanKey - guard (HashRef h == headRef) +-- guard (HashRef h == headRef) - lift $ gossip msg +-- lift $ gossip msg - -- тут может так случиться, что propose еще нет - -- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы - -- почти одновременно. ну или не успело записаться. и что делать? +-- -- тут может так случиться, что propose еще нет +-- -- UDP вообще не гарантирует порядок доставки, а отправляем мы транзы +-- -- почти одновременно. ну или не успело записаться. и что делать? - here <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust +-- here <- liftIO (hasBlock sto (fromHashRef hashRef)) <&> isJust - unless here do - warn $ "No propose transaction saved yet!" <+> pretty hashRef +-- unless here do +-- warn $ "No propose transaction saved yet!" <+> pretty hashRef - tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) +-- tranBs <- MaybeT $ liftIO $ getBlock sto (fromHashRef hashRef) - tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just +-- tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) tranBs & either (const Nothing) Just - headBlock <- MaybeT $ getActualRefChanHead @e refchanKey +-- headBlock <- MaybeT $ getActualRefChanHead @e refchanKey - proposed <- MaybeT $ pure $ case tran of - Propose _ pbox -> Just pbox - _ -> Nothing +-- proposed <- MaybeT $ pure $ case tran of +-- Propose _ pbox -> Just pbox +-- _ -> Nothing - (_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @e proposed +-- (_, ptran) <- MaybeT $ pure $ unboxSignedBox0 @(ProposeTran e) @e proposed - debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0 +-- debug $ "ACCEPT FROM:" <+> pretty (AsBase58 peerKey) <+> pretty h0 - -- compiler bug? - let (ProposeTran _ pbox) = ptran +-- -- compiler bug? +-- let (ProposeTran _ pbox) = ptran - (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox +-- (authorKey, _) <- MaybeT $ pure $ unboxSignedBox0 pbox - -- может, и не надо второй раз проверять - guard $ checkACL headBlock peerKey authorKey +-- -- может, и не надо второй раз проверять +-- guard $ checkACL headBlock peerKey authorKey - debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef +-- debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef - rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id +-- rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id - atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) +-- atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) - -- TODO: garbage-collection-strongly-required - ha <- MaybeT $ liftIO $ putBlock sto (serialise msg) +-- -- TODO: garbage-collection-strongly-required +-- ha <- MaybeT $ liftIO $ putBlock sto (serialise msg) - atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha)) - -- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it? +-- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert (HashRef ha)) +-- -- atomically $ modifyTVar (view refChanRoundTrans rcRound) (HashSet.insert hashRef) -- propose just in case we missed it? - accepts <- atomically $ readTVar (view refChanRoundAccepts rcRound) <&> HashMap.size +-- accepts <- atomically $ readTVar (view refChanRoundAccepts rcRound) <&> HashMap.size - debug $ "ACCEPTS:" <+> pretty accepts +-- debug $ "ACCEPTS:" <+> pretty accepts - closed <- readTVarIO (view refChanRoundClosed rcRound) +-- closed <- readTVarIO (view refChanRoundClosed rcRound) - -- FIXME: round! - when (fromIntegral accepts >= view refChanHeadQuorum headBlock && not closed) do - debug $ "ROUND!" <+> pretty accepts <+> pretty hashRef +-- -- FIXME: round! +-- when (fromIntegral accepts >= view refChanHeadQuorum headBlock && not closed) do +-- debug $ "ROUND!" <+> pretty accepts <+> pretty hashRef - trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList +-- trans <- atomically $ readTVar (view refChanRoundTrans rcRound) <&> HashSet.toList - forM_ trans $ \t -> do - lift $ refChanWriteTran adapter t - debug $ "WRITING TRANS" <+> pretty t +-- forM_ trans $ \t -> do +-- lift $ refChanWriteTran adapter t +-- debug $ "WRITING TRANS" <+> pretty t - let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList - votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys +-- let pips = view refChanHeadPeers headBlock & HashMap.keys & HashSet.fromList +-- votes <- readTVarIO (view refChanRoundAccepts rcRound) <&> HashSet.fromList . HashMap.keys - when (pips `HashSet.isSubsetOf` votes) do - debug $ "CLOSING ROUND" <+> pretty hashRef <+> pretty (length trans) - atomically $ writeTVar (view refChanRoundClosed rcRound) True +-- when (pips `HashSet.isSubsetOf` votes) do +-- debug $ "CLOSING ROUND" <+> pretty hashRef <+> pretty (length trans) +-- atomically $ writeTVar (view refChanRoundClosed rcRound) True + +-- lift $ refChanUpdateProto True pc adapter msg where proto = Proxy @(RefChanUpdate e) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 0df51ee2..174e7094 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1135,31 +1135,36 @@ runPeer opts = U.handle (\e -> myException e void $ liftIO $ async $ withPeerM penv $ do gossip (RefChanRequest @e puk) - let arpc = RpcAdapter pokeAction - dieAction - dontHandle - dontHandle - annAction - pingAction - dontHandle - fetchAction - peersAction - dontHandle - logLevelAction - reflogUpdateAction - reflogFetchAction - reflogGetAction - dontHandle - refChanHeadSendAction -- rpcOnRefChanHeadSend - refChanHeadGetAction - dontHandle - refChanHeadFetchAction + let arpc = RpcAdapter + { rpcOnPoke = pokeAction + , rpcOnDie = dieAction + , rpcOnPokeAnswer = dontHandle + , rpcOnPokeAnswerFull = dontHandle + , rpcOnAnnounce = annAction + , rpcOnPing = pingAction + , rpcOnPong = dontHandle + , rpcOnFetch = fetchAction + , rpcOnPeers = peersAction + , rpcOnPeersAnswer = dontHandle + , rpcOnPexInfo = pexInfoAction + , rpcOnPexInfoAnswer = dontHandle + , rpcOnLogLevel = logLevelAction + , rpcOnRefLogUpdate = reflogUpdateAction + , rpcOnRefLogFetch = reflogFetchAction + , rpcOnRefLogGet = reflogGetAction + , rpcOnRefLogGetAnsw = dontHandle - refChanFetchAction - refChanGetAction - dontHandle -- rpcOnRefChanGetAnsw + , rpcOnRefChanHeadSend = refChanHeadSendAction + , rpcOnRefChanHeadGet = refChanHeadGetAction + , rpcOnRefChanHeadGetAnsw = dontHandle + , rpcOnRefChanHeadFetch = refChanHeadFetchAction - refChanProposeAction + , rpcOnRefChanFetch = refChanFetchAction + , rpcOnRefChanGet = refChanGetAction + , rpcOnRefChanGetAnsw = dontHandle -- rpcOnRefChanGetAnsw + + , rpcOnRefChanPropose = refChanProposeAction + } rpc <- async $ runRPC udp1 do runProto @e diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index da101920..b0fbc44c 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -289,42 +289,36 @@ withRPC o cmd = rpcClientMain o $ runResourceT do rchangetMVar <- liftIO newEmptyMVar - let adapter = - RpcAdapter dontHandle - dontHandle - (liftIO . atomically . writeTQueue pokeQ) - (liftIO . atomically . writeTQueue pokeFQ) - (const $ liftIO exitSuccess) - (const $ notice "ping?") - (liftIO . atomically . writeTQueue pingQ) - dontHandle - dontHandle + let adapter = RpcAdapter + { rpcOnPoke = dontHandle + , rpcOnDie = dontHandle + , rpcOnPokeAnswer = (liftIO . atomically . writeTQueue pokeQ) + , rpcOnPokeAnswerFull = (liftIO . atomically . writeTQueue pokeFQ) + , rpcOnAnnounce = (const $ liftIO exitSuccess) + , rpcOnPing = (const $ notice "ping?") + , rpcOnPong = (liftIO . atomically . writeTQueue pingQ) + , rpcOnFetch = dontHandle + , rpcOnPeers = dontHandle + , rpcOnPeersAnswer = (\(pa, k) -> Log.info $ pretty (AsBase58 k) <+> pretty pa) + , rpcOnPexInfo = dontHandle + , rpcOnPexInfoAnswer = dontHandle + , rpcOnLogLevel = dontHandle + , rpcOnRefLogUpdate = dontHandle + , rpcOnRefLogFetch = dontHandle + , rpcOnRefLogGet = dontHandle + , rpcOnRefLogGetAnsw = ( liftIO . atomically . writeTQueue refQ ) - (\(pa, k) -> Log.info $ pretty (AsBase58 k) <+> pretty pa - ) + , rpcOnRefChanHeadSend = dontHandle + , rpcOnRefChanHeadGet = dontHandle + , rpcOnRefChanHeadGetAnsw = (liftIO . putMVar rchanheadMVar) + , rpcOnRefChanHeadFetch = dontHandle - dontHandle - dontHandle - dontHandle - dontHandle - - ( liftIO . atomically . writeTQueue refQ ) - - dontHandle - - dontHandle -- rpcOnRefChanHeadGet - - (liftIO . putMVar rchanheadMVar) -- rpcOnRefChanHeadGetAnsw - - dontHandle -- rpcOnRefChanHeadFetch - - dontHandle -- rpcOnRefChanFetch - dontHandle -- rpcOnRefChanGet - - (liftIO . putMVar rchangetMVar) -- rpcOnRefChanHeadGetAnsw - - dontHandle -- rpcOnRefChanPropose + , rpcOnRefChanFetch = dontHandle + , rpcOnRefChanGet = dontHandle + , rpcOnRefChanGetAnsw = (liftIO . putMVar rchangetMVar) + , rpcOnRefChanPropose = dontHandle + } prpc <- async $ runRPC udp1 do env <- ask