This commit is contained in:
Dmitry Zuikov 2023-01-13 12:09:32 +03:00
parent 2da7ea122a
commit 952bb05d6e
5 changed files with 100 additions and 7 deletions

View File

@ -115,6 +115,7 @@ test-suite test
default-language: Haskell2010 default-language: Haskell2010
other-modules: TestFakeMessaging other-modules: TestFakeMessaging
, TestActors
-- other-extensions: -- other-extensions:
@ -123,6 +124,7 @@ test-suite test
main-is: Main.hs main-is: Main.hs
build-depends: build-depends:
base ^>=4.15.1.0, hbs2-core base ^>=4.15.1.0, hbs2-core
, async
, bytestring , bytestring
, containers , containers
, hashable , hashable
@ -132,6 +134,7 @@ test-suite test
, random , random
, safe , safe
, serialise , serialise
, stm
, tasty , tasty
, tasty-hunit , tasty-hunit
, transformers , transformers

View File

@ -1,5 +1,56 @@
module HBS2.Actors where module HBS2.Actors
( Pipeline
, newPipeline
, runPipeline
, stopPipeline
, addJob
) where
import HBS2.Prelude
import HBS2.Clock
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue qualified as TBMQ
import Control.Concurrent.STM.TBMQueue (TBMQueue)
import Control.Concurrent.STM.TVar qualified as TVar
import Control.Monad
import Data.Function
import Data.Functor
data Pipeline m a =
Pipeline
{ stopAdding :: TVar Bool
, toQueue :: TBMQueue ( m a )
}
newPipeline :: forall a m . MonadIO m => Int -> m (Pipeline m a)
newPipeline size = do
tv <- liftIO $ TVar.newTVarIO False
liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv
runPipeline :: MonadIO m => Pipeline m a -> m ()
runPipeline pip = fix \next -> do
mbJob <- liftIO $ atomically $ TBMQ.readTBMQueue (toQueue pip)
case mbJob of
Nothing -> pure ()
Just job -> void job >> next
stopPipeline :: MonadIO m => Pipeline m a -> m ()
stopPipeline pip = liftIO $ do
atomically $ TVar.writeTVar ( stopAdding pip ) True
fix \next -> do
mt <- atomically $ TBMQ.isEmptyTBMQueue ( toQueue pip )
if mt then
atomically $ TBMQ.closeTBMQueue ( toQueue pip )
else do
pause ( 0.01 :: Timeout 'Seconds) >> next
addJob :: forall a m . MonadIO m => Pipeline m a -> m a -> m ()
addJob pip act = liftIO $ do
doWrite <- atomically $ TVar.readTVar ( stopAdding pip )
unless doWrite $ do
atomically $ TBMQ.writeTBMQueue (toQueue pip) act

View File

@ -6,9 +6,6 @@ import HBS2.Net.Proto
import HBS2.Clock import HBS2.Clock
import Data.Function import Data.Function
import Control.Concurrent.STM.TBMQueue (TBMQueue)
import Control.Concurrent.STM.TBMQueue qualified as TBMQ
import Control.Concurrent.STM
-- needs: logger -- needs: logger
-- needs: reader and shit -- needs: reader and shit
@ -18,7 +15,7 @@ import Control.Concurrent.STM
data BlockInfoActor = data BlockInfoActor =
BlockInfoActor BlockInfoActor
{ tasks :: TBMQueue (IO ()) {
} }
@ -30,8 +27,7 @@ data BlockInfoActor =
createBlockInfoActor :: MonadIO m => m BlockInfoActor createBlockInfoActor :: MonadIO m => m BlockInfoActor
createBlockInfoActor = do createBlockInfoActor = do
qtask <- liftIO $ atomically $ TBMQ.newTBMQueue 500 -- FIXME: settings pure $ BlockInfoActor
pure $ BlockInfoActor undefined
runBlockInfoActor :: MonadIO m => BlockInfoActor -> m () runBlockInfoActor :: MonadIO m => BlockInfoActor -> m ()
runBlockInfoActor _ = runBlockInfoActor _ =

View File

@ -1,6 +1,7 @@
module Main where module Main where
import TestFakeMessaging import TestFakeMessaging
import TestActors
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit import Test.Tasty.HUnit
@ -11,6 +12,7 @@ main =
testGroup "root" testGroup "root"
[ [
testCase "testFakeMessaging1" testFakeMessaging1 testCase "testFakeMessaging1" testFakeMessaging1
, testCase "testActorsBasic" testActorsBasic
] ]

View File

@ -0,0 +1,41 @@
module TestActors where
import HBS2.Actors
import HBS2.Clock
import Test.Tasty.HUnit
import Control.Monad
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue qualified as Q
import Control.Concurrent.STM.TQueue (newTQueueIO)
import Control.Concurrent.Async
import Control.Concurrent
testActorsBasic :: IO ()
testActorsBasic = do
sink <- newTQueueIO @Int
pip <- newPipeline 10
wpip <- async $ runPipeline pip
let nums = [1..1000] :: [Int]
forConcurrently_ nums $ \n -> do
addJob pip do
atomically $ Q.writeTQueue sink n
pause ( 0.25 :: Timeout 'Seconds )
stopPipeline pip
void $ waitAnyCatchCancel [wpip]
ll <- atomically $ Q.flushTQueue sink
assertEqual "alive" 1000 (length ll)