From 1a9258ee3854bb08bafe14960297c46253eed62d Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Mon, 3 Jun 2024 05:50:07 +0300 Subject: [PATCH] wip --- .../lib/HBS2/Storage/Compact.hs | 72 +++++++++++++++---- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs index 57ed6505..6f559e44 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs @@ -10,6 +10,7 @@ module HBS2.Storage.Compact , compactStoragePut , compactStorageGet , compactStorageDel + , compactStorageSize , compactStorageFindLiveHeads , compactStorageRun , HBS2.Storage.Compact.keys @@ -17,6 +18,7 @@ module HBS2.Storage.Compact , HBS2.Storage.Compact.put , HBS2.Storage.Compact.get , HBS2.Storage.Compact.del + , HBS2.Storage.Compact.commit ) where import HBS2.Clock @@ -102,6 +104,7 @@ data Header = deriving stock (Show,Generic) data E = New ByteString + | Upd ByteString IndexEntry | Off IndexEntry | Del IndexEntry @@ -113,9 +116,15 @@ pattern Fresh e <- e@(Entry _ ( isFresh -> True )) pattern Tomb :: Entry -> Entry pattern Tomb e <- e@(Entry _ ( isTomb -> True )) +pattern Existed :: Entry -> IndexEntry -> Entry +pattern Existed e w <- e@(Entry _ (existed -> Just w)) + +{-# COMPLETE Existed #-} + isAlive :: Entry -> Bool isAlive = \case Entry _ New{} -> True + Entry _ Upd{} -> True Entry _ e@(Off{}) -> not (isTomb e) _ -> False @@ -123,10 +132,18 @@ isTomb :: E -> Bool isTomb (Off e) = idxEntryTomb e isTomb _ = False +existed :: E -> Maybe IndexEntry +existed = \case + Off e -> Just e + Upd _ e -> Just e + Del e -> Just e + _ -> Nothing + isFresh :: E -> Bool isFresh e = case e of New{} -> True Del{} -> True + Upd{} -> True _ -> False type Bucket = TVar (HashMap ByteString Entry) @@ -255,6 +272,8 @@ compactStorageCommit sto = liftIO do withMVar (csHandle sto) $ \ha -> do hSeek ha SeekFromEnd 0 + mma <- readTVarIO (csMMapped sto) + kv <- atomically do mapM readTVar (csKeys sto) <&> mconcat . V.toList . fmap HM.toList @@ -276,6 +295,18 @@ compactStorageCommit sto = liftIO do ((_,Entry i (Del e)):rest) | not (idxEntryTomb e) -> do next (off + 0, rest, (e { idxEntryTomb = True },i) : idx) + -- NOTE: this-might-be-slow + -- но это правильно, поскольку + -- у нас **compact** storage и мы не хотим, + -- что бы его раздувало одинаковыми значениями + ((k,Entry i (Upd v e)):rest) -> do + if readValue mma e == v then do + next (off + 0, rest, idx) + else do + BS.hPut ha v + let sz = fromIntegral $ BS.length v + next (off + sz, rest, (IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k,i) : idx) + ((k,Entry i (New v)):rest) -> do BS.hPut ha v let sz = fromIntegral $ BS.length v @@ -352,20 +383,21 @@ compactStorageDel sto key = do case val of Nothing -> pure () + Just (Entry i (Del _)) -> pure () - Just (Entry _ (Off e)) -> do - atomically do - j <- newSequenceSTM sto - modifyTVar tvar (HM.insert key (Entry j (Del e))) - succUncommitedSTM sto 1 - - Just (Entry i (New v)) -> do + Just (Entry _ (New _)) -> do -- FIXME: if-commit-in-progress-then-put-tomb atomically do modifyTVar tvar (HM.delete key) succUncommitedSTM sto 1 + Just (Existed e what) -> do + atomically do + j <- newSequenceSTM sto + modifyTVar tvar (HM.insert key (Entry j (Del what))) + succUncommitedSTM sto 1 + newSequenceSTM :: CompactStorage k -> STM Integer newSequenceSTM sto = stateTVar (csSeq sto) (\n -> (n+1,n)) @@ -375,13 +407,27 @@ succUncommitedSTM sto k = modifyTVar (csUncommitted sto) (+k) resetUncommitedSTM :: CompactStorage k -> STM () resetUncommitedSTM sto = writeTVar (csUncommitted sto) 0 +compactStorageSize :: ForCompactStorage m => CompactStorage k -> m Integer +compactStorageSize sto = liftIO $ withMVar (csHandle sto) hFileSize + compactStoragePut :: ForCompactStorage m => CompactStorage k -> ByteString -> ByteString -> m () compactStoragePut sto k v = do let tvar = getBucket sto k atomically $ do c <- newSequenceSTM sto - modifyTVar tvar (HM.insert k (Entry c (New v))) + modifyTVar tvar (HM.insertWith check k (Entry c (New v))) + + where + check (Entry i (New v1)) (Entry _ (Off e)) = Entry i (Upd v1 e) + check x _ = x + +readValue :: MMaped -> IndexEntry -> ByteString +readValue what e = do + let ptr = what & view _1 + BS.fromForeignPtr ptr (fromIntegral $ idxEntryOffset e) + (fromIntegral $ idxEntrySize e) +{-# INLINE readValue #-} compactStorageGet :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe ByteString) compactStorageGet sto key = do @@ -393,10 +439,8 @@ compactStorageGet sto key = do Just (Tomb{}) -> pure Nothing Just (Entry _ (Del _)) -> pure Nothing Just (Entry _ (New s)) -> pure (Just s) - Just (Entry _ (Off e)) -> liftIO do - ptr <- readTVarIO (csMMapped sto) <&> view _1 - pure $ Just $ BS.fromForeignPtr ptr (fromIntegral $ idxEntryOffset e) - (fromIntegral $ idxEntrySize e) + Just (Entry _ (Upd s _)) -> pure (Just s) + Just (Entry _ (Off e)) -> Just <$> (readTVarIO (csMMapped sto) <&> flip readValue e) compactStorageExists :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe Integer) compactStorageExists sto key = do @@ -558,6 +602,10 @@ del :: ForCompactStorage m del = compactStorageDel +commit :: ForCompactStorage m + => CompactStorage sto + -> m () +commit = compactStorageCommit -- Storage instance