From ca05c2a17a7bd61921b99c43cc47b0b5ee955203 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 18 Feb 2025 08:16:44 +0300 Subject: [PATCH] pipeline timeout exceptions --- hbs2-core/lib/HBS2/Actors.hs | 23 +++++++++++++++---- hbs2-core/lib/HBS2/Storage.hs | 7 ++++++ .../lib/HBS2/Storage/Simple.hs | 8 ++++++- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors.hs b/hbs2-core/lib/HBS2/Actors.hs index 2f123711..322ecf8d 100644 --- a/hbs2-core/lib/HBS2/Actors.hs +++ b/hbs2-core/lib/HBS2/Actors.hs @@ -12,9 +12,16 @@ import Control.Concurrent.STM import Control.Concurrent.STM.TBMQueue qualified as TBMQ import Control.Concurrent.STM.TBMQueue (TBMQueue) import Control.Concurrent.STM.TVar qualified as TVar -import Data.Function import Data.Kind import Control.Concurrent +import Control.Concurrent.Async +import Control.Exception + +data PipelineExcepion = + PipelineAddJobTimeout + deriving stock (Show,Typeable) + +instance Exception PipelineExcepion data Pipeline m a = Pipeline @@ -47,9 +54,15 @@ stopPipeline pip = liftIO $ do pause ( 0.01 :: Timeout 'Seconds) >> next addJob :: forall a m m1 . (MonadIO m, MonadIO m1) => Pipeline m a -> m a -> m1 () -addJob pip act = liftIO $ do - doWrite <- atomically $ TVar.readTVar ( stopAdding pip ) - unless doWrite $ do - atomically $ TBMQ.writeTBMQueue (toQueue pip) act +addJob pip' act' = liftIO $ do + doWrite <- atomically $ TVar.readTVar ( stopAdding pip' ) + -- FIXME: exception-timeout-hardcode + race (pause @'Seconds 3) (doAddJob doWrite pip' act') >>= \case + Left{} -> throwIO PipelineAddJobTimeout + _ -> pure () + where + doAddJob w pip act = + unless w $ do + atomically $ TBMQ.writeTBMQueue (toQueue pip) act diff --git a/hbs2-core/lib/HBS2/Storage.hs b/hbs2-core/lib/HBS2/Storage.hs index d684e0a2..048d665f 100644 --- a/hbs2-core/lib/HBS2/Storage.hs +++ b/hbs2-core/lib/HBS2/Storage.hs @@ -10,10 +10,17 @@ import Data.Kind import Lens.Micro.Platform import Data.ByteString.Lazy (ByteString) import Control.Monad.Trans.Maybe +import Control.Exception import Data.Word import Codec.Serialise() +data StorageException = + StorageAddTaskTimeout + deriving (Show,Typeable) + +instance Exception StorageException + class Pretty (Hash h) => IsKey h where type family Key h :: Type diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index b862ab84..6ecb7585 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -161,9 +161,15 @@ simpleStorageInit opts = liftIO $ do catchAny :: IO a -> (SomeException -> IO a) -> IO a catchAny = Control.Exception.catch +-- FIXME: io-operation-pipeline-block simpleAddTask :: SimpleStorage h -> IO () -> IO () simpleAddTask s task = do - atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task + -- FIXME: add-task-timeout-hardcode + reallyAdded <- race (pause @'Seconds 3) do + atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task + case reallyAdded of + Left{} -> throwIO StorageAddTaskTimeout + _ -> pure () simpleStorageStop :: SimpleStorage h -> IO () simpleStorageStop ss = do