storage as queue-based stuff

This commit is contained in:
Dmitry Zuikov 2023-01-10 13:44:09 +03:00
parent 6f6c22ae58
commit 533ea3e0db
6 changed files with 213 additions and 19 deletions

View File

@ -48,8 +48,8 @@ instance Binary (Hash HbSync)
newtype Internal a = Internal a
class Hashed a where
hashObject :: a -> Hash HbSync
class Hashed t a where
hashObject :: a -> Hash t
alphabet :: Alphabet
alphabet = bitcoinAlphabet
@ -58,12 +58,12 @@ getAlphabet :: [Char]
getAlphabet = BS8.unpack (unAlphabet alphabet)
instance Hashed ByteString where
instance Hashed HbSync ByteString where
hashObject s = HbSyncHash $ force $ SB.toShort $ BA.convert digest
where
digest = hash s :: Digest (HashType HbSync)
instance Hashed LBS.ByteString where
instance Hashed HbSync LBS.ByteString where
hashObject s = HbSyncHash $ force $ SB.toShort $ BA.convert digest
where
digest = hashlazy s :: Digest (HashType HbSync)

View File

@ -2,11 +2,21 @@
module HBS2.Storage where
import Data.Kind
import Data.Proxy
import HBS2.Hash
type family Block block :: Type
type family Key block :: Type
class Monad m => Storage a block m | a -> block where
-- class HasHashFunction h a b where
-- hashFun :: Proxy a -> b -> Hash h
class ( Monad m
, Hashed (StorageHash a block) block
) => Storage a block m | a -> block where
type family StorageHash a block :: Type
putBlock :: a -> Block block -> m (Maybe (Key block))
getBlock :: a -> Key block -> m (Maybe (Block block))

View File

@ -61,12 +61,15 @@ library
-- other-modules:
-- other-extensions:
build-depends: base ^>=4.15.1.0, hbs2-core
, async
, bytestring
, containers
, directory
, filepath
, uniplate
, microlens-platform
, prettyprinter
, stm
, uniplate
hs-source-dirs: lib
@ -78,6 +81,7 @@ test-suite test
default-language: Haskell2010
other-modules:
TestSimpleStorage
-- other-extensions:
@ -87,9 +91,11 @@ test-suite test
build-depends:
base ^>=4.15.1.0, hbs2-storage-simple, hbs2-core
, async
, bytestring
, cborg
, containers
, directory
, hashable
, microlens-platform
, mtl

View File

