done in first approach

This commit is contained in:
Dmitry Zuikov 2023-01-11 10:15:04 +03:00
parent da7b56d4fc
commit ca29851f6d
3 changed files with 96 additions and 22 deletions

View File

@ -71,6 +71,7 @@ library
, mtl , mtl
, prettyprinter , prettyprinter
, stm , stm
, stm-chans
, transformers , transformers
, uniplate , uniplate

View File

@ -3,7 +3,7 @@ module HBS2.Storage.Simple where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Exception (try,tryJust) import Control.Exception
import Control.Monad import Control.Monad
import Control.Monad.Except import Control.Monad.Except
import Control.Monad.IO.Class import Control.Monad.IO.Class
@ -26,9 +26,13 @@ import System.IO.Error
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as TBQ import Control.Concurrent.STM.TBQueue qualified as TBQ
import Control.Concurrent.STM.TBQueue (TBQueue) import Control.Concurrent.STM.TBQueue (TBQueue)
import Control.Concurrent.STM.TBMQueue qualified as TBMQ
import Control.Concurrent.STM.TBMQueue (TBMQueue)
import Control.Concurrent.STM.TVar (TVar) import Control.Concurrent.STM.TVar (TVar)
import Control.Concurrent.STM.TVar qualified as TV import Control.Concurrent.STM.TVar qualified as TV
import Debug.Trace
import HBS2.Clock import HBS2.Clock
import HBS2.Hash import HBS2.Hash
import HBS2.Prelude import HBS2.Prelude
@ -62,7 +66,7 @@ newtype StorageQueueSize = StorageQueueSize { fromQueueSize :: Int }
data SimpleStorage a = data SimpleStorage a =
SimpleStorage SimpleStorage
{ _storageDir :: FilePath { _storageDir :: FilePath
, _storageOpQ :: TBQueue ( IO () ) , _storageOpQ :: TBMQueue ( IO () )
, _storageChunksCache :: Cache (FilePath, Offset, Size) ByteString , _storageChunksCache :: Cache (FilePath, Offset, Size) ByteString
} }
@ -81,7 +85,7 @@ simpleStorageInit opts = liftIO $ do
pdir <- canonicalizePath (fromPrefix prefix) pdir <- canonicalizePath (fromPrefix prefix)
tbq <- TBQ.newTBQueueIO (fromIntegral (fromQueueSize qSize)) tbq <- TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize))
hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 1)) -- FIXME: real setting hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 1)) -- FIXME: real setting
@ -102,11 +106,22 @@ simpleStorageInit opts = liftIO $ do
pure stor pure stor
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
simpleAddTask :: SimpleStorage h -> IO () -> IO ()
simpleAddTask s task = do
atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task
simpleStorageWorker :: SimpleStorage h -> IO () simpleStorageWorker :: SimpleStorage h -> IO ()
simpleStorageWorker ss = do simpleStorageWorker ss = do
ops <- async $ forever $ do ops <- async $ forever $ do
join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ )
s <- atomically $ do TBMQ.readTBMQueue ( ss ^. storageOpQ )
case s of
Nothing -> pure ()
Just a -> a
killer <- async $ forever $ do killer <- async $ forever $ do
pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting
@ -147,6 +162,8 @@ simpleBlockFileName ss h = path
(pref,suf) = splitAt 1 (show (pretty h)) (pref,suf) = splitAt 1 (show (pretty h))
path = view storageBlocks ss </> pref </> suf path = view storageBlocks ss </> pref </> suf
-- NOTE: reads a whole file into memory! -- NOTE: reads a whole file into memory!
-- if file size is too big --- it will -- if file size is too big --- it will
-- cause consequences! -- cause consequences!
@ -162,25 +179,24 @@ simpleGetBlockLazy :: SimpleStorage h
-> IO (Maybe LBS.ByteString) -> IO (Maybe LBS.ByteString)
simpleGetBlockLazy s key = do simpleGetBlockLazy s key = do
resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString)) resQ <- TBMQ.newTBMQueueIO 1 :: IO (TBMQueue (Maybe LBS.ByteString))
let fn = simpleBlockFileName s key let fn = simpleBlockFileName s key
let action = do let action = do
r <- tryJust (guard . isDoesNotExistError) r <- tryJust (guard . isDoesNotExistError)
(BS.readFile fn <&> LBS.fromStrict) (BS.readFile fn <&> LBS.fromStrict)
-- error "FUCK!"
result <- case r of result <- case r of
Right bytes -> pure (Just bytes) Right bytes -> pure (Just bytes)
Left _ -> pure Nothing Left _ -> pure Nothing
void $ atomically $ TBQ.writeTBQueue resQ result void $ atomically $ TBMQ.writeTBMQueue resQ result
let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing
void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action simpleAddTask s (catch action onFail)
atomically $ TBQ.readTBQueue resQ atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure
simpleGetChunkLazy :: SimpleStorage h simpleGetChunkLazy :: SimpleStorage h
-> Key (Raw LBS.ByteString) -> Key (Raw LBS.ByteString)
@ -189,7 +205,7 @@ simpleGetChunkLazy :: SimpleStorage h
-> IO (Maybe LBS.ByteString) -> IO (Maybe LBS.ByteString)
simpleGetChunkLazy s key off size = do simpleGetChunkLazy s key off size = do
resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString)) resQ <- TBMQ.newTBMQueueIO 1 :: IO (TBMQueue (Maybe LBS.ByteString))
let action = do let action = do
let fn = simpleBlockFileName s key let fn = simpleBlockFileName s key
@ -197,7 +213,7 @@ simpleGetChunkLazy s key off size = do
case cached of case cached of
Just chunk -> do Just chunk -> do
void $ atomically $ TBQ.writeTBQueue resQ (Just chunk) void $ atomically $ TBMQ.writeTBMQueue resQ (Just chunk)
Nothing -> do Nothing -> do
r <- tryJust (guard . isDoesNotExistError) r <- tryJust (guard . isDoesNotExistError)
@ -229,11 +245,13 @@ simpleGetChunkLazy s key off size = do
Right bytes -> pure (Just bytes) Right bytes -> pure (Just bytes)
Left _ -> pure Nothing Left _ -> pure Nothing
void $ atomically $ TBQ.writeTBQueue resQ result void $ atomically $ TBMQ.writeTBMQueue resQ result
void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing
atomically $ TBQ.readTBQueue resQ simpleAddTask s (catch action onFail )
atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure
simplePutBlockLazy :: SimpleStorage h simplePutBlockLazy :: SimpleStorage h
-> LBS.ByteString -> LBS.ByteString
@ -244,17 +262,19 @@ simplePutBlockLazy s lbs = do
let hash = hashObject lbs :: Key (Raw LBS.ByteString) let hash = hashObject lbs :: Key (Raw LBS.ByteString)
let fn = simpleBlockFileName s hash let fn = simpleBlockFileName s hash
waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue ()) waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue Bool)
let action = do let action = do
LBS.writeFile fn lbs catch (LBS.writeFile fn lbs)
atomically $ TBQ.writeTBQueue waits () (\(_ :: IOError) -> atomically $ TBQ.writeTBQueue waits False)
atomically $ TBQ.writeTBQueue (s ^. storageOpQ) action atomically $ TBQ.writeTBQueue waits True
void $ atomically $ TBQ.readTBQueue waits simpleAddTask s action
pure (Just hash) ok <- atomically $ TBQ.readTBQueue waits
pure $! if ok then Just hash else Nothing
simpleBlockExists :: SimpleStorage h simpleBlockExists :: SimpleStorage h

