From 6de1949e56401d3bd5633cef7db1ac35225db220 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 31 May 2024 12:33:20 +0300 Subject: [PATCH] wip --- hbs2-storage-simple/benchmarks/Main.hs | 42 +++++++------ hbs2-storage-simple/hbs2-storage-simple.cabal | 2 + .../lib/HBS2/Storage/Compact.hs | 60 ++++++++++++++----- 3 files changed, 71 insertions(+), 33 deletions(-) diff --git a/hbs2-storage-simple/benchmarks/Main.hs b/hbs2-storage-simple/benchmarks/Main.hs index 577685b7..da0f8806 100644 --- a/hbs2-storage-simple/benchmarks/Main.hs +++ b/hbs2-storage-simple/benchmarks/Main.hs @@ -30,6 +30,7 @@ import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.Map (Map) import Data.Map qualified as Map +import Data.List.Split (chunksOf) import Streaming.Prelude (Of,Stream) import Streaming.Prelude qualified as S @@ -66,10 +67,7 @@ main = do let s = readDef @Int 256 ss let p = pref - let bss = randomByteStrings n s - let bss2 = randomByteStrings n s - let bss3 = randomByteStrings n s - let bss4 = randomByteStrings n s + bss <- S.toList_ $ randomByteStrings n s -- let bss41 = randomByteStrings (n `div` 2) s -- let bss42 = randomByteStrings (n`div` 2) s -- let bss43 = randomByteStrings (n`div`4) s @@ -95,27 +93,28 @@ main = do print $ "preparing to write" <+> pretty n <+> "chunks" - timeItNamed "write chunks to simple storage" do - S.mapM_ (enqueueBlock storage) bss - - timeItNamed "write chunks to sqlite test" do - withDB env $ transactional do - flip S.mapM_ bss2 $ \bs -> do - let h = hashObject @HbSync bs & pretty & show - insert [qc|insert into wtf (hash,val) values(?,?)|] (h,bs) - timeItNamed "write chunks to log" do fh <- openFile (path "lsm") AppendMode - flip S.mapM_ bss3 $ \bs -> do + forM_ bss $ \bs -> do let h = hashObject @HbSync bs & pretty & show LBS.hPut fh (serialise (h,bs)) hClose fh + timeItNamed "write chunks to simple storage" do + mapM_ (enqueueBlock storage) bss + + timeItNamed "write chunks to sqlite test" do + withDB env $ transactional do + forM_ bss $ \bs -> do + let h = hashObject @HbSync bs & pretty & show + insert [qc|insert into wtf (hash,val) values(?,?)|] (h,bs) + + timeItNamed "write chunks to log 2" do buf <- newIORef (mempty, 0 :: Int) fh <- openFile (path "lsm2") AppendMode - flip S.mapM_ bss3 $ \bs -> do + forM_ bss $ \bs -> do let h = hashObject @HbSync bs & pretty & show num <- atomicModifyIORef buf (\(chunks,sz) -> ((serialise (h,bs) : chunks,sz+1),sz+1)) @@ -127,16 +126,23 @@ main = do LBS.hPut fh (mconcat w) hClose fh + + let cn = length bss `div` 2 + let chu = chunksOf cn bss + timeItNamed "write chunks to compact-storage" do temp <- liftIO $ emptyTempFile "." "compact-storage" sto <- compactStorageOpen mempty temp - flip S.mapM_ bss4 $ \bs -> do - let h = hashObject @HbSync bs - compactStoragePut sto (coerce h) (LBS.toStrict bs) + w <- for chu $ \css -> do + async do + for_ css $ \bs -> do + let h = hashObject @HbSync bs + compactStoragePut sto (coerce h) (LBS.toStrict bs) + mapM_ wait w compactStorageClose sto timeItNamed "write chunks to LSM-mock" do diff --git a/hbs2-storage-simple/hbs2-storage-simple.cabal b/hbs2-storage-simple/hbs2-storage-simple.cabal index da62c866..3833f571 100644 --- a/hbs2-storage-simple/hbs2-storage-simple.cabal +++ b/hbs2-storage-simple/hbs2-storage-simple.cabal @@ -88,6 +88,7 @@ library , temporary , filepattern , unliftio + , vector hs-source-dirs: lib @@ -172,6 +173,7 @@ executable hbs2-storage-simple-benchmarks , safe , serialise , streaming + , split , text , temporary , transformers diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs index 7e968a54..da23d142 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs @@ -18,6 +18,8 @@ import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HM import Data.Foldable import Data.Traversable +import Data.Vector (Vector,(!)) +import Data.Vector qualified as V import Codec.Serialise import GHC.Generics -- import System.IO @@ -84,8 +86,9 @@ data CompactStorage = CompactStorage { csHandle :: MVar Handle , csHeaderOff :: IORef EntryOffset - , csSeq :: IORef Integer - , csKeys :: IORef (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer))) + -- , csSeq :: IORef Integer + , csSeq :: TVar Integer + , csKeys :: Vector (TVar (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer)))) } type ForCompactStorage m = MonadIO m @@ -101,6 +104,15 @@ data CompactStorageOpenError = instance Exception CompactStorageOpenError + +buckets :: Int +buckets = 8 + +-- FIXME: buckets-hardcode +getKeyPrefix :: ByteString -> Int +getKeyPrefix bs = maybe 0 (fromIntegral.fst) (BS.uncons bs) `mod` buckets +{-# INLINE getKeyPrefix #-} + compactStorageOpen :: ForCompactStorage m => [CompactStorageOpenOpt] -> FilePath @@ -112,8 +124,11 @@ compactStorageOpen _ fp = do mha <- newMVar ha hoff0 <- newIORef 0 - keys0 <- newIORef mempty - ss <- newIORef 0 + + keys0 <- replicateM buckets (newTVarIO mempty) <&> V.fromList + + -- ss <- newIORef 0 + ss <- newTVarIO 0 if sz == 0 then pure $ CompactStorage mha hoff0 ss keys0 @@ -169,16 +184,21 @@ readIndex sto offset num = liftIO do when (rn /= num) do throwIO BrokenIndex - let new = HM.fromList [ (idxEntryKey e,Left (e,0)) | e <- entries ] + let new = [ (idxEntryKey e,Left (e,0)) | e <- entries ] -- readIndex from newer to older -- so we keep only the newer values in map - modifyIORef' (csKeys sto) (HM.unionWith (\_ b -> b) new) + atomically do + for_ new $ \(k,v) -> do + let tv = csKeys sto ! getKeyPrefix k + modifyTVar tv (HM.insertWith (\_ o -> o) k v) compactStorageCommit :: ForCompactStorage m => CompactStorage -> m () compactStorageCommit sto = liftIO do withMVar (csHandle sto) $ \ha -> do hSeek ha SeekFromEnd 0 - kv <- readIORef (csKeys sto) <&> HM.toList + + kv <- atomically do + mapM readTVar (csKeys sto) <&> mconcat . V.toList . fmap HM.toList let items = [ (k, v) | (k, Right v) <- kv ] @@ -227,26 +247,36 @@ compactStorageCommit sto = liftIO do -- atomically do atomicWriteIORef (csHeaderOff sto) (offLast - headerSize 1) - atomicModifyIORef' (csKeys sto) $ \m -> do - let new = foldl merge m idxEntries - (new, ()) + + atomically do + for_ idxEntries $ \(e,i) -> do + let k = idxEntryKey e + let tv = csKeys sto ! getKeyPrefix k + modifyTVar tv (HM.insertWith merge k (Left (e, i))) where - merge m (el,i) = HM.insertWith mergeEl (idxEntryKey el) (Left (el,i)) m - mergeEl new old = if getSeq new >= getSeq old then new else old + merge new old = if getSeq new >= getSeq old then new else old getSeq = \case Left (_,i) -> i Right (_,i) -> i + compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m () compactStoragePut sto k v = do -- TODO: ASAP-do-not-write-value-if-not-changed - c <- atomicModifyIORef' (csSeq sto) (\n -> (n+1,n)) - atomicModifyIORef' (csKeys sto) (\m -> (HM.insert k (Right (v,c)) m, ())) + + let tvar = csKeys sto ! getKeyPrefix k + + atomically $ do + c <- stateTVar (csSeq sto) (\n -> (n+1,n)) + modifyTVar tvar (HM.insert k (Right (v,c))) compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString) compactStorageGet sto key = do - val <- readIORef (csKeys sto) <&> HM.lookup key + let tvar = csKeys sto ! getKeyPrefix key + + val <- readTVarIO tvar <&> HM.lookup key + case val of Nothing -> pure Nothing Just (Right (s,_)) -> pure (Just s)