diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs index c3abd6ab..d0393d85 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Compact.hs @@ -1,5 +1,5 @@ -{-# Language PatternSynonyms #-} -{-# Language RecordWildCards #-} +{-# LANGUAGE PatternSynonyms #-} +{-# Language ViewPatterns #-} module HBS2.Storage.Compact where @@ -83,12 +83,34 @@ data Header = } deriving stock (Show,Generic) +data E = New ByteString + | Off IndexEntry + | Del IndexEntry + +data Entry = Entry Integer E + +pattern Fresh :: Entry -> Entry +pattern Fresh e <- e@(Entry _ ( isFresh -> True )) + +pattern Tomb :: Entry -> Entry +pattern Tomb e <- e@(Entry _ ( isTomb -> True )) + +isTomb :: E -> Bool +isTomb (Off e) = idxEntryTomb e +isTomb _ = False + +isFresh :: E -> Bool +isFresh e = case e of + New{} -> True + Del{} -> True + _ -> False + data CompactStorage = CompactStorage { csHandle :: MVar Handle , csHeaderOff :: IORef EntryOffset , csSeq :: TVar Integer - , csKeys :: Vector (TVar (HashMap ByteString (Either (IndexEntry,Integer) (ByteString,Integer)))) + , csKeys :: Vector (TVar (HashMap ByteString Entry)) } type ForCompactStorage m = MonadIO m @@ -103,8 +125,6 @@ data CompactStorageOpenError = instance Exception CompactStorageOpenError - - buckets :: Int buckets = 8 @@ -182,7 +202,7 @@ readIndex sto offset num = liftIO do when (rn /= num) do throwIO BrokenIndex - let new = [ (idxEntryKey e,Left (e,0)) | e <- entries ] + let new = [ (idxEntryKey e,Entry 0 (Off e)) | e <- entries ] -- readIndex from newer to older -- so we keep only the newer values in map atomically do @@ -198,7 +218,7 @@ compactStorageCommit sto = liftIO do kv <- atomically do mapM readTVar (csKeys sto) <&> mconcat . V.toList . fmap HM.toList - let items = [ (k, v) | (k, Right v) <- kv ] + let items = [ (k, v) | (k, v@Fresh{}) <- kv ] unless (List.null items) do @@ -212,11 +232,19 @@ compactStorageCommit sto = liftIO do idxEntries <- flip fix (off0, items, mempty) $ \next (off, what, idx) -> do case what of [] -> pure idx - ((k,(v,i)):rest) -> do + + ((_,Entry i (Del e)):rest) | not (idxEntryTomb e) -> do + next (off + 0, rest, (e { idxEntryTomb = True },i) : idx) + + ((k,Entry i (New v)):rest) -> do BS.hPut ha v let sz = fromIntegral $ BS.length v next (off + sz, rest, (IndexEntry (fromIntegral off) (fromIntegral sz) 0 False k,i) : idx) + ((_,Entry _ _):rest) -> do + next (off + 0, rest, idx) + + offIdx0 <- hTell ha <&> fromIntegral -- write index @@ -250,14 +278,43 @@ compactStorageCommit sto = liftIO do for_ idxEntries $ \(e,i) -> do let k = idxEntryKey e let tv = csKeys sto ! getKeyPrefix k - modifyTVar tv (HM.insertWith merge k (Left (e, i))) + modifyTVar tv (HM.alter (doAlter (Entry i (Off e))) k) where - merge new old = if getSeq new >= getSeq old then new else old - getSeq = \case - Left (_,i) -> i - Right (_,i) -> i + doAlter y@(Entry i (Off e)) v0 = case v0 of + -- deleted-during-commit + Nothing -> Just (Entry i (Del e)) + + Just x | getSeq x > getSeq y -> Just x + | otherwise -> Just y + + doAlter _ v = v + + getSeq = \case + Entry i _ -> i + +compactStorageDel :: ForCompactStorage m => CompactStorage -> ByteString -> m () +compactStorageDel sto key = do + + let tvar = csKeys sto ! getKeyPrefix key + val <- readTVarIO tvar <&> HM.lookup key + + case val of + Nothing -> pure () + Just (Entry i (Del _)) -> pure () + + Just (Entry _ (Off e)) -> do + atomically do + j <- newSequenceSTM sto + modifyTVar tvar (HM.insert key (Entry j (Del e))) + + Just (Entry i (New v)) -> do + -- FIXME: if-commit-in-progress-then-put-tomb + atomically $ modifyTVar tvar (HM.delete key) + +newSequenceSTM :: CompactStorage -> STM Integer +newSequenceSTM sto = stateTVar (csSeq sto) (\n -> (n+1,n)) compactStoragePut :: ForCompactStorage m => CompactStorage -> ByteString -> ByteString -> m () compactStoragePut sto k v = do @@ -266,19 +323,20 @@ compactStoragePut sto k v = do let tvar = csKeys sto ! getKeyPrefix k atomically $ do - c <- stateTVar (csSeq sto) (\n -> (n+1,n)) - modifyTVar tvar (HM.insert k (Right (v,c))) + c <- newSequenceSTM sto + modifyTVar tvar (HM.insert k (Entry c (New v))) compactStorageGet :: ForCompactStorage m => CompactStorage -> ByteString -> m (Maybe ByteString) compactStorageGet sto key = do let tvar = csKeys sto ! getKeyPrefix key - val <- readTVarIO tvar <&> HM.lookup key case val of - Nothing -> pure Nothing - Just (Right (s,_)) -> pure (Just s) - Just (Left (e,_)) -> liftIO do + Nothing -> pure Nothing + Just (Tomb{}) -> pure Nothing + Just (Entry _ (Del _)) -> pure Nothing + Just (Entry _ (New s)) -> pure (Just s) + Just (Entry _ (Off e)) -> liftIO do r <- withMVar (csHandle sto) $ \ha -> do try @_ @IOException do hSeek ha AbsoluteSeek (fromIntegral $ idxEntryOffset e) @@ -312,8 +370,6 @@ compactStorageFindLiveHeads path = liftIO do >>= either (const mzero) pure <&> view _3 - traceM $ show ("JOPA1", fwdOff, fwd) - h@(o,header) <- MaybeT $ readHeader mv (Just $ fromIntegral fwd)