diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 670762a3..e5e9bf43 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -26,7 +26,8 @@ import Data.Config.Suckless.KeyValue (HasConf(..)) import Control.Monad import Control.Monad.Trans.Maybe -import Control.Concurrent.Async +import Control.Monad.Trans.Cont +-- import Control.Concurrent.Async import Control.Monad.Reader import Control.Monad.Writer.CPS qualified as CPS import Data.ByteString.Lazy (ByteString) @@ -39,12 +40,14 @@ import GHC.TypeLits import Lens.Micro.Platform as Lens import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap -import Control.Concurrent.STM.TVar -import Control.Concurrent.STM +-- import Control.Concurrent.STM.TVar +-- import Control.Concurrent.STM import Control.Monad.IO.Unlift import Data.List qualified as L import Data.Monoid qualified as Monoid +import UnliftIO + import Codec.Serialise (serialise, deserialiseOrFail) @@ -421,32 +424,40 @@ peerEnvCollectProbes PeerEnv {..} = do where calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length) - liftReadTVar = liftIO . atomically . readTVar + liftReadTVar = liftIO . readTVarIO item k v = CPS.tell [(k, fromIntegral v)] -runPeerM :: forall e m . ( MonadIO m +runPeerM :: forall e m . ( MonadUnliftIO m , HasPeer e , Ord (Peer e) , Pretty (Peer e) + , Hashable (Encoded e) , HasNonces () m ) => PeerEnv e -> PeerM e m () -> m () -runPeerM env f = do +runPeerM env@PeerEnv{..} f = flip runContT pure do - let de = view envDeferred env - as <- liftIO $ replicateM 16 $ async $ runPipeline de + as <- liftIO $ replicateM 16 $ async $ runPipeline _envDeferred sw <- liftIO $ async $ forever $ withPeerM env $ do - pause defSweepTimeout - se <- asks (view envSessions) - liftIO $ Cache.purgeExpired se - sweep + pause defSweepTimeout - void $ runReaderT (fromPeerM f) env - void $ liftIO $ stopPipeline de + liftIO do + Cache.purgeExpired _envSessions + Cache.purgeExpired _envReqMsgLimit + Cache.purgeExpired _envReqProtoLimit + + sweep + + void $ ContT $ bracket none $ const $ do + pure () + + lift $ void $ runReaderT (fromPeerM f) env + + void $ liftIO $ stopPipeline _envDeferred liftIO $ mapM_ cancel (as <> [sw]) withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 55375e90..39c64ee7 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -323,7 +323,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do cookie <- handshake Client env so let connId = connectionId cookie myCookie - when (cookie == myCookie) $ exit () + when (cookie == myCookie) $ do + debug $ "same peer, exit" <+> pretty remoteAddr + exit () here <- atomically do n <- readTVar _tcpPeerCookie <&> HM.member cookie diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 99e83708..8fa0d4a9 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -942,7 +942,7 @@ newBasicBrains cfg = liftIO do <*> newTQueueIO <*> newTQueueIO <*> newTQueueIO - <*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds))) + <*> Cache.newCache (Just (toTimeSpec (600:: Timeout 'Seconds))) <*> newTVarIO mempty <*> newTVarIO (AnyProbe ()) @@ -1041,14 +1041,16 @@ runBasicBrains cfg brains@BasicBrains{..} = do -- commitNow brains True void $ forever do - pause @'Seconds 20 + pause @'Seconds 30 ee <- liftIO $ Cache.toList expire let eee = [ h | (h,_,Just{}) <- ee ] forM_ eee $ \h -> do cleanupPostponed brains h - liftIO $ Cache.purgeExpired expire - liftIO $ Cache.purgeExpired sizes + liftIO do + Cache.purgeExpired expire + Cache.purgeExpired sizes + Cache.purgeExpired _brainsRemoved del <- liftIO $ atomically $ flushTQueue _brainsDelDownload for_ del $ \d -> do diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index e5b82d20..a4ee79d3 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1,4 +1,4 @@ -{-# OPTIONS_GHC -fno-warn-orphans #-} + {-# Language TemplateHaskell #-} {-# Language AllowAmbiguousTypes #-} {-# Language UndecidableInstances #-} diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 6f6bf2b9..56b22992 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -16,7 +16,6 @@ import HBS2.Polling import HBS2.Actors.Peer import HBS2.Clock import HBS2.Net.Auth.Schema -import HBS2.Net.Auth.Credentials import HBS2.Data.Types.SignedBox import HBS2.Data.Types.Peer import HBS2.Data.Types.Refs @@ -25,7 +24,6 @@ import HBS2.Events import HBS2.Hash import HBS2.Merkle (AnnMetaData) import HBS2.Net.IP.Addr -import HBS2.Net.Proto import HBS2.Peer.Proto.Peer import HBS2.Peer.Proto.BlockInfo import HBS2.Peer.Proto.LWWRef @@ -44,11 +42,8 @@ import Prelude hiding (log) import Control.Monad.Reader import Control.Monad.Writer qualified as W import Data.ByteString.Lazy (ByteString) -import Data.Cache (Cache) -import Data.Cache qualified as Cache import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap -import Data.HashSet (HashSet) import Data.List qualified as L import Data.Maybe import Lens.Micro.Platform diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index 0169b1c9..bca84594 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -364,7 +364,13 @@ refChanWorkerInitNotifiers env = do debug "Rely notification request" request @UNIX (fromString sa) req - r <- waitAnyCatchCancel [msg, disp, rely] + kill <- ContT $ withAsync $ forever do + pause @'Seconds 30 + let RefChanWorkerEnv{..} = env + liftIO $ Cache.purgeExpired _refChanWorkerNotifiersDone + liftIO $ Cache.purgeExpired _refChanWorkerLocalRelyDone + + r <- waitAnyCatchCancel [msg, disp, rely, kill] warn $ ">>> Notifier thread for" <+> pretty sa <+> "terminated" <+> viaShow (snd r)