diff --git a/.fixme/log b/.fixme/log index 16eaf9da..7d674a69 100644 --- a/.fixme/log +++ b/.fixme/log @@ -101,3 +101,5 @@ fixme-del "6DBdanQ5zn" fixme-set "resolution" "done" "F7whmzJkZX" + +fixme-set "workflow" "testing" "4MmfVifgBS" diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 90e46e25..6467429c 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -130,17 +130,18 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) data PeerEnv e = PeerEnv - { _envSelf :: Peer e - , _envPeerNonce :: PeerNonce - , _envFab :: Fabriq e - , _envStorage :: AnyStorage - , _envPeerLocator :: AnyPeerLocator e - , _envDeferred :: Pipeline IO () - , _envSessions :: Cache SKey Dynamic - , _envEvents :: TVar (HashMap SKey [Dynamic]) - , _envExpireTimes :: Cache SKey () - , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) - , _envReqLimit :: Cache (Peer e, Integer, Encoded e) () + { _envSelf :: Peer e + , _envPeerNonce :: PeerNonce + , _envFab :: Fabriq e + , _envStorage :: AnyStorage + , _envPeerLocator :: AnyPeerLocator e + , _envDeferred :: Pipeline IO () + , _envSessions :: Cache SKey Dynamic + , _envEvents :: TVar (HashMap SKey [Dynamic]) + , _envExpireTimes :: Cache SKey () + , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) + , _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) () + , _envReqProtoLimit :: Cache (Peer e, Integer) () } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } @@ -236,27 +237,39 @@ instance ( MonadIO m se <- asks (view envSessions) liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) -class (HasProtocol e p) => HasTimeLimits e p m where - withTimeLimit :: Peer e -> p -> m () -> m () +class HasProtocol e p => HasTimeLimits e p m where + tryLockForPeriod :: Peer e -> p -> m Bool instance {-# OVERLAPPABLE #-} - (Monad m, HasProtocol e p) => HasTimeLimits e p m where - withTimeLimit _ p m = m + (MonadIO (t m), Monad m, MonadTrans t, HasProtocol e p, HasTimeLimits e p m) => HasTimeLimits e p (t m) where + tryLockForPeriod p m = lift (tryLockForPeriod p m) + -- pure True + -- liftIO $ print "LIMIT DOES NOT WORK" + -- pure True instance (MonadIO m, HasProtocol e p, Hashable (Encoded e)) => HasTimeLimits e p (PeerM e m) where - withTimeLimit peer msg m = case requestMinPeriod @e @p of - Nothing -> m + tryLockForPeriod peer msg = case requestPeriodLim @e @p of + NoLimit -> pure True - Just lim -> do + ReqLimPerMessage lim -> do let proto = protoId @e @p (Proxy @p) - ex <- asks (view envReqLimit) + ex <- asks (view envReqMsgLimit) let bin = encode @e msg let key = (peer, proto, bin) here <- liftIO $ Cache.lookup ex key <&> isJust unless here $ do liftIO $ Cache.insert' ex (Just (toTimeSpec lim)) key () - m + pure (not here) + + ReqLimPerProto lim -> do + let proto = protoId @e @p (Proxy @p) + ex <- asks (view envReqProtoLimit) + let key = (peer, proto) + here <- liftIO $ Cache.lookup ex key <&> isJust + unless here $ do + liftIO $ Cache.insert' ex (Just (toTimeSpec lim)) key () + pure (not here) instance ( MonadIO m , HasProtocol e p @@ -275,8 +288,11 @@ instance ( MonadIO m -- -- TODO: where to store the timeout? -- TODO: where the timeout come from? - withTimeLimit @e @p p msg $ do + -- withTimeLimit @e @p p msg $ do -- liftIO $ print "request!" + allowed <- tryLockForPeriod p msg + + when allowed do sendTo pipe (To p) (From me) (AnyMessage @(Encoded e) @e proto (encode msg)) @@ -378,7 +394,8 @@ newPeerEnv s bus p = do <*> liftIO (newTVarIO mempty) <*> liftIO (Cache.newCache (Just defCookieTimeout)) <*> liftIO (newTVarIO mempty) - <*> liftIO (Cache.newCache (Just defCookieTimeout)) + <*> liftIO (Cache.newCache (Just defRequestLimit)) + <*> liftIO (Cache.newCache (Just defRequestLimit)) runPeerM :: forall e m . ( MonadIO m , HasPeer e diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 0b03fbfd..5a0d265b 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -47,6 +47,12 @@ defCookieTimeoutSec = 1200 defCookieTimeout :: TimeSpec defCookieTimeout = toTimeSpec defCookieTimeoutSec +defRequestLimit :: TimeSpec +defRequestLimit = toTimeSpec defRequestLimitSec + +defRequestLimitSec :: Timeout 'Seconds +defRequestLimitSec = 60 + defBlockWipTimeout :: TimeSpec defBlockWipTimeout = toTimeSpec defCookieTimeoutSec diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 9177a103..bf82e3b2 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -55,7 +55,7 @@ instance HasProtocol UDP (BlockInfo UDP) where -- FIXME: requestMinPeriod-breaks-fast-block-download -- - requestMinPeriod = Just 10 + requestPeriodLim = ReqLimPerMessage 10 instance HasProtocol UDP (BlockChunks UDP) where type instance ProtocolId (BlockChunks UDP) = 2 @@ -78,6 +78,8 @@ instance HasProtocol UDP (PeerHandshake UDP) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise + requestPeriodLim = ReqLimPerProto 10 + instance HasProtocol UDP (PeerAnnounce UDP) where type instance ProtocolId (PeerAnnounce UDP) = 5 type instance Encoded UDP = ByteString diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index a28cfc54..4ffa26e6 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -53,7 +53,12 @@ newtype instance SessionKey e (PeerHandshake e) = type instance SessionData e (PeerHandshake e) = PingNonce --- FIXME: enormous-request-amount-during-handshake +-- FIXME: enormous-request-amount-during-handshake-2 +-- несмотря на то, что проблема решается введением ReqLimPeriod +-- и HasTimeLimits, хорошо бы разобраться, что именно вызывает +-- шквал пингов и в какой момент (Pex? PeerAnnounce?) +-- это не очень правильное поведение, возможно там нужно +-- что-то делать с peerNonce sendPing :: forall e m . ( MonadIO m , Request e (PeerHandshake e) m diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 1c7aaf8f..593a545d 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -73,6 +73,10 @@ class ( MonadIO m class Request e p (m :: Type -> Type) | p -> e where request :: Peer e -> p -> m () +data ReqLimPeriod = NoLimit + | ReqLimPerProto (Timeout 'Seconds) + | ReqLimPerMessage (Timeout 'Seconds) + class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where type family ProtocolId p = (id :: Nat) | id -> p type family Encoded e :: Type @@ -83,8 +87,8 @@ class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where decode :: Encoded e -> Maybe p encode :: p -> Encoded e - requestMinPeriod :: Maybe (Timeout 'Seconds) - requestMinPeriod = Nothing + requestPeriodLim :: ReqLimPeriod + requestPeriodLim = NoLimit -- FIXME: slow and dumb instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e m where diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index bc962805..0baecce9 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -235,6 +235,10 @@ withCredentials :: forall e m a . (HasOwnPeer e m, Monad m) withCredentials pc m = runReaderT (fromCredentials m) pc + +instance (Monad m, HasTimeLimits e p m) => HasTimeLimits e p (CredentialsM e m) where + tryLockForPeriod p m = lift $ tryLockForPeriod p m + instance (HasOwnPeer e m) => HasOwnPeer e (CredentialsM e m) where ownPeer = lift ownPeer diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index 133f6b0e..106c5d1f 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -77,6 +77,9 @@ instance Monad m => HasFabriq UDP (RpcM m) where instance Monad m => HasOwnPeer UDP (RpcM m) where ownPeer = asks (view rpcSelf) +instance (Monad m, HasProtocol UDP p) => HasTimeLimits UDP p (RpcM m) where + tryLockForPeriod _ _ = pure True + rpcHandler :: forall e m . ( MonadIO m , Response e (RPC e) m , HasProtocol e (RPC e) diff --git a/hbs2-tests/test/TestUDP.hs b/hbs2-tests/test/TestUDP.hs index 56c42ad6..53793c14 100644 --- a/hbs2-tests/test/TestUDP.hs +++ b/hbs2-tests/test/TestUDP.hs @@ -74,6 +74,8 @@ instance Monad m => HasFabriq UDP (PingPongM m) where instance Monad m => HasOwnPeer UDP (PingPongM m) where ownPeer = asks (view ppSelf) +instance HasTimeLimits UDP (PingPong UDP) IO where + tryLockForPeriod _ _ = pure True main :: IO () main = do