diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index d9a15ca0..cdb3e271 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -115,6 +115,7 @@ test-suite test default-language: Haskell2010 other-modules: TestFakeMessaging + , TestActors -- other-extensions: @@ -123,6 +124,7 @@ test-suite test main-is: Main.hs build-depends: base ^>=4.15.1.0, hbs2-core + , async , bytestring , containers , hashable @@ -132,6 +134,7 @@ test-suite test , random , safe , serialise + , stm , tasty , tasty-hunit , transformers diff --git a/hbs2-core/lib/HBS2/Actors.hs b/hbs2-core/lib/HBS2/Actors.hs index f207d567..56b836b0 100644 --- a/hbs2-core/lib/HBS2/Actors.hs +++ b/hbs2-core/lib/HBS2/Actors.hs @@ -1,5 +1,56 @@ -module HBS2.Actors where +module HBS2.Actors + ( Pipeline + , newPipeline + , runPipeline + , stopPipeline + , addJob + ) where + +import HBS2.Prelude +import HBS2.Clock + +import Control.Concurrent.STM +import Control.Concurrent.STM.TBMQueue qualified as TBMQ +import Control.Concurrent.STM.TBMQueue (TBMQueue) +import Control.Concurrent.STM.TVar qualified as TVar +import Control.Monad +import Data.Function +import Data.Functor + +data Pipeline m a = + Pipeline + { stopAdding :: TVar Bool + , toQueue :: TBMQueue ( m a ) + } + +newPipeline :: forall a m . MonadIO m => Int -> m (Pipeline m a) +newPipeline size = do + tv <- liftIO $ TVar.newTVarIO False + liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv +runPipeline :: MonadIO m => Pipeline m a -> m () +runPipeline pip = fix \next -> do + mbJob <- liftIO $ atomically $ TBMQ.readTBMQueue (toQueue pip) + + case mbJob of + Nothing -> pure () + Just job -> void job >> next + +stopPipeline :: MonadIO m => Pipeline m a -> m () +stopPipeline pip = liftIO $ do + atomically $ TVar.writeTVar ( stopAdding pip ) True + fix \next -> do + mt <- atomically $ TBMQ.isEmptyTBMQueue ( toQueue pip ) + if mt then + atomically $ TBMQ.closeTBMQueue ( toQueue pip ) + else do + pause ( 0.01 :: Timeout 'Seconds) >> next + +addJob :: forall a m . MonadIO m => Pipeline m a -> m a -> m () +addJob pip act = liftIO $ do + doWrite <- atomically $ TVar.readTVar ( stopAdding pip ) + unless doWrite $ do + atomically $ TBMQ.writeTBMQueue (toQueue pip) act diff --git a/hbs2-core/lib/HBS2/Net/Proto/Actors/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/Actors/BlockInfo.hs index 204327e8..af137939 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Actors/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Actors/BlockInfo.hs @@ -6,9 +6,6 @@ import HBS2.Net.Proto import HBS2.Clock import Data.Function -import Control.Concurrent.STM.TBMQueue (TBMQueue) -import Control.Concurrent.STM.TBMQueue qualified as TBMQ -import Control.Concurrent.STM -- needs: logger -- needs: reader and shit @@ -18,7 +15,7 @@ import Control.Concurrent.STM data BlockInfoActor = BlockInfoActor - { tasks :: TBMQueue (IO ()) + { } @@ -30,8 +27,7 @@ data BlockInfoActor = createBlockInfoActor :: MonadIO m => m BlockInfoActor createBlockInfoActor = do - qtask <- liftIO $ atomically $ TBMQ.newTBMQueue 500 -- FIXME: settings - pure $ BlockInfoActor undefined + pure $ BlockInfoActor runBlockInfoActor :: MonadIO m => BlockInfoActor -> m () runBlockInfoActor _ = diff --git a/hbs2-core/test/Main.hs b/hbs2-core/test/Main.hs index cccb3d05..b8eb0774 100644 --- a/hbs2-core/test/Main.hs +++ b/hbs2-core/test/Main.hs @@ -1,6 +1,7 @@ module Main where import TestFakeMessaging +import TestActors import Test.Tasty import Test.Tasty.HUnit @@ -11,6 +12,7 @@ main = testGroup "root" [ testCase "testFakeMessaging1" testFakeMessaging1 + , testCase "testActorsBasic" testActorsBasic ] diff --git a/hbs2-core/test/TestActors.hs b/hbs2-core/test/TestActors.hs new file mode 100644 index 00000000..ca3bf801 --- /dev/null +++ b/hbs2-core/test/TestActors.hs @@ -0,0 +1,41 @@ +module TestActors where + +import HBS2.Actors +import HBS2.Clock + +import Test.Tasty.HUnit + +import Control.Monad +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue qualified as Q +import Control.Concurrent.STM.TQueue (newTQueueIO) +import Control.Concurrent.Async +import Control.Concurrent + +testActorsBasic :: IO () +testActorsBasic = do + + sink <- newTQueueIO @Int + + pip <- newPipeline 10 + + wpip <- async $ runPipeline pip + + let nums = [1..1000] :: [Int] + + forConcurrently_ nums $ \n -> do + addJob pip do + atomically $ Q.writeTQueue sink n + + pause ( 0.25 :: Timeout 'Seconds ) + + stopPipeline pip + + void $ waitAnyCatchCancel [wpip] + + ll <- atomically $ Q.flushTQueue sink + + assertEqual "alive" 1000 (length ll) + + +