This commit is contained in:
Dmitry Zuikov 2024-06-03 05:50:07 +03:00
parent fedbe30323
commit 1a9258ee38
1 changed files with 60 additions and 12 deletions

View File

@ -10,6 +10,7 @@ module HBS2.Storage.Compact
, compactStoragePut
, compactStorageGet
, compactStorageDel
, compactStorageSize
, compactStorageFindLiveHeads
, compactStorageRun
, HBS2.Storage.Compact.keys
@ -17,6 +18,7 @@ module HBS2.Storage.Compact
, HBS2.Storage.Compact.put
, HBS2.Storage.Compact.get
, HBS2.Storage.Compact.del
, HBS2.Storage.Compact.commit
) where
import HBS2.Clock
@ -102,6 +104,7 @@ data Header =
deriving stock (Show,Generic)
data E = New ByteString
| Upd ByteString IndexEntry
| Off IndexEntry
| Del IndexEntry
@ -113,9 +116,15 @@ pattern Fresh e <- e@(Entry _ ( isFresh -> True ))
pattern Tomb :: Entry -> Entry
pattern Tomb e <- e@(Entry _ ( isTomb -> True ))
pattern Existed :: Entry -> IndexEntry -> Entry
pattern Existed e w <- e@(Entry _ (existed -> Just w))
{-# COMPLETE Existed #-}
isAlive :: Entry -> Bool
isAlive = \case
Entry _ New{} -> True
Entry _ Upd{} -> True
Entry _ e@(Off{}) -> not (isTomb e)
_ -> False
@ -123,10 +132,18 @@ isTomb :: E -> Bool
isTomb (Off e) = idxEntryTomb e
isTomb _ = False
existed :: E -> Maybe IndexEntry
existed = \case
Off e -> Just e
Upd _ e -> Just e
Del e -> Just e
_ -> Nothing
isFresh :: E -> Bool
isFresh e = case e of
New{} -> True
Del{} -> True
Upd{} -> True
_ -> False
type Bucket = TVar (HashMap ByteString Entry)
@ -255,6 +272,8 @@ compactStorageCommit sto = liftIO do
withMVar (csHandle sto) $ \ha -> do
hSeek ha SeekFromEnd 0
mma <- readTVarIO (csMMapped sto)
kv <- atomically do
mapM readTVar (csKeys sto) <&> mconcat . V.toList . fmap HM.toList
@ -276,6 +295,18 @@ compactStorageCommit sto = liftIO do
((_,Entry i (Del e)):rest) | not (idxEntryTomb e) -> do
next (off + 0, rest, (e { idxEntryTomb = True },i) : idx)
-- NOTE: this-might-be-slow
-- но это правильно, поскольку
-- у нас **compact** storage и мы не хотим,
-- что бы его раздувало одинаковыми значениями
((k,Entry i (Upd v e)):rest) -> do
if readValue mma e == v then do
next (off + 0, rest, idx)
else do
BS.hPut ha v
let sz = fromIntegral $ BS.length v
next (off + sz, rest, (IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k,i) : idx)
((k,Entry i (New v)):rest) -> do
BS.hPut ha v
let sz = fromIntegral $ BS.length v
@ -352,20 +383,21 @@ compactStorageDel sto key = do
case val of
Nothing -> pure ()
Just (Entry i (Del _)) -> pure ()
Just (Entry _ (Off e)) -> do
atomically do
j <- newSequenceSTM sto
modifyTVar tvar (HM.insert key (Entry j (Del e)))
succUncommitedSTM sto 1
Just (Entry i (New v)) -> do
Just (Entry _ (New _)) -> do
-- FIXME: if-commit-in-progress-then-put-tomb
atomically do
modifyTVar tvar (HM.delete key)
succUncommitedSTM sto 1
Just (Existed e what) -> do
atomically do
j <- newSequenceSTM sto
modifyTVar tvar (HM.insert key (Entry j (Del what)))
succUncommitedSTM sto 1
newSequenceSTM :: CompactStorage k -> STM Integer
newSequenceSTM sto = stateTVar (csSeq sto) (\n -> (n+1,n))
@ -375,13 +407,27 @@ succUncommitedSTM sto k = modifyTVar (csUncommitted sto) (+k)
resetUncommitedSTM :: CompactStorage k -> STM ()
resetUncommitedSTM sto = writeTVar (csUncommitted sto) 0
compactStorageSize :: ForCompactStorage m => CompactStorage k -> m Integer
compactStorageSize sto = liftIO $ withMVar (csHandle sto) hFileSize
compactStoragePut :: ForCompactStorage m => CompactStorage k -> ByteString -> ByteString -> m ()
compactStoragePut sto k v = do
let tvar = getBucket sto k
atomically $ do
c <- newSequenceSTM sto
modifyTVar tvar (HM.insert k (Entry c (New v)))
modifyTVar tvar (HM.insertWith check k (Entry c (New v)))
where
check (Entry i (New v1)) (Entry _ (Off e)) = Entry i (Upd v1 e)
check x _ = x
readValue :: MMaped -> IndexEntry -> ByteString
readValue what e = do
let ptr = what & view _1
BS.fromForeignPtr ptr (fromIntegral $ idxEntryOffset e)
(fromIntegral $ idxEntrySize e)
{-# INLINE readValue #-}
compactStorageGet :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe ByteString)
compactStorageGet sto key = do
@ -393,10 +439,8 @@ compactStorageGet sto key = do
Just (Tomb{}) -> pure Nothing
Just (Entry _ (Del _)) -> pure Nothing
Just (Entry _ (New s)) -> pure (Just s)
Just (Entry _ (Off e)) -> liftIO do
ptr <- readTVarIO (csMMapped sto) <&> view _1
pure $ Just $ BS.fromForeignPtr ptr (fromIntegral $ idxEntryOffset e)
(fromIntegral $ idxEntrySize e)
Just (Entry _ (Upd s _)) -> pure (Just s)
Just (Entry _ (Off e)) -> Just <$> (readTVarIO (csMMapped sto) <&> flip readValue e)
compactStorageExists :: ForCompactStorage m => CompactStorage k -> ByteString -> m (Maybe Integer)
compactStorageExists sto key = do
@ -558,6 +602,10 @@ del :: ForCompactStorage m
del = compactStorageDel
commit :: ForCompactStorage m
=> CompactStorage sto
-> m ()
commit = compactStorageCommit
-- Storage instance