@ -1,53 +1,174 @@
{-# Language TemplateHaskell #-}
module HBS2.Storage.Simple where
import Control.Concurrent.Async
import Control.Exception (try,tryJust)
import Control.Monad
import Control.Monad.IO.Class
import System.FilePath.Posix
import Lens.Micro.Platform
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Data.Foldable
import Data.List qualified as L
import Lens.Micro.Platform
import Prettyprinter
import System.Directory
import System.FilePath.Posix
import System.IO.Error
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as TBQ
import Control.Concurrent.STM.TBQueue (TBQueue)
import HBS2.Hash
import HBS2.Storage
import HBS2.Prelude
import HBS2.Prelude.Plated
-- NOTE: random accessing files in a git-like storage
-- causes to file handles exhaust.
-- Therefore, those handles MUST be in something like
-- pool.
--
-- I.e. we're should use something like TBChan to queue
-- operations and wait in getBlock 'till it's completion
-- in order to make the disk access in this fashion safe
newtype Raw a = Raw { fromRaw :: a }
type instance Block (Raw LBS.ByteString) = LBS.ByteString
type instance Key (Raw LBS.ByteString) = Hash HbSync
newtype StoragePrefix = StoragePrefix { fromPrefix :: FilePath }
newtype StoragePrefix = StoragePrefix { fromPrefix :: FilePath }
deriving stock (Data,Show)
deriving newtype (IsString)
newtype SimpleStorage =
newtype StorageQueueSize = StorageQueueSize { fromQueueSize :: Int }
deriving stock (Data,Show)
deriving newtype (Eq,Ord,Enum,Num,Integral,Real)
data SimpleStorage a =
SimpleStorage
{ _storageDir :: FilePath
{ _storageDir :: FilePath
, _storageOpQ :: TBQueue ( IO () )
}
makeLenses ''SimpleStorage
storageBlocksDir :: SimpleStorage -> FilePath
storageBlocksDir s = view storageDir s </> "blocks"
storageBlocks :: SimpleGetter SimpleStorage FilePath
storageBlocks :: SimpleGetter (SimpleStorage h) FilePath
storageBlocks = to f
where
f b = _storageDir b </> "blocks"
simpleStorageInit :: (MonadIO m, Data opts) => opts -> m SimpleStorage
simpleStorageInit :: (MonadIO m, Data opts) => opts -> m (SimpleStorage h)
simpleStorageInit opts = liftIO $ do
let prefix = uniLastDef "." opts :: StoragePrefix
let qSize = uniLastDef 10 opts :: StorageQueueSize
pdir <- canonicalizePath (fromPrefix prefix)
tbq <- TBQ.newTBQueueIO (fromIntegral (fromQueueSize qSize))
let stor = SimpleStorage
{ _storageDir = pdir
, _storageOpQ = tbq
}
createDirectoryIfMissing True (stor ^. storageBlocks)
let alph = getAlphabet
for_ alph $ \a -> do
createDirectoryIfMissing True ( (stor ^. storageBlocks) </> L.singleton a )
pure stor
instance MonadIO m => Storage SimpleStorage (Raw LBS.ByteString) m where
simpleStorageWorker :: SimpleStorage h -> IO ()
simpleStorageWorker ss = do
readOps <- async $ forever $ do
join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ )
writeOps <- async $ forever $ do
join $ atomically $ TBQ.readTBQueue ( ss ^. storageOpQ )
void $ waitAnyCatchCancel [readOps,writeOps]
simpleBlockFileName :: SimpleStorage h -> Hash HbSync -> FilePath
simpleBlockFileName ss h = path
where
(pref,suf) = splitAt 1 (show (pretty h))
path = view storageBlocks ss </> pref </> suf
-- NOTE: reads whole file into memory!
-- if size is too big --- it will
-- cause consequences!
--
-- However, we can not hold the file
-- handle in lazy bytestring, because
-- here maybe too many open files
--
-- So, the block MUST be small
--
simpleGetBlockLazy :: SimpleStorage h
-> Key (Raw LBS.ByteString)
-> IO (Maybe (Raw LBS.ByteString))
simpleGetBlockLazy s key = do
resQ <- TBQ.newTBQueueIO 1 :: IO (TBQueue (Maybe (Raw LBS.ByteString)))
let fn = simpleBlockFileName s key
let action = do
r <- tryJust (guard . isDoesNotExistError)
(BS.readFile fn <&> LBS.fromStrict)
result <- case r of
Right bytes -> pure (Just (Raw bytes))
Left _ -> pure Nothing
void $ atomically $ TBQ.writeTBQueue resQ result
void $ atomically $ TBQ.writeTBQueue ( s ^. storageOpQ ) action
atomically $ TBQ.readTBQueue resQ
-- non-blocking version, always returns Just hash
-- maybe it's not good
simplePutBlockLazy :: SimpleStorage h
-> LBS.ByteString
-> IO (Maybe (Key (Raw LBS.ByteString)))
simplePutBlockLazy s lbs = do
let hash = hashObject lbs :: Key (Raw LBS.ByteString)
let fn = simpleBlockFileName s hash
let action = do
LBS.writeFile fn lbs
atomically $ TBQ.writeTBQueue (s ^. storageOpQ) action
pure (Just hash)
instance Hashed HbSync (Raw LBS.ByteString) where
hashObject (Raw s) = hashObject s
instance (MonadIO m, (Hashed hash (Raw LBS.ByteString)))
=> Storage (SimpleStorage hash) (Raw LBS.ByteString) m where
type instance StorageHash (SimpleStorage hash) (Raw LBS.ByteString) = hash
putBlock s lbs = liftIO $ simplePutBlockLazy s lbs
getBlock s key = liftIO $ simpleGetBlockLazy s key <&> fmap fromRaw

View File

@ -1,10 +1,18 @@
module Main where
import Test.Tasty
import Test.Tasty.HUnit
import TestSimpleStorage
import HBS2.Storage
main :: IO ()
main = defaultMain $
testGroup "root" [ testCase "testSimpleStorageInit" testSimpleStorageInit
]
main = do
error "oops"

View File

@ -0,0 +1,49 @@
module TestSimpleStorage where
import Data.Maybe
import Data.ByteString.Lazy qualified as LBS
import Control.Concurrent.Async
import Lens.Micro.Platform
import System.Directory
import Prettyprinter
import Test.Tasty.HUnit
import HBS2.Hash
import HBS2.Storage
import HBS2.Storage.Simple
testSimpleStorageInit :: IO ()
testSimpleStorageInit = do
storage <- simpleStorageInit [StoragePrefix ".storage"] :: IO (SimpleStorage HbSync)
exists <- doesDirectoryExist ( storage ^. storageBlocks )
assertBool "blocks directory exists" exists
worker <- async (simpleStorageWorker storage)
let str = "AAAAAAAAAA"
key <- putBlock storage str
assertBool "key is Just" (isJust key)
let hash = fromJust key
print (pretty key)
s <- getBlock storage hash
print s
assertBool "data read" (isJust s)
let result = fromJust s
assertEqual "written data == read data" str result
cancel worker