pipeline timeout exceptions

This commit is contained in:
voidlizard 2025-02-18 08:16:44 +03:00
parent 5418e7527b
commit ca05c2a17a
3 changed files with 32 additions and 6 deletions

View File

@ -12,9 +12,16 @@ import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue qualified as TBMQ
import Control.Concurrent.STM.TBMQueue (TBMQueue)
import Control.Concurrent.STM.TVar qualified as TVar
import Data.Function
import Data.Kind
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
data PipelineExcepion =
PipelineAddJobTimeout
deriving stock (Show,Typeable)
instance Exception PipelineExcepion
data Pipeline m a =
Pipeline
@ -47,9 +54,15 @@ stopPipeline pip = liftIO $ do
pause ( 0.01 :: Timeout 'Seconds) >> next
addJob :: forall a m m1 . (MonadIO m, MonadIO m1) => Pipeline m a -> m a -> m1 ()
addJob pip act = liftIO $ do
doWrite <- atomically $ TVar.readTVar ( stopAdding pip )
unless doWrite $ do
atomically $ TBMQ.writeTBMQueue (toQueue pip) act
addJob pip' act' = liftIO $ do
doWrite <- atomically $ TVar.readTVar ( stopAdding pip' )
-- FIXME: exception-timeout-hardcode
race (pause @'Seconds 3) (doAddJob doWrite pip' act') >>= \case
Left{} -> throwIO PipelineAddJobTimeout
_ -> pure ()
where
doAddJob w pip act =
unless w $ do
atomically $ TBMQ.writeTBMQueue (toQueue pip) act

View File

@ -10,10 +10,17 @@ import Data.Kind
import Lens.Micro.Platform
import Data.ByteString.Lazy (ByteString)
import Control.Monad.Trans.Maybe
import Control.Exception
import Data.Word
import Codec.Serialise()
data StorageException =
StorageAddTaskTimeout
deriving (Show,Typeable)
instance Exception StorageException
class Pretty (Hash h) => IsKey h where
type family Key h :: Type

View File

@ -161,9 +161,15 @@ simpleStorageInit opts = liftIO $ do
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
-- FIXME: io-operation-pipeline-block
simpleAddTask :: SimpleStorage h -> IO () -> IO ()
simpleAddTask s task = do
atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task
-- FIXME: add-task-timeout-hardcode
reallyAdded <- race (pause @'Seconds 3) do
atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task
case reallyAdded of
Left{} -> throwIO StorageAddTaskTimeout
_ -> pure ()
simpleStorageStop :: SimpleStorage h -> IO ()
simpleStorageStop ss = do