From 2b75931e555f34796a169d247a63b78471a640f8 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Tue, 10 Jan 2023 15:20:26 +0300 Subject: [PATCH] wip --- hbs2-core/hbs2-core.cabal | 2 + hbs2-core/lib/HBS2/Clock.hs | 65 +++++++++++++++ hbs2-core/lib/HBS2/Storage.hs | 14 ++-- hbs2-storage-simple/hbs2-storage-simple.cabal | 6 +- .../lib/HBS2/Storage/Simple.hs | 80 ++++++++++++++++--- hbs2-storage-simple/test/TestSimpleStorage.hs | 77 +++++++++++++----- 6 files changed, 207 insertions(+), 37 deletions(-) create mode 100644 hbs2-core/lib/HBS2/Clock.hs diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 98679fb5..6128d798 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -64,6 +64,7 @@ library exposed-modules: HBS2.Hash + , HBS2.Clock , HBS2.Prelude , HBS2.Prelude.Plated , HBS2.Storage @@ -78,6 +79,7 @@ library , binary , bytestring , cborg + , clock , containers , cryptonite , deepseq diff --git a/hbs2-core/lib/HBS2/Clock.hs b/hbs2-core/lib/HBS2/Clock.hs new file mode 100644 index 00000000..717c70ea --- /dev/null +++ b/hbs2-core/lib/HBS2/Clock.hs @@ -0,0 +1,65 @@ +module HBS2.Clock + ( module HBS2.Clock + , module System.Clock + )where + +import Control.Monad.IO.Class +import Data.Fixed +import Data.Int (Int64) +import Prettyprinter +import System.Clock +import Control.Concurrent (threadDelay) + +data TimeoutKind = MilliSeconds | Seconds | Minutes + +data family Timeout ( a :: TimeoutKind ) + + +newtype Wait a = Wait a + deriving newtype (Eq,Show,Pretty) + +newtype Delay a = Delay a + deriving newtype (Eq,Show,Pretty) + + + +class IsTimeout a where + toNanoSeconds :: Timeout a -> Int64 + + toMicroSeconds :: Timeout a -> Int + toMicroSeconds x = fromIntegral $ toNanoSeconds x `div` 1000 + + toTimeSpec :: Timeout a -> TimeSpec + toTimeSpec x = fromNanoSecs (fromIntegral (toNanoSeconds x)) + +class IsTimeout a => MonadPause m a where + pause :: Timeout a -> m () + +instance (IsTimeout a, MonadIO m) => MonadPause m a where + pause x = liftIO $ threadDelay (toMicroSeconds x) + +instance Pretty (Fixed E9) where + pretty = pretty . show + + +newtype instance Timeout 'MilliSeconds = + TimeoutMSec (Fixed E9) + deriving newtype (Eq,Ord,Num,Real,Fractional,Show,Pretty) + +newtype instance Timeout 'Seconds = + TimeoutSec (Fixed E9) + deriving newtype (Eq,Ord,Num,Real,Fractional,Show,Pretty) + +newtype instance Timeout 'Minutes = + TimeoutMin (Fixed E9) + deriving newtype (Eq,Ord,Num,Real,Fractional,Show,Pretty) + +instance IsTimeout 'MilliSeconds where + toNanoSeconds (TimeoutMSec x) = round (x * 1e6) + +instance IsTimeout 'Seconds where + toNanoSeconds (TimeoutSec x) = round (x * 1e9) + +instance IsTimeout 'Minutes where + toNanoSeconds (TimeoutMin x) = round (x * 60 * 1e9) + diff --git a/hbs2-core/lib/HBS2/Storage.hs b/hbs2-core/lib/HBS2/Storage.hs index f2490a6c..2059ec45 100644 --- a/hbs2-core/lib/HBS2/Storage.hs +++ b/hbs2-core/lib/HBS2/Storage.hs @@ -2,15 +2,18 @@ module HBS2.Storage where import Data.Kind -import Data.Proxy import HBS2.Hash type family Block block :: Type type family Key block :: Type --- class HasHashFunction h a b where --- hashFun :: Proxy a -> b -> Hash h +newtype Offset = Offset Integer + deriving newtype (Eq,Ord,Enum,Num,Real,Integral) + +newtype Size = Size Integer + deriving newtype (Eq,Ord,Enum,Num,Real,Integral) + class ( Monad m , Hashed (StorageHash a block) block @@ -20,9 +23,10 @@ class ( Monad m putBlock :: a -> Block block -> m (Maybe (Key block)) getBlock :: a -> Key block -> m (Maybe (Block block)) + + getChunk :: a -> Key block -> Offset -> Size -> m (Maybe (Block block)) + listBlocks :: a -> ( Key block -> m () ) -> m () - - diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index 940c773f..75d9ad8e 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -63,12 +63,14 @@ library build-depends: base ^>=4.15.1.0, hbs2-core , async , bytestring + , cache , containers , directory , filepath , microlens-platform , prettyprinter , stm + , transformers , uniplate @@ -96,16 +98,18 @@ test-suite test , cborg , containers , directory + , filepath , hashable , microlens-platform , mtl , prettyprinter + , QuickCheck , random , safe , serialise , tasty , tasty-hunit - , transformers + , temporary , uniplate , vector diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index 258b1eb5..cc957ccd 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -5,24 +5,31 @@ import Control.Concurrent.Async import Control.Exception (try,tryJust) import Control.Monad import Control.Monad.IO.Class +import Control.Monad.Trans.Maybe import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS +import Data.Cache (Cache) +import Data.Cache qualified as Cache import Data.Foldable import Data.List qualified as L import Lens.Micro.Platform import Prettyprinter import System.Directory import System.FilePath.Posix +import System.IO import System.IO.Error import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue qualified as TBQ import Control.Concurrent.STM.TBQueue (TBQueue) +import Control.Concurrent.STM.TVar (TVar) +import Control.Concurrent.STM.TVar qualified as TV +import HBS2.Clock import HBS2.Hash -import HBS2.Storage import HBS2.Prelude import HBS2.Prelude.Plated +import HBS2.Storage -- NOTE: random accessing files in a git-like storage -- causes to file handles exhaust. @@ -50,8 +57,9 @@ newtype StorageQueueSize = StorageQueueSize { fromQueueSize :: Int } data SimpleStorage a = SimpleStorage - { _storageDir :: FilePath - , _storageOpQ :: TBQueue ( IO () ) + { _storageDir :: FilePath + , _storageOpQ :: TBQueue ( IO () ) + , _storageHandles :: Cache (Key (Raw LBS.ByteString)) Handle } makeLenses ''SimpleStorage @@ -65,17 +73,22 @@ storageBlocks = to f simpleStorageInit :: (MonadIO m, Data opts) => opts -> m (SimpleStorage h) simpleStorageInit opts = liftIO $ do let prefix = uniLastDef "." opts :: StoragePrefix - let qSize = uniLastDef 10 opts :: StorageQueueSize + let qSize = uniLastDef 100 opts :: StorageQueueSize pdir <- canonicalizePath (fromPrefix prefix) tbq <- TBQ.newTBQueueIO (fromIntegral (fromQueueSize qSize)) + hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 10)) -- FIXME: real setting + let stor = SimpleStorage { _storageDir = pdir , _storageOpQ = tbq + , _storageHandles = hcache } + -- print ("STORAGE", stor ^. storageDir, stor ^. storageBlocks ) + createDirectoryIfMissing True (stor ^. storageBlocks) let alph = getAlphabet @@ -94,8 +107,17 @@ simpleStorageWorker ss = do writeOps <- async $ forever $ do join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ ) - void $ waitAnyCatchCancel [readOps,writeOps] + killer <- async $ forever $ do + pause ( 1 :: Timeout 'Minutes ) -- FIXME: setting + Cache.purgeExpired ( ss ^. storageHandles ) + void $ waitAnyCatchCancel [readOps,writeOps,killer] + +simpleGetHandle :: SimpleStorage h -> Key (Raw LBS.ByteString) -> IO Handle +simpleGetHandle s k = do + let cache = s ^. storageHandles + let fn = simpleBlockFileName s k + Cache.fetchWithCache cache k $ const $ openFile fn ReadMode simpleBlockFileName :: SimpleStorage h -> Hash HbSync -> FilePath simpleBlockFileName ss h = path @@ -115,10 +137,10 @@ simpleBlockFileName ss h = path -- simpleGetBlockLazy :: SimpleStorage h -> Key (Raw LBS.ByteString) - -> IO (Maybe (Raw LBS.ByteString)) + -> IO (Maybe LBS.ByteString) simpleGetBlockLazy s key = do - resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe (Raw LBS.ByteString))) + resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString)) let fn = simpleBlockFileName s key let action = do @@ -126,7 +148,7 @@ simpleGetBlockLazy s key = do (BS.readFile fn <&> LBS.fromStrict) result <- case r of - Right bytes -> pure (Just (Raw bytes)) + Right bytes -> pure (Just bytes) Left _ -> pure Nothing void $ atomically $ TBQ.writeTBQueue resQ result @@ -136,8 +158,35 @@ simpleGetBlockLazy s key = do atomically $ TBQ.readTBQueue resQ --- non-blocking version, always returns Just hash --- maybe it's not good +simpleGetChunkLazy :: SimpleStorage h + -> Key (Raw LBS.ByteString) + -> Offset + -> Size + -> IO (Maybe LBS.ByteString) + +simpleGetChunkLazy s key off size = do + resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe LBS.ByteString)) + let action = do + + r <- tryJust (guard . isDoesNotExistError) + (simpleGetHandle s key) + + chunk <- runMaybeT $ do + + handle <- MaybeT $ case r of + Right h -> pure (Just h) + Left _ -> pure Nothing + + liftIO $ do + hSeek handle AbsoluteSeek ( fromIntegral off ) + LBS.hGet handle (fromIntegral size) + + void $ atomically $ TBQ.writeTBQueue resQ chunk + + + void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action + atomically $ TBQ.readTBQueue resQ + simplePutBlockLazy :: SimpleStorage h -> LBS.ByteString -> IO (Maybe (Key (Raw LBS.ByteString))) @@ -147,11 +196,16 @@ simplePutBlockLazy s lbs = do let hash = hashObject lbs :: Key (Raw LBS.ByteString) let fn = simpleBlockFileName s hash + wait <- TBQ.newTBQueueIO 1 :: IO (TBQueue ()) + let action = do LBS.writeFile fn lbs + atomically $ TBQ.writeTBQueue wait () atomically $ TBQ.writeTBQueue (s ^. storageOpQ) action + void $ atomically $ TBQ.readTBQueue wait + pure (Just hash) @@ -166,9 +220,9 @@ instance (MonadIO m, (Hashed hash (Raw LBS.ByteString))) putBlock s lbs = liftIO $ simplePutBlockLazy s lbs - getBlock s key = liftIO $ simpleGetBlockLazy s key <&> fmap fromRaw - - + getBlock s key = liftIO $ simpleGetBlockLazy s key + + getChunk s k off size = liftIO $ simpleGetChunkLazy s k off size diff --git a/hbs2-storage-simple/test/TestSimpleStorage.hs b/hbs2-storage-simple/test/TestSimpleStorage.hs index 3dbb6885..5309b598 100644 --- a/hbs2-storage-simple/test/TestSimpleStorage.hs +++ b/hbs2-storage-simple/test/TestSimpleStorage.hs @@ -1,49 +1,90 @@ module TestSimpleStorage where -import Data.Maybe -import Data.ByteString.Lazy qualified as LBS +import Data.Traversable +import Data.Foldable import Control.Concurrent.Async +import Control.Concurrent +import Data.ByteString.Lazy qualified as LBS +import Data.Maybe +import Data.Word import Lens.Micro.Platform -import System.Directory import Prettyprinter +import System.Directory +import System.FilePath.Posix +import System.IO.Temp +import Test.QuickCheck import Test.Tasty.HUnit import HBS2.Hash +import HBS2.Prelude.Plated import HBS2.Storage import HBS2.Storage.Simple + testSimpleStorageInit :: IO () testSimpleStorageInit = do - storage <- simpleStorageInit [StoragePrefix ".storage"] :: IO (SimpleStorage HbSync) - exists <- doesDirectoryExist ( storage ^. storageBlocks ) + withSystemTempDirectory "simpleStorageTest" $ \dir -> do - assertBool "blocks directory exists" exists + let opts = [ StoragePrefix (dir ".storage") + ] - worker <- async (simpleStorageWorker storage) + storage <- simpleStorageInit [StoragePrefix (dir ".storage")] :: IO (SimpleStorage HbSync) - let str = "AAAAAAAAAA" + exists <- doesDirectoryExist ( storage ^. storageBlocks ) - key <- putBlock storage str + assertBool "blocks directory exists" exists - assertBool "key is Just" (isJust key) + worker <- async (simpleStorageWorker storage) - let hash = fromJust key + let pieces = shrink [0x00 .. 0xFF] :: [[Word8]] - print (pretty key) + forConcurrently_ (take 1000 pieces) $ \piece -> do + -- for_ (take 100 pieces) $ \piece -> do - s <- getBlock storage hash + let str = LBS.pack piece - print s + key <- putBlock storage str - assertBool "data read" (isJust s) + -- threadDelay $ 500000 + -- print "ok" - let result = fromJust s + assertBool "key is Just" (isJust key) - assertEqual "written data == read data" str result + let hash = fromJust key - cancel worker + -- print (pretty key) + + s <- getBlock storage hash + + -- print s + + assertBool "data read" (isJust s) + + let result = fromJust s + + assertEqual "written data == read data" str result + + let chuSize = 4 + + let chNum = + let (n,r) = length piece `divMod` chuSize + in if r == 0 then n else succ n + + chunks' <- forM [0,chuSize .. (chNum - 1)*chuSize] $ \o -> do + getChunk storage hash (fromIntegral o) (fromIntegral chuSize) + + let fromChunks = mconcat $ catMaybes chunks' + + -- print (LBS.length str, LBS.length fromChunks, chNum) + + assertEqual "bs from chunks == str" str fromChunks + + pure () + + + cancel worker