diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 5a7d79ad..343d9e4d 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -13,7 +13,7 @@ import HBS2.Net.Proto import HBS2.Net.Auth.Credentials import HBS2.Base58 import HBS2.Defaults --- import HBS2.Events +import HBS2.Events import HBS2.Net.Proto.Peer import HBS2.Net.Proto.BlockAnnounce import HBS2.Net.Proto.Sessions @@ -39,6 +39,7 @@ import Data.Maybe import Data.Text qualified as Text import Lens.Micro.Platform import Data.Hashable hiding (Hashed) +import Type.Reflection (someTypeRep) import UnliftIO @@ -131,6 +132,7 @@ instance ForRefChans e => Serialise (AcceptTran e) data RefChanRound e = RefChanRound { _refChanRoundKey :: HashRef -- ^ hash of the Propose transaction + , _refChanHeadKey :: RefChanHeadKey (Encryption e) , _refChanRoundTS :: TimeSpec , _refChanRoundClosed :: TVar Bool , _refChanRoundPropose :: TVar (Maybe (ProposeTran e)) -- ^ propose transaction itself @@ -144,6 +146,7 @@ makeLenses 'RefChanRound newtype instance SessionKey e (RefChanRound e) = RefChanRoundKey HashRef deriving stock (Generic, Eq, Typeable) + deriving newtype (Pretty) deriving newtype instance Hashable (SessionKey e (RefChanRound e)) @@ -152,6 +155,26 @@ type instance SessionData e (RefChanRound e) = RefChanRound e instance Expires (SessionKey e (RefChanRound e)) where expiresIn _ = Just 300 +data instance EventKey e (RefChanRound e) = + RefChanRoundEventKey + deriving (Generic,Typeable,Eq) + +newtype instance Event e (RefChanRound e) = + RefChanRoundEvent (SessionKey e (RefChanRound e)) + deriving (Typeable,Generic) + deriving newtype (Pretty) + +instance Typeable (RefChanRound e) => Hashable (EventKey e (RefChanRound e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @(RefChanRound e) + +instance EventType ( Event e (RefChanRound e) ) where + isPersistent = True + +instance Expires (EventKey e (RefChanRound e)) where + expiresIn = const Nothing + -- TODO: find-out-sure-transaction-size -- транзакция должна быть маленькая! -- хочешь что-то большое просунуть -- шли хэши. @@ -198,7 +221,6 @@ refChanHeadProto :: forall e s m . ( MonadIO m , Pretty (Peer e) , Sessions e (KnownPeer e) m , HasStorage m - -- , HasGossip (RefChanHead e) e m , Signatures s , IsRefPubKey s , Pretty (AsBase58 (PubKey 'Sign s)) @@ -256,6 +278,7 @@ refChanUpdateProto :: forall e s m . ( MonadIO m , Sessions e (KnownPeer e) m , Sessions e (RefChanHeadBlock e) m , Sessions e (RefChanRound e) m + , EventEmitter e (RefChanRound e) m , HasStorage m , HasGossip e (RefChanUpdate e) m , Signatures s @@ -339,13 +362,16 @@ refChanUpdateProto self pc adapter msg = do hash <- MaybeT $ liftIO $ putBlock sto (serialise msg) ts <- liftIO getTimeCoarse - defRound <- RefChanRound @e (HashRef hash) ts + defRound <- RefChanRound @e (HashRef hash) refchanKey ts <$> newTVarIO False <*> newTVarIO Nothing <*> newTVarIO (HashSet.singleton (HashRef hash)) -- save propose <*> newTVarIO (HashMap.singleton peerKey ()) - void $ lift $ update defRound (RefChanRoundKey (HashRef hash)) id + let rcrk = RefChanRoundKey (HashRef hash) + void $ lift $ update defRound rcrk id + + lift $ emit @e RefChanRoundEventKey (RefChanRoundEvent rcrk) lift $ gossip msg @@ -425,17 +451,9 @@ refChanUpdateProto self pc adapter msg = do -- может, и не надо второй раз проверять guard $ checkACL headBlock peerKey authorKey - ts <- liftIO getTimeCoarse - - defRound <- RefChanRound @e hashRef ts - <$> newTVarIO False - <*> newTVarIO Nothing - <*> newTVarIO (HashSet.singleton hashRef) -- save propose - <*> newTVarIO (HashMap.singleton peerKey ()) - debug $ "JUST GOT TRANSACTION FROM STORAGE! ABOUT TO CHECK IT" <+> pretty hashRef - rcRound <- lift $ fetch True defRound (RefChanRoundKey hashRef) id + rcRound <- MaybeT $ find (RefChanRoundKey @e hashRef) id atomically $ modifyTVar (view refChanRoundAccepts rcRound) (HashMap.insert peerKey ()) @@ -466,12 +484,6 @@ refChanUpdateProto self pc adapter msg = do debug $ "CLOSING ROUND" <+> pretty hashRef atomically $ writeTVar (view refChanRoundClosed rcRound) True - -- TODO: expire-round-if-all-confirmations - -- если получили accept от всех пиров - -- закрываем раунд досрочно. - -- иначе ждём wait -- нам нужен процесс для этого - -- куда его деть-то? - where proto = Proxy @(RefChanUpdate e) diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 776b2b0f..34b73f45 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -16,6 +16,7 @@ import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock +import HBS2.Events import HBS2.Net.Proto.Peer import HBS2.Net.Proto.Sessions import HBS2.Data.Detect @@ -135,6 +136,7 @@ refChanWorker :: forall e s m . ( MonadIO m , IsRefPubKey s , Pretty (AsBase58 (PubKey 'Sign s)) , ForRefChans e + , EventListener e (RefChanRound e) m , m ~ PeerM e IO ) => RefChanWorkerEnv e @@ -145,6 +147,10 @@ refChanWorker env brains = do penv <- ask + subscribe @e RefChanRoundEventKey $ \(RefChanRoundEvent rcrk) -> do + debug $ "ON ROUND STARTED" <+> pretty rcrk + pure () + -- FIXME: resume-on-exception hw <- async (refChanHeadMon penv) @@ -154,14 +160,20 @@ refChanWorker env brains = do wtrans <- async refChanWriter + cleanup1 <- async cleanupRounds + forever do pause @'Seconds 10 debug "I'm refchan worker" - mapM_ waitCatch [hw,downloads,polls,wtrans] + mapM_ waitCatch [hw,downloads,polls,wtrans,cleanup1] where + cleanupRounds = forever do + pause @'Seconds 20 + pure () + refChanWriter = forever do pause @'Seconds 1 _ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env) @@ -171,7 +183,6 @@ refChanWorker env brains = do forM_ trans $ \t -> do debug $ "ABOUT TO WRITE TRANS" <+> pretty t - refChanHeadPoll = do let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) )