From ca29851f6d3a1566807f6fa5c6d4262a85fdf4bc Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 11 Jan 2023 10:15:04 +0300 Subject: [PATCH] done in first approach --- hbs2-storage-simple/hbs2-storage-simple.cabal | 1 + .../lib/HBS2/Storage/Simple.hs | 62 ++++++++++++------- hbs2-storage-simple/test/TestSimpleStorage.hs | 55 +++++++++++++++- 3 files changed, 96 insertions(+), 22 deletions(-) diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index c54257ef..8ac51c46 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -71,6 +71,7 @@ library , mtl , prettyprinter , stm + , stm-chans , transformers , uniplate diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index 03e5deb7..8004753f 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -3,7 +3,7 @@ module HBS2.Storage.Simple where import Control.Concurrent import Control.Concurrent.Async -import Control.Exception (try,tryJust) +import Control.Exception import Control.Monad import Control.Monad.Except import Control.Monad.IO.Class @@ -26,9 +26,13 @@ import System.IO.Error import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue qualified as TBQ import Control.Concurrent.STM.TBQueue (TBQueue) +import Control.Concurrent.STM.TBMQueue qualified as TBMQ +import Control.Concurrent.STM.TBMQueue (TBMQueue) import Control.Concurrent.STM.TVar (TVar) import Control.Concurrent.STM.TVar qualified as TV +import Debug.Trace + import HBS2.Clock import HBS2.Hash import HBS2.Prelude @@ -62,7 +66,7 @@ newtype StorageQueueSize = StorageQueueSize { fromQueueSize :: Int } data SimpleStorage a = SimpleStorage { _storageDir :: FilePath - , _storageOpQ :: TBQueue ( IO () ) + , _storageOpQ :: TBMQueue ( IO () ) , _storageChunksCache :: Cache (FilePath, Offset, Size) ByteString } @@ -81,7 +85,7 @@ simpleStorageInit opts = liftIO $ do pdir <- canonicalizePath (fromPrefix prefix) - tbq <- TBQ.newTBQueueIO (fromIntegral (fromQueueSize qSize)) + tbq <- TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize)) hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 1)) -- FIXME: real setting @@ -102,11 +106,22 @@ simpleStorageInit opts = liftIO $ do pure stor +catchAny :: IO a -> (SomeException -> IO a) -> IO a +catchAny = Control.Exception.catch + +simpleAddTask :: SimpleStorage h -> IO () -> IO () +simpleAddTask s task = do + atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task + simpleStorageWorker :: SimpleStorage h -> IO () simpleStorageWorker ss = do ops <- async $ forever $ do - join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ ) + + s <- atomically $ do TBMQ.readTBMQueue ( ss ^. storageOpQ ) + case s of + Nothing -> pure () + Just a -> a killer <- async $ forever $ do pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting @@ -147,6 +162,8 @@ simpleBlockFileName ss h = path (pref,suf) = splitAt 1 (show (pretty h)) path = view storageBlocks ss pref suf + + -- NOTE: reads a whole file into memory! -- if file size is too big --- it will -- cause consequences! @@ -162,25 +179,24 @@ simpleGetBlockLazy :: SimpleStorage h -> IO (Maybe LBS.ByteString) simpleGetBlockLazy s key = do - resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString)) + resQ <- TBMQ.newTBMQueueIO 1 :: IO (TBMQueue (Maybe LBS.ByteString)) let fn = simpleBlockFileName s key let action = do r <- tryJust (guard . isDoesNotExistError) (BS.readFile fn <&> LBS.fromStrict) - -- error "FUCK!" - result <- case r of Right bytes -> pure (Just bytes) Left _ -> pure Nothing - void $ atomically $ TBQ.writeTBQueue resQ result + void $ atomically $ TBMQ.writeTBMQueue resQ result + let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing - void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action + simpleAddTask s (catch action onFail) - atomically $ TBQ.readTBQueue resQ + atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure simpleGetChunkLazy :: SimpleStorage h -> Key (Raw LBS.ByteString) @@ -189,7 +205,7 @@ simpleGetChunkLazy :: SimpleStorage h -> IO (Maybe LBS.ByteString) simpleGetChunkLazy s key off size = do - resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString)) + resQ <- TBMQ.newTBMQueueIO 1 :: IO (TBMQueue (Maybe LBS.ByteString)) let action = do let fn = simpleBlockFileName s key @@ -197,7 +213,7 @@ simpleGetChunkLazy s key off size = do case cached of Just chunk -> do - void $ atomically $ TBQ.writeTBQueue resQ (Just chunk) + void $ atomically $ TBMQ.writeTBMQueue resQ (Just chunk) Nothing -> do r <- tryJust (guard . isDoesNotExistError) @@ -229,11 +245,13 @@ simpleGetChunkLazy s key off size = do Right bytes -> pure (Just bytes) Left _ -> pure Nothing - void $ atomically $ TBQ.writeTBQueue resQ result + void $ atomically $ TBMQ.writeTBMQueue resQ result - void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action + let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing - atomically $ TBQ.readTBQueue resQ + simpleAddTask s (catch action onFail ) + + atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure simplePutBlockLazy :: SimpleStorage h -> LBS.ByteString @@ -244,17 +262,19 @@ simplePutBlockLazy s lbs = do let hash = hashObject lbs :: Key (Raw LBS.ByteString) let fn = simpleBlockFileName s hash - waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue ()) + waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue Bool) let action = do - LBS.writeFile fn lbs - atomically $ TBQ.writeTBQueue waits () + catch (LBS.writeFile fn lbs) + (\(_ :: IOError) -> atomically $ TBQ.writeTBQueue waits False) - atomically $ TBQ.writeTBQueue (s ^. storageOpQ) action + atomically $ TBQ.writeTBQueue waits True - void $ atomically $ TBQ.readTBQueue waits + simpleAddTask s action - pure (Just hash) + ok <- atomically $ TBQ.readTBQueue waits + + pure $! if ok then Just hash else Nothing simpleBlockExists :: SimpleStorage h diff --git a/hbs2-storage-simple/test/TestSimpleStorage.hs b/hbs2-storage-simple/test/TestSimpleStorage.hs index 8a9b8581..83c1f91d 100644 --- a/hbs2-storage-simple/test/TestSimpleStorage.hs +++ b/hbs2-storage-simple/test/TestSimpleStorage.hs @@ -1,5 +1,6 @@ module TestSimpleStorage where +import Control.Monad.Except import Control.Monad import Data.Traversable import Data.Foldable @@ -14,6 +15,7 @@ import System.Directory import System.FilePath.Posix import System.IO.Temp import Test.QuickCheck +import System.TimeIt import Test.Tasty.HUnit @@ -24,9 +26,58 @@ import HBS2.Storage import HBS2.Storage.Simple +-- CASE: +-- Current result: +-- *** Exception: thread blocked indefinitely in an STM transaction +-- +-- Expected result: survives this situation with honor testSimpleStorageErrors :: IO () testSimpleStorageErrors = do - undefined + + withSystemTempDirectory "simpleStorageTest" $ \dir -> do + + let opts = [ StoragePrefix (dir ".storage") + ] + + storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) + + r <- runExceptT $ liftIO $ do + + worker <- async (simpleStorageWorker storage) + + let blocks = storage ^. storageBlocks + + p <- getPermissions blocks + + setPermissions blocks (p { readable = False + , searchable = False + , writable = False + }) + + let str = "AAAAA" :: LBS.ByteString + let strKey = hashObject @HbSync str + + key <- putBlock storage str + + assertBool "nothing written" (isNothing key) + + here <- hasBlock storage strKey + + assertBool "nothing written, again" (not here) + + val <- getBlock storage strKey + + assertBool "nothing read" (isNothing val) + + setPermissions blocks p + + mapM_ cancel [worker] + + snd <$> waitAnyCatch [worker] + + case r of + Left err -> error "oopsie!" + _ -> pure () testSimpleStorageNoKeys :: IO () @@ -40,6 +91,8 @@ testSimpleStorageNoKeys = do worker <- async (simpleStorageWorker storage) + link worker + let pieces = take 1000 $ shrink [0x00 .. 0xFF] :: [[Word8]] results' <- forConcurrently pieces $ \p -> do