works, slow

This commit is contained in:
Dmitry Zuikov 2023-01-10 16:19:48 +03:00
parent 2b75931e55
commit 332094a605
3 changed files with 43 additions and 26 deletions

View File

@ -68,6 +68,7 @@ library
, directory , directory
, filepath , filepath
, microlens-platform , microlens-platform
, mtl
, prettyprinter , prettyprinter
, stm , stm
, transformers , transformers

View File

@ -1,8 +1,10 @@
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
module HBS2.Storage.Simple where module HBS2.Storage.Simple where
import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Exception (try,tryJust) import Control.Exception (try,tryJust)
import Control.Monad.Except
import Control.Monad import Control.Monad
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
@ -101,23 +103,37 @@ simpleStorageInit opts = liftIO $ do
simpleStorageWorker :: SimpleStorage h -> IO () simpleStorageWorker :: SimpleStorage h -> IO ()
simpleStorageWorker ss = do simpleStorageWorker ss = do
readOps <- async $ forever $ do ops <- async $ forever $ do
join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ )
writeOps <- async $ forever $ do
join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ ) join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ )
killer <- async $ forever $ do killer <- async $ forever $ do
pause ( 1 :: Timeout 'Minutes ) -- FIXME: setting pause ( 1 :: Timeout 'Minutes ) -- FIXME: setting
Cache.purgeExpired ( ss ^. storageHandles ) Cache.purgeExpired ( ss ^. storageHandles )
void $ waitAnyCatchCancel [readOps,writeOps,killer] (_, e) <- waitAnyCatchCancel [ops,killer]
simpleGetHandle :: SimpleStorage h -> Key (Raw LBS.ByteString) -> IO Handle pure ()
simpleGetHandle :: SimpleStorage h -> Key (Raw LBS.ByteString) -> IO (Maybe Handle)
simpleGetHandle s k = do simpleGetHandle s k = do
let cache = s ^. storageHandles let cache = s ^. storageHandles
let fn = simpleBlockFileName s k let fn = simpleBlockFileName s k
Cache.fetchWithCache cache k $ const $ openFile fn ReadMode
-- h <- Cache.lookup cache k
-- runMaybeT $ do
-- print $ pretty "file to open: " <+> pretty fn
-- err <- runExceptT $ liftIO $ Cache.fetchWithCache cache k $ const $ openFile fn ReadMode
-- Cache.fetchWithCache cache k $ const $ openFile fn ReadMode
--
r <- tryJust (guard . isDoesNotExistError)
(openFile fn ReadMode)
case r of
Right h -> pure (Just h)
Left _ -> pure Nothing
simpleBlockFileName :: SimpleStorage h -> Hash HbSync -> FilePath simpleBlockFileName :: SimpleStorage h -> Hash HbSync -> FilePath
simpleBlockFileName ss h = path simpleBlockFileName ss h = path
@ -147,6 +163,8 @@ simpleGetBlockLazy s key = do
r <- tryJust (guard . isDoesNotExistError) r <- tryJust (guard . isDoesNotExistError)
(BS.readFile fn <&> LBS.fromStrict) (BS.readFile fn <&> LBS.fromStrict)
-- error "FUCK!"
result <- case r of result <- case r of
Right bytes -> pure (Just bytes) Right bytes -> pure (Just bytes)
Left _ -> pure Nothing Left _ -> pure Nothing
@ -156,6 +174,8 @@ simpleGetBlockLazy s key = do
void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action
yield
atomically $ TBQ.readTBQueue resQ atomically $ TBQ.readTBQueue resQ
simpleGetChunkLazy :: SimpleStorage h simpleGetChunkLazy :: SimpleStorage h
@ -167,24 +187,19 @@ simpleGetChunkLazy :: SimpleStorage h
simpleGetChunkLazy s key off size = do simpleGetChunkLazy s key off size = do
resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString)) resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString))
let action = do let action = do
let fn = simpleBlockFileName s key
r <- tryJust (guard . isDoesNotExistError) r <- tryJust (guard . isDoesNotExistError)
(simpleGetHandle s key) $ withBinaryFile fn ReadMode $ \handle -> do
hSeek handle AbsoluteSeek ( fromIntegral off )
chunk <- runMaybeT $ do LBS.hGet handle (fromIntegral size)
handle <- MaybeT $ case r of
Right h -> pure (Just h)
Left _ -> pure Nothing
liftIO $ do
hSeek handle AbsoluteSeek ( fromIntegral off )
LBS.hGet handle (fromIntegral size)
void $ atomically $ TBQ.writeTBQueue resQ chunk
result <- case r of
Right bytes -> pure (Just bytes)
Left _ -> pure Nothing
void $ atomically $ TBQ.writeTBQueue resQ result
void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action
atomically $ TBQ.readTBQueue resQ atomically $ TBQ.readTBQueue resQ
simplePutBlockLazy :: SimpleStorage h simplePutBlockLazy :: SimpleStorage h
@ -196,15 +211,17 @@ simplePutBlockLazy s lbs = do
let hash = hashObject lbs :: Key (Raw LBS.ByteString) let hash = hashObject lbs :: Key (Raw LBS.ByteString)
let fn = simpleBlockFileName s hash let fn = simpleBlockFileName s hash
wait <- TBQ.newTBQueueIO 1 :: IO (TBQueue ()) waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue ())
let action = do let action = do
LBS.writeFile fn lbs LBS.writeFile fn lbs
atomically $ TBQ.writeTBQueue wait () atomically $ TBQ.writeTBQueue waits ()
atomically $ TBQ.writeTBQueue (s ^. storageOpQ) action atomically $ TBQ.writeTBQueue (s ^. storageOpQ) action
void $ atomically $ TBQ.readTBQueue wait yield
void $ atomically $ TBQ.readTBQueue waits
pure (Just hash) pure (Just hash)

View File

@ -41,7 +41,7 @@ testSimpleStorageInit = do
let pieces = shrink [0x00 .. 0xFF] :: [[Word8]] let pieces = shrink [0x00 .. 0xFF] :: [[Word8]]
forConcurrently_ (take 1000 pieces) $ \piece -> do forConcurrently_ (take 1000 pieces) $ \piece -> do
-- for_ (take 100 pieces) $ \piece -> do -- for_ (take 1000 pieces) $ \piece -> do
let str = LBS.pack piece let str = LBS.pack piece
@ -83,7 +83,6 @@ testSimpleStorageInit = do
pure () pure ()
cancel worker cancel worker