View File

@ -1,5 +1,6 @@
module TestSimpleStorage where module TestSimpleStorage where
import Control.Monad.Except
import Control.Monad import Control.Monad
import Data.Traversable import Data.Traversable
import Data.Foldable import Data.Foldable
@ -14,6 +15,7 @@ import System.Directory
import System.FilePath.Posix import System.FilePath.Posix
import System.IO.Temp import System.IO.Temp
import Test.QuickCheck import Test.QuickCheck
import System.TimeIt
import Test.Tasty.HUnit import Test.Tasty.HUnit
@ -24,9 +26,58 @@ import HBS2.Storage
import HBS2.Storage.Simple import HBS2.Storage.Simple
-- CASE:
-- Current result:
-- *** Exception: thread blocked indefinitely in an STM transaction
--
-- Expected result: survives this situation with honor
testSimpleStorageErrors :: IO () testSimpleStorageErrors :: IO ()
testSimpleStorageErrors = do testSimpleStorageErrors = do
undefined
withSystemTempDirectory "simpleStorageTest" $ \dir -> do
let opts = [ StoragePrefix (dir </> ".storage")
]
storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)
r <- runExceptT $ liftIO $ do
worker <- async (simpleStorageWorker storage)
let blocks = storage ^. storageBlocks
p <- getPermissions blocks
setPermissions blocks (p { readable = False
, searchable = False
, writable = False
})
let str = "AAAAA" :: LBS.ByteString
let strKey = hashObject @HbSync str
key <- putBlock storage str
assertBool "nothing written" (isNothing key)
here <- hasBlock storage strKey
assertBool "nothing written, again" (not here)
val <- getBlock storage strKey
assertBool "nothing read" (isNothing val)
setPermissions blocks p
mapM_ cancel [worker]
snd <$> waitAnyCatch [worker]
case r of
Left err -> error "oopsie!"
_ -> pure ()
testSimpleStorageNoKeys :: IO () testSimpleStorageNoKeys :: IO ()
@ -40,6 +91,8 @@ testSimpleStorageNoKeys = do
worker <- async (simpleStorageWorker storage) worker <- async (simpleStorageWorker storage)
link worker
let pieces = take 1000 $ shrink [0x00 .. 0xFF] :: [[Word8]] let pieces = take 1000 $ shrink [0x00 .. 0xFF] :: [[Word8]]
results' <- forConcurrently pieces $ \p -> do results' <- forConcurrently pieces $ \p -> do