This commit is contained in:
Dmitry Zuikov 2023-01-28 05:59:37 +03:00
parent 4c0f96342d
commit d81671ef58
3 changed files with 67 additions and 35 deletions

View File

@ -54,7 +54,6 @@ common shared-properties
, TypeFamilies
library
import: shared-properties
exposed-modules: HBS2.Storage.Simple
@ -64,6 +63,7 @@ library
build-depends: base ^>=4.15.1.0, hbs2-core
, async
, bytestring
, bytestring-mmap
, cache
, containers
, directory
@ -76,6 +76,7 @@ library
, streaming
, transformers
, uniplate
, unordered-containers
hs-source-dirs: lib

View File

@ -1,5 +1,6 @@
{-# Language TemplateHaskell #-}
{-# Language ScopedTypeVariables #-}
{-# Language UndecidableInstances #-}
module HBS2.Storage.Simple
( module HBS2.Storage.Simple
) where
@ -12,8 +13,6 @@ import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Data.ByteString (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable
import Data.List qualified as L
import Data.Maybe
@ -24,6 +23,11 @@ import System.FilePath.Posix
import System.IO
import System.IO.Error
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict (HashMap)
import System.IO.Posix.MMap ( unsafeMMapFile )
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as TBQ
import Control.Concurrent.STM.TBMQueue qualified as TBMQ
@ -45,7 +49,8 @@ import HBS2.Storage
-- operations and wait in getBlock 'till it's completion
-- in order to make the disk access in this fashion safe
class (Eq (Key h), Hashable (Key h), IsKey h, Key h ~ Hash h) => IsSimpleStorageKey h
instance (Eq (Key h), Hashable (Key h), IsKey h, Key h ~ Hash h) => IsSimpleStorageKey h
type instance Block LBS.ByteString = LBS.ByteString
@ -57,9 +62,11 @@ newtype StorageQueueSize = StorageQueueSize { fromQueueSize :: Int }
data SimpleStorage a =
SimpleStorage
{ _storageDir :: FilePath
, _storageOpQ :: TBMQueue ( IO () )
{ _storageDir :: FilePath
, _storageOpQ :: TBMQueue ( IO () )
, _storageStopWriting :: TVar Bool
, _storageMMaped :: TVar (HashMap (Key a) ByteString)
, _storageMMapedLRU :: TVar (HashMap (Key a) TimeSpec)
}
makeLenses ''SimpleStorage
@ -75,23 +82,41 @@ storageRefs = to f
where
f b = _storageDir b </> "refs"
touchForRead :: (MonadIO m, IsSimpleStorageKey h) => SimpleStorage h -> Key h -> m ByteString
touchForRead ss k = liftIO $ do
simpleStorageInit :: (MonadIO m, Data opts) => opts -> m (SimpleStorage h)
bsmm <- unsafeMMapFile (simpleBlockFileName ss k)
tick <- getTime MonotonicCoarse
atomically $ do
mbs <- readTVar mmaped <&> HashMap.lookup k
r <- case mbs of
Just bs -> pure bs
Nothing -> do
modifyTVar' mmaped (HashMap.insert k bsmm)
pure bsmm
modifyTVar' (ss ^. storageMMapedLRU) (HashMap.insert k tick)
pure r
where
mmaped = ss ^. storageMMaped
simpleStorageInit :: (MonadIO m, Data opts, IsSimpleStorageKey h) => opts -> m (SimpleStorage h)
simpleStorageInit opts = liftIO $ do
let prefix = uniLastDef "." opts :: StoragePrefix
let qSize = uniLastDef 20000 opts :: StorageQueueSize
pdir <- canonicalizePath (fromPrefix prefix)
tbq <- TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize))
tstop <- TV.newTVarIO False
let stor = SimpleStorage
{ _storageDir = pdir
, _storageOpQ = tbq
, _storageStopWriting = tstop
}
stor <- SimpleStorage
<$> canonicalizePath (fromPrefix prefix)
<*> TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize))
<*> TV.newTVarIO False
<*> TV.newTVarIO mempty
<*> TV.newTVarIO mempty
createDirectoryIfMissing True (stor ^. storageBlocks)
@ -103,8 +128,6 @@ simpleStorageInit opts = liftIO $ do
pure stor
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
@ -123,7 +146,7 @@ simpleStorageStop ss = do
else
pause ( 0.01 :: Timeout 'Seconds ) >> next
simpleStorageWorker :: SimpleStorage h -> IO ()
simpleStorageWorker :: IsSimpleStorageKey h => SimpleStorage h -> IO ()
simpleStorageWorker ss = do
ops <- async $ fix \next -> do
@ -132,10 +155,23 @@ simpleStorageWorker ss = do
Nothing -> pure ()
Just a -> a >> next
killer <- async $ forever $ do
killer30 <- async $ forever $ do
pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting
(_, e) <- waitAnyCatchCancel [ops,killer]
atomically $ do
alive <- readTVar ( ss ^. storageMMapedLRU )
mmaped <- readTVar ( ss ^. storageMMaped )
let survived = mmaped `HashMap.intersection` alive
writeTVar ( ss ^. storageMMaped ) survived
killer5 <- async $ forever $ do
pause ( 5 :: Timeout 'Seconds ) -- FIXME: setting
atomically $ writeTVar ( ss ^. storageMMapedLRU ) mempty
(_, e) <- waitAnyCatchCancel [ops,killer30, killer5]
pure ()
@ -187,7 +223,7 @@ simpleGetBlockLazy s key = do
atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure
simpleGetChunkLazy :: IsKey h
simpleGetChunkLazy :: IsSimpleStorageKey h
=> SimpleStorage h
-> Hash h
-> Offset
@ -199,16 +235,11 @@ simpleGetChunkLazy s key off size = do
let action = do
let fn = simpleBlockFileName s key
r <- tryJust (guard . isDoesNotExistError)
$ withBinaryFile fn ReadMode $ \ha -> do
hSeek ha AbsoluteSeek ( fromIntegral off )
LBS.hGet ha ( fromIntegral size )
bs <- (Just <$> touchForRead s key) `catchAny` const (pure Nothing) -- FIXME: log this situation (file not found)
result <- case r of
Right bytes -> pure (Just bytes)
Left _ -> pure Nothing
let result = BS.take (fromIntegral size) . BS.drop (fromIntegral off) <$> bs
void $ atomically $ TBMQ.writeTBMQueue resQ result
void $ atomically $ TBMQ.writeTBMQueue resQ (LBS.fromStrict <$> result)
let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing
@ -269,8 +300,7 @@ spawnAndWait s act = do
atomically $ TBMQ.readTBMQueue doneQ
simpleWriteLinkRaw :: forall h . ( IsKey h
, Key h ~ Hash h
simpleWriteLinkRaw :: forall h . ( IsSimpleStorageKey h
, Hashed h LBS.ByteString
)
=> SimpleStorage h
@ -308,6 +338,7 @@ simpleReadLinkRaw ss hash = do
instance ( MonadIO m, IsKey hash
, Hashed hash LBS.ByteString
, Key hash ~ Hash hash
, IsSimpleStorageKey hash
, Block LBS.ByteString ~ LBS.ByteString
)
=> Storage (SimpleStorage hash) hash LBS.ByteString m where

View File

@ -23,7 +23,7 @@ pieces :: Integral a => a
pieces = 8192
class SimpleStorageExtra a where
putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash
putAsMerkle :: forall h . (IsSimpleStorageKey h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash
readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m ()
readChunked handle size = fuu