hbs2/hbs2-core/lib/HBS2/Actors.hs

69 lines
1.9 KiB
Haskell

module HBS2.Actors
( Pipeline
, newPipeline
, runPipeline
, stopPipeline
, addJob
) where
import HBS2.Prelude
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue qualified as TBMQ
import Control.Concurrent.STM.TBMQueue (TBMQueue)
import Control.Concurrent.STM.TVar qualified as TVar
import Data.Kind
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
data PipelineExcepion =
PipelineAddJobTimeout
deriving stock (Show,Typeable)
instance Exception PipelineExcepion
data Pipeline m a =
Pipeline
{ stopAdding :: TVar Bool
, toQueue :: TBMQueue ( m a )
}
newPipeline :: forall a (m1 :: Type -> Type) (m :: Type -> Type) . (MonadIO m, MonadIO m1) => Int -> m (Pipeline m1 a)
newPipeline size = do
tv <- liftIO $ TVar.newTVarIO False
liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv
runPipeline :: MonadIO m => Pipeline m a -> m ()
runPipeline pip = fix \next -> do
mbJob <- liftIO $ atomically $ TBMQ.readTBMQueue (toQueue pip)
case mbJob of
Nothing -> pure ()
Just job -> void (liftIO yield >> job) >> next
stopPipeline :: MonadIO m => Pipeline m a -> m ()
stopPipeline pip = liftIO $ do
atomically $ TVar.writeTVar ( stopAdding pip ) True
fix \next -> do
mt <- atomically $ TBMQ.isEmptyTBMQueue ( toQueue pip )
if mt then
atomically $ TBMQ.closeTBMQueue ( toQueue pip )
else do
pause ( 0.01 :: Timeout 'Seconds) >> next
addJob :: forall a m m1 . (MonadIO m, MonadIO m1) => Pipeline m a -> m a -> m1 ()
addJob pip' act' = liftIO $ do
doWrite <- atomically $ TVar.readTVar ( stopAdding pip' )
-- FIXME: exception-timeout-hardcode
race (pause @'Seconds 3) (doAddJob doWrite pip' act') >>= \case
Left{} -> throwIO PipelineAddJobTimeout
_ -> pure ()
where
doAddJob w pip act =
unless w $ do
atomically $ TBMQ.writeTBMQueue (toQueue pip) act