forced Cache cleanup

This commit is contained in:
voidlizard 2024-11-03 06:33:36 +03:00
parent 0661a74788
commit a8a58be27e
6 changed files with 42 additions and 26 deletions

View File

@ -26,7 +26,8 @@ import Data.Config.Suckless.KeyValue (HasConf(..))
import Control.Monad import Control.Monad
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Concurrent.Async import Control.Monad.Trans.Cont
-- import Control.Concurrent.Async
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Writer.CPS qualified as CPS import Control.Monad.Writer.CPS qualified as CPS
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
@ -39,12 +40,14 @@ import GHC.TypeLits
import Lens.Micro.Platform as Lens import Lens.Micro.Platform as Lens
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Control.Concurrent.STM.TVar -- import Control.Concurrent.STM.TVar
import Control.Concurrent.STM -- import Control.Concurrent.STM
import Control.Monad.IO.Unlift import Control.Monad.IO.Unlift
import Data.List qualified as L import Data.List qualified as L
import Data.Monoid qualified as Monoid import Data.Monoid qualified as Monoid
import UnliftIO
import Codec.Serialise (serialise, deserialiseOrFail) import Codec.Serialise (serialise, deserialiseOrFail)
@ -421,32 +424,40 @@ peerEnvCollectProbes PeerEnv {..} = do
where where
calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length) calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length)
liftReadTVar = liftIO . atomically . readTVar liftReadTVar = liftIO . readTVarIO
item k v = CPS.tell [(k, fromIntegral v)] item k v = CPS.tell [(k, fromIntegral v)]
runPeerM :: forall e m . ( MonadIO m runPeerM :: forall e m . ( MonadUnliftIO m
, HasPeer e , HasPeer e
, Ord (Peer e) , Ord (Peer e)
, Pretty (Peer e) , Pretty (Peer e)
, Hashable (Encoded e)
, HasNonces () m , HasNonces () m
) )
=> PeerEnv e => PeerEnv e
-> PeerM e m () -> PeerM e m ()
-> m () -> m ()
runPeerM env f = do runPeerM env@PeerEnv{..} f = flip runContT pure do
let de = view envDeferred env as <- liftIO $ replicateM 16 $ async $ runPipeline _envDeferred
as <- liftIO $ replicateM 16 $ async $ runPipeline de
sw <- liftIO $ async $ forever $ withPeerM env $ do sw <- liftIO $ async $ forever $ withPeerM env $ do
pause defSweepTimeout pause defSweepTimeout
se <- asks (view envSessions)
liftIO $ Cache.purgeExpired se
sweep
void $ runReaderT (fromPeerM f) env liftIO do
void $ liftIO $ stopPipeline de Cache.purgeExpired _envSessions
Cache.purgeExpired _envReqMsgLimit
Cache.purgeExpired _envReqProtoLimit
sweep
void $ ContT $ bracket none $ const $ do
pure ()
lift $ void $ runReaderT (fromPeerM f) env
void $ liftIO $ stopPipeline _envDeferred
liftIO $ mapM_ cancel (as <> [sw]) liftIO $ mapM_ cancel (as <> [sw])
withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a

View File

@ -323,7 +323,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
cookie <- handshake Client env so cookie <- handshake Client env so
let connId = connectionId cookie myCookie let connId = connectionId cookie myCookie
when (cookie == myCookie) $ exit () when (cookie == myCookie) $ do
debug $ "same peer, exit" <+> pretty remoteAddr
exit ()
here <- atomically do here <- atomically do
n <- readTVar _tcpPeerCookie <&> HM.member cookie n <- readTVar _tcpPeerCookie <&> HM.member cookie

View File

@ -942,7 +942,7 @@ newBasicBrains cfg = liftIO do
<*> newTQueueIO <*> newTQueueIO
<*> newTQueueIO <*> newTQueueIO
<*> newTQueueIO <*> newTQueueIO
<*> Cache.newCache (Just (toTimeSpec (1200:: Timeout 'Seconds))) <*> Cache.newCache (Just (toTimeSpec (600:: Timeout 'Seconds)))
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO (AnyProbe ()) <*> newTVarIO (AnyProbe ())
@ -1041,14 +1041,16 @@ runBasicBrains cfg brains@BasicBrains{..} = do
-- commitNow brains True -- commitNow brains True
void $ forever do void $ forever do
pause @'Seconds 20 pause @'Seconds 30
ee <- liftIO $ Cache.toList expire ee <- liftIO $ Cache.toList expire
let eee = [ h | (h,_,Just{}) <- ee ] let eee = [ h | (h,_,Just{}) <- ee ]
forM_ eee $ \h -> do forM_ eee $ \h -> do
cleanupPostponed brains h cleanupPostponed brains h
liftIO $ Cache.purgeExpired expire liftIO do
liftIO $ Cache.purgeExpired sizes Cache.purgeExpired expire
Cache.purgeExpired sizes
Cache.purgeExpired _brainsRemoved
del <- liftIO $ atomically $ flushTQueue _brainsDelDownload del <- liftIO $ atomically $ flushTQueue _brainsDelDownload
for_ del $ \d -> do for_ del $ \d -> do

View File

@ -1,4 +1,4 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# Language AllowAmbiguousTypes #-} {-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}

View File

@ -16,7 +16,6 @@ import HBS2.Polling
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Clock import HBS2.Clock
import HBS2.Net.Auth.Schema import HBS2.Net.Auth.Schema
import HBS2.Net.Auth.Credentials
import HBS2.Data.Types.SignedBox import HBS2.Data.Types.SignedBox
import HBS2.Data.Types.Peer import HBS2.Data.Types.Peer
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
@ -25,7 +24,6 @@ import HBS2.Events
import HBS2.Hash import HBS2.Hash
import HBS2.Merkle (AnnMetaData) import HBS2.Merkle (AnnMetaData)
import HBS2.Net.IP.Addr import HBS2.Net.IP.Addr
import HBS2.Net.Proto
import HBS2.Peer.Proto.Peer import HBS2.Peer.Proto.Peer
import HBS2.Peer.Proto.BlockInfo import HBS2.Peer.Proto.BlockInfo
import HBS2.Peer.Proto.LWWRef import HBS2.Peer.Proto.LWWRef
@ -44,11 +42,8 @@ import Prelude hiding (log)
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Writer qualified as W import Control.Monad.Writer qualified as W
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet)
import Data.List qualified as L import Data.List qualified as L
import Data.Maybe import Data.Maybe
import Lens.Micro.Platform import Lens.Micro.Platform

View File

@ -364,7 +364,13 @@ refChanWorkerInitNotifiers env = do
debug "Rely notification request" debug "Rely notification request"
request @UNIX (fromString sa) req request @UNIX (fromString sa) req
r <- waitAnyCatchCancel [msg, disp, rely] kill <- ContT $ withAsync $ forever do
pause @'Seconds 30
let RefChanWorkerEnv{..} = env
liftIO $ Cache.purgeExpired _refChanWorkerNotifiersDone
liftIO $ Cache.purgeExpired _refChanWorkerLocalRelyDone
r <- waitAnyCatchCancel [msg, disp, rely, kill]
warn $ ">>> Notifier thread for" <+> pretty sa <+> "terminated" <+> viaShow (snd r) warn $ ">>> Notifier thread for" <+> pretty sa <+> "terminated" <+> viaShow (snd r)