diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index e5e9bf43..5b65cb6d 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -40,13 +40,11 @@ import GHC.TypeLits import Lens.Micro.Platform as Lens import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap --- import Control.Concurrent.STM.TVar --- import Control.Concurrent.STM import Control.Monad.IO.Unlift import Data.List qualified as L import Data.Monoid qualified as Monoid - import UnliftIO +import UnliftIO.Concurrent (getNumCapabilities) import Codec.Serialise (serialise, deserialiseOrFail) @@ -440,7 +438,9 @@ runPeerM :: forall e m . ( MonadUnliftIO m runPeerM env@PeerEnv{..} f = flip runContT pure do - as <- liftIO $ replicateM 16 $ async $ runPipeline _envDeferred + n <- liftIO getNumCapabilities <&> max 2 . div 2 + + as <- liftIO $ replicateM n $ async $ runPipeline _envDeferred sw <- liftIO $ async $ forever $ withPeerM env $ do pause defSweepTimeout @@ -453,12 +453,12 @@ runPeerM env@PeerEnv{..} f = flip runContT pure do sweep void $ ContT $ bracket none $ const $ do + void $ liftIO $ stopPipeline _envDeferred + liftIO $ mapM_ cancel (as <> [sw]) pure () lift $ void $ runReaderT (fromPeerM f) env - void $ liftIO $ stopPipeline _envDeferred - liftIO $ mapM_ cancel (as <> [sw]) withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a withPeerM env action = runReaderT (fromPeerM action) env diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index d8fd7335..67fc06b3 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -645,7 +645,7 @@ downloadDispatcher brains env = flip runContT pure do Just s -> pure s Nothing -> do -- TODO: semaphore-hardcode - new <- TSem.newTSem 2 + new <- TSem.newTSem 10 modifyTVar _psem (HM.insert nonce new) pure new diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 995b3ecf..259c3116 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -128,6 +128,7 @@ import UnliftIO (MonadUnliftIO(..)) import UnliftIO.Exception qualified as U -- import UnliftIO.STM import UnliftIO.Async +import UnliftIO.Concurrent (getNumCapabilities) import Streaming.Prelude qualified as S @@ -812,7 +813,9 @@ runPeer opts = respawnOnError opts $ do simpleStorageSetProbe s stoProbe addProbe stoProbe - w <- replicateM defStorageThreads $ async $ liftIO $ simpleStorageWorker s + stn <- getNumCapabilities <&> max 2 . div 4 + + w <- replicateM 2 $ async $ liftIO $ simpleStorageWorker s localMulticast <- liftIO $ (headMay <$> parseAddrUDP (fromString defLocalMulticast) <&> fmap (fromSockAddr @'UDP . addrAddress) ) diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index f40620f3..1081fae1 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -76,7 +76,6 @@ import UnliftIO.Concurrent import Lens.Micro.Platform import Streaming.Prelude qualified as S - instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcRunScript where handleMethod top = do @@ -94,6 +93,10 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => entry $ bindMatch "hey" $ const do pure $ mkSym @C "hey" + entry $ bindMatch "system:capabilities" $ const $ do + n <- getNumCapabilities + pure $ mkForm "capabilities" [mkInt n] + entry $ bindMatch "tcp:peer:kick" $ \case [ StringLike addr ] -> flip runContT pure $ callCC \exit -> do