mirror of https://github.com/voidlizard/hbs2
wip, deletion
This commit is contained in:
parent
1b003ed124
commit
5afd9c6048
|
@ -6,7 +6,11 @@ module HBS2.Storage.NCQ3
|
|||
, ncqStorageOpen3
|
||||
, ncqStorageRun3
|
||||
, ncqPutBS
|
||||
, ncqGetEntryBS
|
||||
, IsTomb(..)
|
||||
, ncqLocate
|
||||
, ncqDelEntry
|
||||
, ncqEntrySize
|
||||
)
|
||||
where
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import HBS2.Storage.NCQ3.Internal.Run
|
|||
import HBS2.Storage.NCQ3.Internal.Memtable
|
||||
import HBS2.Storage.NCQ3.Internal.Files
|
||||
import HBS2.Storage.NCQ3.Internal.Index
|
||||
import HBS2.Storage.NCQ3.Internal.MMapCache
|
||||
|
||||
import Control.Monad.Trans.Cont
|
||||
import Network.ByteOrder qualified as N
|
||||
|
@ -202,3 +203,66 @@ ncqTryLoadState me@NCQStorage3{..} = do
|
|||
pure True
|
||||
|
||||
|
||||
ncqTombEntrySize :: NCQSize
|
||||
ncqTombEntrySize = ncqSLen + ncqKeyLen + ncqPrefixLen
|
||||
|
||||
ncqIsTombEntrySize :: Integral a => a -> Bool
|
||||
ncqIsTombEntrySize s = fromIntegral s <= ncqTombEntrySize
|
||||
{-# INLINE ncqIsTombEntrySize #-}
|
||||
|
||||
ncqEntryUnwrap :: ByteString
|
||||
-> (ByteString, Either ByteString (NCQSectionType, ByteString))
|
||||
ncqEntryUnwrap source = do
|
||||
let (k,v) = BS.splitAt ncqKeyLen (BS.drop 4 source)
|
||||
(k, ncqEntryUnwrapValue v)
|
||||
{-# INLINE ncqEntryUnwrap #-}
|
||||
|
||||
ncqEntryUnwrapValue :: ByteString
|
||||
-> Either ByteString (NCQSectionType, ByteString)
|
||||
ncqEntryUnwrapValue v = case ncqIsMeta v of
|
||||
Just meta -> Right (meta, BS.drop ncqPrefixLen v)
|
||||
Nothing -> Left v
|
||||
{-# INLINE ncqEntryUnwrapValue #-}
|
||||
|
||||
|
||||
class IsTomb a where
|
||||
ncqIsTomb :: a -> Bool
|
||||
|
||||
instance IsTomb IndexEntry where
|
||||
ncqIsTomb (IndexEntry _ _ s) = s <= (ncqSLen + ncqKeyLen + ncqPrefixLen)
|
||||
|
||||
instance IsTomb Location where
|
||||
ncqIsTomb = \case
|
||||
InFossil _ _ s -> ncqIsTombEntrySize s
|
||||
InMemory bs -> case ncqEntryUnwrap bs of
|
||||
(_, Right (T, _)) -> True
|
||||
_ -> False
|
||||
|
||||
ncqGetEntryBS :: MonadUnliftIO m => NCQStorage3 -> Location -> m (Maybe ByteString)
|
||||
ncqGetEntryBS me = \case
|
||||
InMemory bs -> pure $ Just bs
|
||||
InFossil fk off size -> do
|
||||
try @_ @SomeException (ncqGetCachedData me fk) >>= \case
|
||||
Left{} -> pure Nothing
|
||||
Right (CachedData mmap) -> do
|
||||
pure $ Just $ BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmap
|
||||
|
||||
ncqEntrySize :: forall a . Integral a => Location -> a
|
||||
ncqEntrySize = \case
|
||||
InFossil _ _ size -> fromIntegral size
|
||||
InMemory bs -> fromIntegral (BS.length bs)
|
||||
|
||||
ncqDelEntry :: MonadUnliftIO m
|
||||
=> NCQStorage3
|
||||
-> HashRef
|
||||
-> m ()
|
||||
ncqDelEntry me href = do
|
||||
-- всегда пишем tomb и надеемся на лучшее
|
||||
-- merge/compact разберутся
|
||||
-- однако не пишем, если записи еще нет
|
||||
ncqLocate me href >>= \case
|
||||
Just loc | not (ncqIsTomb loc) -> do
|
||||
void $ ncqPutBS me (Just T) (Just href) ""
|
||||
_ -> none
|
||||
|
||||
|
||||
|
|
|
@ -36,9 +36,10 @@ ncqIndexPayloadSize = fileKey + fileOffset + blockSize + padding
|
|||
|
||||
unpackIndexEntry :: ByteString -> IndexEntry
|
||||
unpackIndexEntry entryBs = do
|
||||
let (fks,rest1) = BS.splitAt 4 entryBs
|
||||
let (offs,rest2) = BS.splitAt 8 rest1
|
||||
let ss = BS.take 4 rest2
|
||||
let (fks,rest1) = BS.splitAt 8 entryBs -- FileKey: 8
|
||||
let (offs,rest2) = BS.splitAt 8 rest1 -- Offset: 8
|
||||
let ss = BS.take 4 rest2 -- Size: 4
|
||||
-- padding: 0?
|
||||
let fk = FileKey (N.word64 fks)
|
||||
let off = N.word64 offs
|
||||
let size = N.word32 ss
|
||||
|
|
|
@ -18,6 +18,7 @@ import HBS2.Storage.NCQ3
|
|||
import HBS2.Storage.NCQ3.Internal.Files
|
||||
import HBS2.Storage.NCQ3.Internal.Index
|
||||
import HBS2.Storage.NCQ3.Internal.Fossil
|
||||
import HBS2.Storage.NCQ3.Internal
|
||||
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
|
@ -48,6 +49,7 @@ import Control.Concurrent.STM qualified as STM
|
|||
import Data.List qualified as List
|
||||
import Control.Monad.Trans.Cont
|
||||
import System.IO.Temp qualified as Temp
|
||||
import System.Random.Stateful
|
||||
import UnliftIO
|
||||
import UnliftIO.IO.File
|
||||
import UnliftIO.IO as IO
|
||||
|
@ -297,6 +299,112 @@ ncq3Tests = do
|
|||
entry $ bindMatch "test:ncq3:lookup1" $ nil_ $ \e -> do
|
||||
runTest (testNCQ3Lookup1 e)
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq3:del1" $ nil_ $ \syn -> do
|
||||
|
||||
runTest $ \TestEnv{..} -> do
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
let dir = testEnvDir
|
||||
|
||||
let (opts, argz) = splitOpts [("-m",0)] syn
|
||||
let n = headDef 10000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
|
||||
let merge = or [ True | ListVal [StringLike "-m"] <- opts ]
|
||||
|
||||
thashes <- newTVarIO mempty
|
||||
|
||||
ncqWithStorage3 dir $ \sto@NCQStorage3{..} -> do
|
||||
|
||||
notice $ "write+immediate delete" <+> pretty n <+> "records"
|
||||
|
||||
hashes <- replicateM n do
|
||||
|
||||
h <- ncqPutBS sto (Just B) Nothing =<< liftIO (genRandomBS g (64*1024))
|
||||
ncqDelEntry sto h
|
||||
|
||||
t <- (ncqLocate sto h <&> fmap ncqIsTomb)
|
||||
>>= orThrowUser ("missed" <+> pretty h)
|
||||
|
||||
liftIO $ assertBool (show $ "tomb/1" <+> pretty h) t
|
||||
|
||||
pure h
|
||||
|
||||
atomically $ writeTVar thashes (HS.fromList hashes)
|
||||
|
||||
flip runContT pure $ callCC \exit -> do
|
||||
|
||||
for_ hashes $ \h -> do
|
||||
loc <- lift (ncqLocate sto h)
|
||||
>>= orThrowUser ("missed" <+> pretty h)
|
||||
|
||||
unless (ncqEntrySize loc == ncqTombEntrySize) do
|
||||
notice $ pretty h <+> pretty (ncqEntrySize loc) <+> pretty ncqTombEntrySize
|
||||
|
||||
liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc)
|
||||
|
||||
ncqIndexCompactFull sto
|
||||
|
||||
ncqWithStorage3 dir $ \sto -> do
|
||||
-- notice "check deleted"
|
||||
hashes <- readTVarIO thashes
|
||||
|
||||
for_ hashes $ \h -> do
|
||||
|
||||
ncqLocate sto h >>= \case
|
||||
Nothing -> notice $ "not-found" <+> pretty h
|
||||
Just loc -> do
|
||||
liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc)
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq3:del2" $ nil_ $ \syn -> do
|
||||
|
||||
runTest $ \TestEnv{..} -> do
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
let dir = testEnvDir
|
||||
|
||||
let (_, argz) = splitOpts [] syn
|
||||
let n = headDef 50000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
let p0 = headDef 0.25 [ realToFrac x | LitScientificVal x <- drop 1 argz ]
|
||||
|
||||
thashes <- newTVarIO mempty
|
||||
|
||||
ncqWithStorage3 dir $ \sto@NCQStorage3{..} -> do
|
||||
|
||||
sizes <- replicateM n $ liftIO $ uniformRM (32*1024, 256*1024) g
|
||||
|
||||
notice $ "write" <+> pretty n <+> "blocks"
|
||||
pooledForConcurrentlyN_ 16 sizes $ \s -> do
|
||||
h <- ncqPutBS sto (Just B) Nothing =<< liftIO (genRandomBS g s)
|
||||
|
||||
p1 <- liftIO $ uniformRM @Double (0, 1) g
|
||||
|
||||
when (p1 < p0) do
|
||||
ncqDelEntry sto h
|
||||
atomically $ modifyTVar thashes (HS.insert h)
|
||||
|
||||
deleted <- readTVarIO thashes
|
||||
|
||||
tombs <- for (HS.toList deleted) $ \d -> do
|
||||
ncqLocate sto d <&> maybe False ncqIsTomb
|
||||
|
||||
let tnum = sum [ 1 | x <- tombs, x ]
|
||||
|
||||
notice $ "should be deleted" <+> pretty (HS.size deleted) <+> "/" <+> pretty tnum
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
|
||||
ncqIndexCompactFull sto
|
||||
-- ncqCompactStep sto
|
||||
|
||||
t1 <- getTimeCoarse
|
||||
|
||||
let dt = timeSpecDeltaSeconds @(Fixed E6) t0 t1
|
||||
|
||||
notice $ "ncqCompactStep time" <+> pretty dt
|
||||
|
||||
none
|
||||
|
||||
|
||||
testNCQ3Concurrent1 :: MonadUnliftIO m
|
||||
=> Bool
|
||||
-> Int
|
||||
|
|
Loading…
Reference in New Issue