This commit is contained in:
Dmitry Zuikov 2023-01-10 18:51:13 +03:00
parent f9c4464301
commit 7eb6b254c8
4 changed files with 86 additions and 43 deletions

View File

@ -2,6 +2,7 @@
module HBS2.Storage where
import Data.Kind
import Data.Hashable hiding (Hashed)
import HBS2.Hash
@ -9,10 +10,12 @@ type family Block block :: Type
type family Key block :: Type
newtype Offset = Offset Integer
deriving newtype (Eq,Ord,Enum,Num,Real,Integral)
deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable)
deriving stock (Show)
newtype Size = Size Integer
deriving newtype (Eq,Ord,Enum,Num,Real,Integral)
deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable)
deriving stock (Show)
class ( Monad m
@ -22,10 +25,13 @@ class ( Monad m
type family StorageHash a block :: Type
putBlock :: a -> Block block -> m (Maybe (Key block))
getBlock :: a -> Key block -> m (Maybe (Block block))
getChunk :: a -> Key block -> Offset -> Size -> m (Maybe (Block block))
hasBlock :: a -> Key block -> m Bool
listBlocks :: a -> ( Key block -> m () ) -> m ()

View File

@ -111,6 +111,7 @@ test-suite test
, tasty
, tasty-hunit
, temporary
, timeit
, uniplate
, vector

View File

@ -4,16 +4,18 @@ module HBS2.Storage.Simple where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception (try,tryJust)
import Control.Monad.Except
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Data.ByteString (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable
import Data.List qualified as L
import Data.Maybe
import Lens.Micro.Platform
import Prettyprinter
import System.Directory
@ -61,7 +63,7 @@ data SimpleStorage a =
SimpleStorage
{ _storageDir :: FilePath
, _storageOpQ :: TBQueue ( IO () )
, _storageHandles :: Cache (Key (Raw LBS.ByteString)) Handle
, _storageChunksCache :: Cache (FilePath, Offset, Size) ByteString
}
makeLenses ''SimpleStorage
@ -81,12 +83,12 @@ simpleStorageInit opts = liftIO $ do
tbq <- TBQ.newTBQueueIO (fromIntegral (fromQueueSize qSize))
hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 10)) -- FIXME: real setting
hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 1)) -- FIXME: real setting
let stor = SimpleStorage
{ _storageDir = pdir
, _storageOpQ = tbq
, _storageHandles = hcache
, _storageChunksCache = hcache
}
-- print ("STORAGE", stor ^. storageDir, stor ^. storageBlocks )
@ -107,33 +109,37 @@ simpleStorageWorker ss = do
join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ )
killer <- async $ forever $ do
pause ( 1 :: Timeout 'Minutes ) -- FIXME: setting
Cache.purgeExpired ( ss ^. storageHandles )
pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting
Cache.purgeExpired ( ss ^. storageChunksCache )
(_, e) <- waitAnyCatchCancel [ops,killer]
pure ()
simpleGetHandle :: SimpleStorage h -> Key (Raw LBS.ByteString) -> IO (Maybe Handle)
simpleGetHandle s k = do
let cache = s ^. storageHandles
simpleChunkLookup :: SimpleStorage h
-> Key (Raw LBS.ByteString)
-> Offset
-> Size
-> IO (Maybe LBS.ByteString)
simpleChunkLookup s k off size = do
let fn = simpleBlockFileName s k
let cache = s ^. storageChunksCache
Cache.lookup cache (fn, off, size) <&> fmap LBS.fromStrict
-- h <- Cache.lookup cache k
-- runMaybeT $ do
-- print $ pretty "file to open: " <+> pretty fn
-- err <- runExceptT $ liftIO $ Cache.fetchWithCache cache k $ const $ openFile fn ReadMode
-- Cache.fetchWithCache cache k $ const $ openFile fn ReadMode
--
r <- tryJust (guard . isDoesNotExistError)
(openFile fn ReadMode)
case r of
Right h -> pure (Just h)
Left _ -> pure Nothing
simpleChunkCache :: SimpleStorage h
-> Key (Raw LBS.ByteString)
-> Offset
-> Size
-> LBS.ByteString
-> IO ()
simpleChunkCache s k off size bs = do
let fn = simpleBlockFileName s k
let cache = s ^. storageChunksCache
-- print ("caching!", fn, off, size)
Cache.insert cache (fn, off, size) (LBS.toStrict bs)
simpleBlockFileName :: SimpleStorage h -> Hash HbSync -> FilePath
simpleBlockFileName ss h = path
@ -141,12 +147,12 @@ simpleBlockFileName ss h = path
(pref,suf) = splitAt 1 (show (pretty h))
path = view storageBlocks ss </> pref </> suf
-- NOTE: reads whole file into memory!
-- if size is too big --- it will
-- NOTE: reads a whole file into memory!
-- if file size is too big --- it will
-- cause consequences!
--
-- However, we can not hold the file
-- handle in lazy bytestring, because
-- However, we can not hold file
-- handles in lazy bytestrings, because
-- here maybe too many open files
--
-- So, the block MUST be small
@ -174,8 +180,6 @@ simpleGetBlockLazy s key = do
void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action
yield
atomically $ TBQ.readTBQueue resQ
simpleGetChunkLazy :: SimpleStorage h
@ -188,15 +192,44 @@ simpleGetChunkLazy s key off size = do
resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString))
let action = do
let fn = simpleBlockFileName s key
r <- tryJust (guard . isDoesNotExistError)
$ withBinaryFile fn ReadMode $ \handle -> do
hSeek handle AbsoluteSeek ( fromIntegral off )
LBS.hGet handle (fromIntegral size)
result <- case r of
Right bytes -> pure (Just bytes)
Left _ -> pure Nothing
void $ atomically $ TBQ.writeTBQueue resQ result
cached <- simpleChunkLookup s key off size
case cached of
Just chunk -> do
void $ atomically $ TBQ.writeTBQueue resQ (Just chunk)
Nothing -> do
r <- tryJust (guard . isDoesNotExistError)
$ withBinaryFile fn ReadMode $ \handle -> do
hSeek handle AbsoluteSeek ( fromIntegral off )
bytes <- LBS.hGet handle ( fromIntegral size )
let ahead = 16
let bnum = off `div` fromIntegral size
let doCache =
ahead > 0
&& size > 0
&& size < 4096
&& (bnum `mod` ahead) == 0
when doCache do -- FIXME:! setting
chunks <- forM [ size .. size * fromIntegral ahead ] $ \i -> do
let o = fromIntegral off + fromIntegral (i * size)
hSeek handle AbsoluteSeek o
fwd <- LBS.hGet handle (fromIntegral size)
pure (fwd, fromIntegral o)
let chunks' = takeWhile (not . LBS.null . fst) chunks
mapM_ (\(c,o) -> simpleChunkCache s key o size c) chunks'
pure bytes
result <- case r of
Right bytes -> pure (Just bytes)
Left _ -> pure Nothing
void $ atomically $ TBQ.writeTBQueue resQ result
void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action
@ -219,17 +252,20 @@ simplePutBlockLazy s lbs = do
atomically $ TBQ.writeTBQueue (s ^. storageOpQ) action
yield
void $ atomically $ TBQ.readTBQueue waits
pure (Just hash)
simpleBlockExists :: SimpleStorage h
-> Key (Raw LBS.ByteString)
-> IO Bool
simpleBlockExists ss hash = doesFileExist $ simpleBlockFileName ss hash
instance Hashed HbSync (Raw LBS.ByteString) where
hashObject (Raw s) = hashObject s
instance (MonadIO m, (Hashed hash (Raw LBS.ByteString)))
=> Storage (SimpleStorage hash) (Raw LBS.ByteString) m where
@ -241,5 +277,5 @@ instance (MonadIO m, (Hashed hash (Raw LBS.ByteString)))
getChunk s k off size = liftIO $ simpleGetChunkLazy s k off size
hasBlock s k = liftIO $ simpleBlockExists s k

View File

@ -74,7 +74,7 @@ testSimpleStorageRandomReadWrite = do
let pieces = shrink [0x00 .. 0xFF] :: [[Word8]]
forConcurrently_ (take 1000 pieces) $ \piece -> do
-- for_ (take 1000 pieces) $ \piece -> do
-- for_ (take 10 pieces) $ \piece -> do
let str = LBS.pack piece