diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index f0e9370e..65af7b04 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -54,7 +54,6 @@ common shared-properties , TypeFamilies - library import: shared-properties exposed-modules: HBS2.Storage.Simple @@ -64,6 +63,7 @@ library build-depends: base ^>=4.15.1.0, hbs2-core , async , bytestring + , bytestring-mmap , cache , containers , directory @@ -76,6 +76,7 @@ library , streaming , transformers , uniplate + , unordered-containers hs-source-dirs: lib diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index af5088a8..3925248a 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -1,5 +1,6 @@ {-# Language TemplateHaskell #-} {-# Language ScopedTypeVariables #-} +{-# Language UndecidableInstances #-} module HBS2.Storage.Simple ( module HBS2.Storage.Simple ) where @@ -12,8 +13,6 @@ import Control.Monad.Trans.Maybe import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.ByteString (ByteString) -import Data.Cache (Cache) -import Data.Cache qualified as Cache import Data.Foldable import Data.List qualified as L import Data.Maybe @@ -24,6 +23,11 @@ import System.FilePath.Posix import System.IO import System.IO.Error +import Data.HashMap.Strict qualified as HashMap +import Data.HashMap.Strict (HashMap) + +import System.IO.Posix.MMap ( unsafeMMapFile ) + import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue qualified as TBQ import Control.Concurrent.STM.TBMQueue qualified as TBMQ @@ -45,7 +49,8 @@ import HBS2.Storage -- operations and wait in getBlock 'till it's completion -- in order to make the disk access in this fashion safe - +class (Eq (Key h), Hashable (Key h), IsKey h, Key h ~ Hash h) => IsSimpleStorageKey h +instance (Eq (Key h), Hashable (Key h), IsKey h, Key h ~ Hash h) => IsSimpleStorageKey h type instance Block LBS.ByteString = LBS.ByteString @@ -57,9 +62,11 @@ newtype StorageQueueSize = StorageQueueSize { fromQueueSize :: Int } data SimpleStorage a = SimpleStorage - { _storageDir :: FilePath - , _storageOpQ :: TBMQueue ( IO () ) + { _storageDir :: FilePath + , _storageOpQ :: TBMQueue ( IO () ) , _storageStopWriting :: TVar Bool + , _storageMMaped :: TVar (HashMap (Key a) ByteString) + , _storageMMapedLRU :: TVar (HashMap (Key a) TimeSpec) } makeLenses ''SimpleStorage @@ -75,23 +82,41 @@ storageRefs = to f where f b = _storageDir b "refs" +touchForRead :: (MonadIO m, IsSimpleStorageKey h) => SimpleStorage h -> Key h -> m ByteString +touchForRead ss k = liftIO $ do -simpleStorageInit :: (MonadIO m, Data opts) => opts -> m (SimpleStorage h) + bsmm <- unsafeMMapFile (simpleBlockFileName ss k) + tick <- getTime MonotonicCoarse + + atomically $ do + + mbs <- readTVar mmaped <&> HashMap.lookup k + + r <- case mbs of + Just bs -> pure bs + Nothing -> do + modifyTVar' mmaped (HashMap.insert k bsmm) + pure bsmm + + modifyTVar' (ss ^. storageMMapedLRU) (HashMap.insert k tick) + + pure r + + where + mmaped = ss ^. storageMMaped + + +simpleStorageInit :: (MonadIO m, Data opts, IsSimpleStorageKey h) => opts -> m (SimpleStorage h) simpleStorageInit opts = liftIO $ do let prefix = uniLastDef "." opts :: StoragePrefix let qSize = uniLastDef 20000 opts :: StorageQueueSize - pdir <- canonicalizePath (fromPrefix prefix) - - tbq <- TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize)) - - tstop <- TV.newTVarIO False - - let stor = SimpleStorage - { _storageDir = pdir - , _storageOpQ = tbq - , _storageStopWriting = tstop - } + stor <- SimpleStorage + <$> canonicalizePath (fromPrefix prefix) + <*> TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize)) + <*> TV.newTVarIO False + <*> TV.newTVarIO mempty + <*> TV.newTVarIO mempty createDirectoryIfMissing True (stor ^. storageBlocks) @@ -103,8 +128,6 @@ simpleStorageInit opts = liftIO $ do pure stor - - catchAny :: IO a -> (SomeException -> IO a) -> IO a catchAny = Control.Exception.catch @@ -123,7 +146,7 @@ simpleStorageStop ss = do else pause ( 0.01 :: Timeout 'Seconds ) >> next -simpleStorageWorker :: SimpleStorage h -> IO () +simpleStorageWorker :: IsSimpleStorageKey h => SimpleStorage h -> IO () simpleStorageWorker ss = do ops <- async $ fix \next -> do @@ -132,10 +155,23 @@ simpleStorageWorker ss = do Nothing -> pure () Just a -> a >> next - killer <- async $ forever $ do + killer30 <- async $ forever $ do pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting - (_, e) <- waitAnyCatchCancel [ops,killer] + atomically $ do + + alive <- readTVar ( ss ^. storageMMapedLRU ) + mmaped <- readTVar ( ss ^. storageMMaped ) + + let survived = mmaped `HashMap.intersection` alive + + writeTVar ( ss ^. storageMMaped ) survived + + killer5 <- async $ forever $ do + pause ( 5 :: Timeout 'Seconds ) -- FIXME: setting + atomically $ writeTVar ( ss ^. storageMMapedLRU ) mempty + + (_, e) <- waitAnyCatchCancel [ops,killer30, killer5] pure () @@ -187,7 +223,7 @@ simpleGetBlockLazy s key = do atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure -simpleGetChunkLazy :: IsKey h +simpleGetChunkLazy :: IsSimpleStorageKey h => SimpleStorage h -> Hash h -> Offset @@ -199,16 +235,11 @@ simpleGetChunkLazy s key off size = do let action = do let fn = simpleBlockFileName s key - r <- tryJust (guard . isDoesNotExistError) - $ withBinaryFile fn ReadMode $ \ha -> do - hSeek ha AbsoluteSeek ( fromIntegral off ) - LBS.hGet ha ( fromIntegral size ) + bs <- (Just <$> touchForRead s key) `catchAny` const (pure Nothing) -- FIXME: log this situation (file not found) - result <- case r of - Right bytes -> pure (Just bytes) - Left _ -> pure Nothing + let result = BS.take (fromIntegral size) . BS.drop (fromIntegral off) <$> bs - void $ atomically $ TBMQ.writeTBMQueue resQ result + void $ atomically $ TBMQ.writeTBMQueue resQ (LBS.fromStrict <$> result) let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing @@ -269,8 +300,7 @@ spawnAndWait s act = do atomically $ TBMQ.readTBMQueue doneQ -simpleWriteLinkRaw :: forall h . ( IsKey h - , Key h ~ Hash h +simpleWriteLinkRaw :: forall h . ( IsSimpleStorageKey h , Hashed h LBS.ByteString ) => SimpleStorage h @@ -308,6 +338,7 @@ simpleReadLinkRaw ss hash = do instance ( MonadIO m, IsKey hash , Hashed hash LBS.ByteString , Key hash ~ Hash hash + , IsSimpleStorageKey hash , Block LBS.ByteString ~ LBS.ByteString ) => Storage (SimpleStorage hash) hash LBS.ByteString m where diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index 3225a82a..2854ee1e 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -23,7 +23,7 @@ pieces :: Integral a => a pieces = 8192 class SimpleStorageExtra a where - putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash + putAsMerkle :: forall h . (IsSimpleStorageKey h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m () readChunked handle size = fuu