mirror of https://github.com/voidlizard/hbs2
wip, NCQ3.NCQStorage basic test
This commit is contained in:
parent
6f3f9cae24
commit
c7058a9b6f
|
@ -95,6 +95,7 @@ type IsRefPubKey s = ( Eq (PubKey 'Sign s)
|
|||
type ForSomeRefKey a = ( Hashed HbSync a )
|
||||
|
||||
newtype SomeRefKey a = SomeRefKey a
|
||||
deriving newtype (Eq,Hashable)
|
||||
|
||||
instance RefMetaData (SomeRefKey a)
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ library
|
|||
exposed-modules:
|
||||
HBS2.Storage.NCQ3
|
||||
HBS2.Storage.NCQ3.Internal
|
||||
HBS2.Storage.NCQ3.Internal.Class
|
||||
HBS2.Storage.NCQ3.Internal.Types
|
||||
HBS2.Storage.NCQ3.Internal.Prelude
|
||||
HBS2.Storage.NCQ3.Internal.State
|
||||
|
|
|
@ -136,6 +136,10 @@ ncqFullTombLen :: forall a . Integral a => a
|
|||
ncqFullTombLen = ncqSLen + ncqKeyLen + ncqPrefixLen + 0
|
||||
{-# INLINE ncqFullTombLen #-}
|
||||
|
||||
ncqEntryPayloadSize :: Integral a => a -> a
|
||||
ncqEntryPayloadSize tot = tot - hpl
|
||||
where hpl = fromIntegral (ncqSLen + ncqKeyLen + ncqPrefixLen)
|
||||
{-# INLINE ncqEntryPayloadSize #-}
|
||||
|
||||
data NCQSectionType = B | R | T | M
|
||||
deriving stock (Eq,Ord,Show)
|
||||
|
|
|
@ -17,6 +17,7 @@ module HBS2.Storage.NCQ3
|
|||
where
|
||||
|
||||
import HBS2.Storage.NCQ3.Internal.Types as Exported
|
||||
import HBS2.Storage.NCQ3.Internal.Class as Exported
|
||||
import HBS2.Storage.NCQ3.Internal.Prelude as Exported
|
||||
import HBS2.Storage.NCQ3.Internal
|
||||
import HBS2.Storage.NCQ3.Internal.Run
|
||||
|
|
|
@ -12,6 +12,7 @@ import HBS2.Storage.NCQ3.Internal.Index
|
|||
import HBS2.Storage.NCQ3.Internal.MMapCache
|
||||
|
||||
import Control.Monad.Trans.Cont
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.HashPSQ qualified as HPSQ
|
||||
import Data.Vector qualified as V
|
||||
|
@ -21,6 +22,7 @@ import Data.Set qualified as Set
|
|||
import Data.Either
|
||||
import Lens.Micro.Platform
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Sequence qualified as Seq
|
||||
import System.FilePath.Posix
|
||||
import System.Posix.Files qualified as Posix
|
||||
|
@ -105,6 +107,22 @@ ncqWithStorage fp action = flip runContT pure do
|
|||
wait w
|
||||
pure r
|
||||
|
||||
|
||||
ncqPutBlock :: MonadUnliftIO m
|
||||
=> NCQStorage
|
||||
-> LBS.ByteString
|
||||
-> m (Maybe HashRef)
|
||||
|
||||
-- FIXME: Nothing-on-exception
|
||||
ncqPutBlock sto lbs =
|
||||
ncqLocate sto ohash >>= \case
|
||||
Nothing -> Just <$> ncqPutBS sto (Just B) (Just ohash) bs
|
||||
_ -> pure (Just ohash)
|
||||
where
|
||||
bs = LBS.toStrict lbs
|
||||
ohash = HashRef $ hashObject @HbSync bs
|
||||
{-# INLINE ncqPutBlock #-}
|
||||
|
||||
-- FIXME: maybe-on-storage-closed
|
||||
ncqPutBS :: MonadUnliftIO m
|
||||
=> NCQStorage
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
{-# OPTIONS_GHC -Wno-orphans #-}
|
||||
module HBS2.Storage.NCQ3.Internal.Class where
|
||||
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Hash
|
||||
import HBS2.Storage
|
||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||
import HBS2.Storage.NCQ3.Internal.Types
|
||||
import HBS2.Storage.NCQ3.Internal.Fossil
|
||||
import HBS2.Storage.NCQ3.Internal.Index
|
||||
import HBS2.Storage.NCQ3.Internal
|
||||
|
||||
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.ByteString qualified as BS
|
||||
import Control.Monad.Trans.Maybe
|
||||
|
||||
|
||||
|
||||
instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where
|
||||
hasBlock sto h = ncqStorageHasBlock sto (coerce h)
|
||||
|
||||
putBlock sto lbs = fmap coerce <$> ncqPutBlock sto lbs
|
||||
enqueueBlock sto lbs = fmap coerce <$> ncqPutBlock sto lbs
|
||||
|
||||
getBlock sto h = runMaybeT $ do
|
||||
bs <- MaybeT (ncqStorageGetBlock sto (coerce h))
|
||||
pure (LBS.fromStrict bs)
|
||||
|
||||
delBlock sto = ncqStorageDelBlock sto . coerce
|
||||
|
||||
updateRef sto k v =
|
||||
ncqStorageSetRef sto (HashRef $ hashObject k) (coerce v)
|
||||
|
||||
getRef sto k =
|
||||
ncqStorageGetRef sto (HashRef $ hashObject k) <&> fmap coerce
|
||||
|
||||
delRef sto k =
|
||||
ncqStorageDelRef sto (HashRef $ hashObject k)
|
||||
|
||||
getChunk sto h off size = runMaybeT $ do
|
||||
bs <- MaybeT (ncqStorageGetBlock sto (coerce h))
|
||||
let lbs = LBS.fromStrict bs
|
||||
chunk = LBS.take (fromIntegral size) $ LBS.drop (fromIntegral off) lbs
|
||||
pure chunk
|
||||
|
||||
|
||||
ncqStorageHasBlock :: MonadUnliftIO m
|
||||
=> NCQStorage
|
||||
-> HashRef
|
||||
-> m (Maybe Integer)
|
||||
ncqStorageHasBlock sto h = ncqLocate sto h >>= \case
|
||||
Nothing -> pure Nothing
|
||||
Just (InMemory bs) -> blockSize bs
|
||||
Just (InFossil _ _ size) | ncqIsTombEntrySize size -> pure Nothing
|
||||
Just (InFossil _ _ size) -> do
|
||||
pure $ Just (ncqEntryPayloadSize (fromIntegral size))
|
||||
|
||||
where
|
||||
{-# INLINE blockSize #-}
|
||||
blockSize bs = case ncqEntryUnwrap bs of
|
||||
(_, Left _) -> pure Nothing
|
||||
(_, Right (M, val)) -> pure (Just (fromIntegral $ BS.length val))
|
||||
(_, Right (T, _)) -> pure Nothing
|
||||
(_, Right (R, val)) -> pure (Just (fromIntegral $ BS.length val))
|
||||
(_, Right (B, val)) -> pure (Just (fromIntegral $ BS.length val))
|
||||
{-# INLINE ncqStorageHasBlock #-}
|
||||
|
||||
|
||||
-- | Returns strict ByteString
|
||||
-- | It's up to user to perform
|
||||
-- | in order to free memory mapped file where located
|
||||
-- | the found block.
|
||||
-- | Dangling substrings prevent mmaped files from being released
|
||||
ncqStorageGetBlock :: MonadUnliftIO m
|
||||
=> NCQStorage
|
||||
-> HashRef
|
||||
-> m (Maybe ByteString)
|
||||
|
||||
ncqStorageGetBlock sto h = runMaybeT do
|
||||
loc <- lift (ncqLocate sto h) >>= toMPlus
|
||||
guard (not $ ncqIsTomb loc)
|
||||
(_,what) <- lift (ncqGetEntryBS sto loc)
|
||||
>>= toMPlus
|
||||
<&> ncqEntryUnwrap
|
||||
|
||||
case what of
|
||||
Left _ -> mzero
|
||||
Right (T, _) -> mzero
|
||||
Right (_, ebs) -> pure ebs
|
||||
|
||||
{-# INLINE ncqStorageGetBlock #-}
|
||||
|
||||
-- | Logically delete entry by hash (writes a tomb if present and not already tomb).
|
||||
-- No-op if entry doesn't exist.
|
||||
ncqStorageDelBlock :: MonadUnliftIO m
|
||||
=> NCQStorage
|
||||
-> HashRef
|
||||
-> m ()
|
||||
ncqStorageDelBlock = ncqDelEntry
|
||||
{-# INLINE ncqStorageDelBlock #-}
|
||||
|
||||
-- | Salted ref hash: H( ref || ncqSalt )
|
||||
ncqRefHash :: NCQStorage -> HashRef -> HashRef
|
||||
ncqRefHash NCQStorage{..} h =
|
||||
HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt))
|
||||
{-# INLINE ncqRefHash #-}
|
||||
|
||||
-- | Get ref value (hash) by logical ref key.
|
||||
-- Returns Nothing for tomb/absent/invalid.
|
||||
ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef)
|
||||
ncqStorageGetRef ncq ref = runMaybeT $ do
|
||||
let rkey = ncqRefHash ncq ref
|
||||
loc <- lift (ncqLocate ncq rkey) >>= toMPlus
|
||||
guard (not $ ncqIsTomb loc)
|
||||
bs <- lift (ncqGetEntryBS ncq loc) >>= toMPlus
|
||||
case snd (ncqEntryUnwrap bs) of
|
||||
Right (R, payload) | BS.length payload == ncqKeyLen
|
||||
-> pure (coerce payload)
|
||||
_ -> mzero
|
||||
{-# INLINE ncqStorageGetRef #-}
|
||||
|
||||
-- | Set ref value if changed. Writes section of type R with fixed key = rkey.
|
||||
ncqStorageSetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> HashRef -> m ()
|
||||
ncqStorageSetRef ncq ref val = do
|
||||
cur <- ncqStorageGetRef ncq ref
|
||||
unless (cur == Just val) $ do
|
||||
let rkey = ncqRefHash ncq ref
|
||||
payload = coerce @_ @ByteString val
|
||||
-- Section type R, fixed key = rkey, payload = value hash bytes
|
||||
void $ ncqPutBS ncq (Just R) (Just rkey) payload
|
||||
{-# INLINE ncqStorageSetRef #-}
|
||||
|
||||
-- | Delete ref (write tomb for ref key), no-op if absent.
|
||||
ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m ()
|
||||
ncqStorageDelRef ncq ref =
|
||||
ncqDelEntry ncq (ncqRefHash ncq ref)
|
||||
{-# INLINE ncqStorageDelRef #-}
|
||||
|
|
@ -12,6 +12,7 @@ module HBS2.Storage.NCQ3.Internal.Prelude
|
|||
, ncqMetaPrefix
|
||||
, ncqIsMeta
|
||||
, ncqFullDataLen
|
||||
, ncqEntryPayloadSize
|
||||
, NCQFullRecordLen(..)
|
||||
, ToFileName(..)
|
||||
, IndexFile(..)
|
||||
|
|
|
@ -10,6 +10,7 @@ import HBS2.Misc.PrettyStuff
|
|||
import HBS2.Clock
|
||||
import HBS2.Merkle
|
||||
import HBS2.Polling
|
||||
import HBS2.Peer.Proto.AnyRef
|
||||
|
||||
import HBS2.Storage
|
||||
import HBS2.Storage.Simple
|
||||
|
@ -44,6 +45,7 @@ import Data.HashSet qualified as HS
|
|||
import Data.HashMap.Strict qualified as HM
|
||||
import Test.Tasty.HUnit
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Ord
|
||||
import Data.Set qualified as Set
|
||||
import System.Random.MWC as MWC
|
||||
|
@ -61,6 +63,8 @@ import UnliftIO.Directory
|
|||
|
||||
{-HLINT ignore "Functor law"-}
|
||||
|
||||
|
||||
|
||||
ncq3Tests :: forall m . MonadUnliftIO m => MakeDictM C m ()
|
||||
ncq3Tests = do
|
||||
entry $ bindMatch "test:ncq3:start-stop" $ nil_ $ \e ->do
|
||||
|
@ -582,6 +586,116 @@ ncq3Tests = do
|
|||
|
||||
notice $ "second must fail" <+> pretty wx <+> "=>" <+> viaShow r
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq3:storage:basic" $ nil_ $ \e -> do
|
||||
let (opts,args) = splitOpts [] e
|
||||
let n = headDef 100000 [ fromIntegral x | LitIntVal x <- args ]
|
||||
let pD = headDef 0.10 [ realToFrac x | LitScientificVal x <- drop 1 args ]
|
||||
let pR = 0.01
|
||||
let kN = headDef 1000 [ fromIntegral x | LitIntVal x <- drop 2 args ]
|
||||
|
||||
blkz <- newTVarIO (mempty :: HashMap (Hash HbSync) (Maybe LBS.ByteString))
|
||||
refz <- newTVarIO (mempty :: HashMap (SomeRefKey HashRef) (Maybe (Hash HbSync)))
|
||||
|
||||
runTest $ \TestEnv{..} -> do
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
ncqWithStorage testEnvDir $ \sto -> do
|
||||
|
||||
replicateM_ n $ liftIO do
|
||||
sz <- uniformRM (1, 64*1024) g
|
||||
bs <- genRandomBS g sz <&> LBS.fromStrict
|
||||
ha <- putBlock sto bs `orDie` "Block not stored"
|
||||
mb <- getBlock sto ha
|
||||
|
||||
when (mb /= Just bs) do
|
||||
assertFailure ("getBlock mismatch for " <> show (pretty ha))
|
||||
|
||||
sz <- hasBlock sto ha `orDie` "block not found"
|
||||
|
||||
assertBool ("hasBlock size mismatch for " <> show (pretty ha)) (sz == fromIntegral (LBS.length bs))
|
||||
|
||||
atomically $ modifyTVar blkz (HM.insert ha (Just bs))
|
||||
|
||||
pd <- uniformRM (0, 1.0) g
|
||||
|
||||
when (pd < pD) do
|
||||
delBlock sto ha
|
||||
atomically $ modifyTVar blkz (HM.insert ha Nothing)
|
||||
found <- hasBlock sto ha
|
||||
assertBool (show $ "not deleted" <+> pretty ha) (isNothing found)
|
||||
|
||||
pr <- uniformRM (0, 1.0) g
|
||||
|
||||
when (pr < pR) do
|
||||
k <- uniformRM (1,10) g
|
||||
replicateM_ k do
|
||||
ref <- SomeRefKey . HashRef . coerce <$> genRandomBS g 32
|
||||
updateRef sto ref ha
|
||||
atomically $ modifyTVar refz (HM.insert ref (Just ha))
|
||||
what <- getRef sto ref
|
||||
assertBool (show $ "ref not found" <+> pretty ref) (what == Just ha)
|
||||
|
||||
prd <- uniformRM (0, 1.0) g
|
||||
|
||||
when (prd < 0.10) do
|
||||
delRef sto ref
|
||||
atomically $ modifyTVar refz (HM.insert ref Nothing)
|
||||
|
||||
notice "immediate test done"
|
||||
|
||||
ncqWithStorage testEnvDir $ \sto -> flip runContT pure do
|
||||
|
||||
p <- newTVarIO (0,0)
|
||||
|
||||
void $ ContT $ withAsync $ forever do
|
||||
(b,r) <- readTVarIO p
|
||||
ema <- readTVarIO (ncqWriteEMA sto)
|
||||
pause @'Seconds 2
|
||||
notice $ "progress" <+> pretty ema <+> pretty b <+> pretty r
|
||||
|
||||
fix \next -> do
|
||||
|
||||
blokz <- readTVarIO blkz <&> HM.toList
|
||||
for_ blokz $ \b -> do
|
||||
atomically $ modifyTVar p (over _1 succ)
|
||||
case b of
|
||||
(h,Nothing) -> liftIO do
|
||||
found <- hasBlock sto h
|
||||
assertBool (show $ "not deleted" <+> pretty h) (isNothing found)
|
||||
|
||||
(h,Just bs) -> liftIO do
|
||||
size <- hasBlock sto h >>= orThrowUser ("not found" <+> pretty h)
|
||||
|
||||
assertBool (show $ "size mismatch" <+> pretty h <+> pretty size <+> pretty (LBS.length bs))
|
||||
(size == fromIntegral (LBS.length bs))
|
||||
|
||||
bs1 <- getBlock sto h >>= orThrowUser ("not found data for" <+> pretty h)
|
||||
assertBool (show $ "data mismatch" <+> pretty h) (bs1 == bs)
|
||||
|
||||
refsz <- readTVarIO refz <&> HM.toList
|
||||
for_ refsz \r -> do
|
||||
atomically $ modifyTVar p (over _2 succ)
|
||||
case r of
|
||||
(ref, Nothing) -> liftIO do
|
||||
what <- getRef sto ref
|
||||
assertBool (show $ "ref resurrected" <+> pretty ref) (isNothing what)
|
||||
|
||||
(ref, Just hv) -> liftIO do
|
||||
what <- getRef sto ref
|
||||
assertBool (show $ "ref mismatch" <+> pretty ref <+> pretty what <+> pretty hv)
|
||||
(what == Just hv)
|
||||
|
||||
noone <- lift (ncqFossilMergeStep sto) <&> not
|
||||
|
||||
if noone then
|
||||
none
|
||||
else do
|
||||
notice "again"
|
||||
next
|
||||
|
||||
notice "re-opened storage test done"
|
||||
|
||||
testNCQ3Concurrent1 :: MonadUnliftIO m
|
||||
=> Bool
|
||||
-> Int
|
||||
|
@ -754,3 +868,5 @@ testNCQ3Lookup1 syn TestEnv{..} = do
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue