diff --git a/hbs2-core/lib/HBS2/Data/Types/Refs.hs b/hbs2-core/lib/HBS2/Data/Types/Refs.hs index 6e5d017d..8955c428 100644 --- a/hbs2-core/lib/HBS2/Data/Types/Refs.hs +++ b/hbs2-core/lib/HBS2/Data/Types/Refs.hs @@ -95,6 +95,7 @@ type IsRefPubKey s = ( Eq (PubKey 'Sign s) type ForSomeRefKey a = ( Hashed HbSync a ) newtype SomeRefKey a = SomeRefKey a + deriving newtype (Eq,Hashable) instance RefMetaData (SomeRefKey a) diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 9d59aaae..6238d8f5 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -62,6 +62,7 @@ library exposed-modules: HBS2.Storage.NCQ3 HBS2.Storage.NCQ3.Internal + HBS2.Storage.NCQ3.Internal.Class HBS2.Storage.NCQ3.Internal.Types HBS2.Storage.NCQ3.Internal.Prelude HBS2.Storage.NCQ3.Internal.State diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index 1aba82aa..31641239 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -136,6 +136,10 @@ ncqFullTombLen :: forall a . Integral a => a ncqFullTombLen = ncqSLen + ncqKeyLen + ncqPrefixLen + 0 {-# INLINE ncqFullTombLen #-} +ncqEntryPayloadSize :: Integral a => a -> a +ncqEntryPayloadSize tot = tot - hpl + where hpl = fromIntegral (ncqSLen + ncqKeyLen + ncqPrefixLen) +{-# INLINE ncqEntryPayloadSize #-} data NCQSectionType = B | R | T | M deriving stock (Eq,Ord,Show) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs index 9240beb5..f89a9765 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs @@ -17,6 +17,7 @@ module HBS2.Storage.NCQ3 where import HBS2.Storage.NCQ3.Internal.Types as Exported +import HBS2.Storage.NCQ3.Internal.Class as Exported import HBS2.Storage.NCQ3.Internal.Prelude as Exported import HBS2.Storage.NCQ3.Internal import HBS2.Storage.NCQ3.Internal.Run diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 65958661..76deeba7 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -12,6 +12,7 @@ import HBS2.Storage.NCQ3.Internal.Index import HBS2.Storage.NCQ3.Internal.MMapCache import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe import Network.ByteOrder qualified as N import Data.HashPSQ qualified as HPSQ import Data.Vector qualified as V @@ -21,6 +22,7 @@ import Data.Set qualified as Set import Data.Either import Lens.Micro.Platform import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS import Data.Sequence qualified as Seq import System.FilePath.Posix import System.Posix.Files qualified as Posix @@ -105,6 +107,22 @@ ncqWithStorage fp action = flip runContT pure do wait w pure r + +ncqPutBlock :: MonadUnliftIO m + => NCQStorage + -> LBS.ByteString + -> m (Maybe HashRef) + +-- FIXME: Nothing-on-exception +ncqPutBlock sto lbs = + ncqLocate sto ohash >>= \case + Nothing -> Just <$> ncqPutBS sto (Just B) (Just ohash) bs + _ -> pure (Just ohash) + where + bs = LBS.toStrict lbs + ohash = HashRef $ hashObject @HbSync bs +{-# INLINE ncqPutBlock #-} + -- FIXME: maybe-on-storage-closed ncqPutBS :: MonadUnliftIO m => NCQStorage diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Class.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Class.hs new file mode 100644 index 00000000..fcc5efbb --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Class.hs @@ -0,0 +1,139 @@ +{-# OPTIONS_GHC -Wno-orphans #-} +module HBS2.Storage.NCQ3.Internal.Class where + +import HBS2.Data.Types.Refs +import HBS2.Hash +import HBS2.Storage +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.Fossil +import HBS2.Storage.NCQ3.Internal.Index +import HBS2.Storage.NCQ3.Internal + + +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS +import Control.Monad.Trans.Maybe + + + +instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where + hasBlock sto h = ncqStorageHasBlock sto (coerce h) + + putBlock sto lbs = fmap coerce <$> ncqPutBlock sto lbs + enqueueBlock sto lbs = fmap coerce <$> ncqPutBlock sto lbs + + getBlock sto h = runMaybeT $ do + bs <- MaybeT (ncqStorageGetBlock sto (coerce h)) + pure (LBS.fromStrict bs) + + delBlock sto = ncqStorageDelBlock sto . coerce + + updateRef sto k v = + ncqStorageSetRef sto (HashRef $ hashObject k) (coerce v) + + getRef sto k = + ncqStorageGetRef sto (HashRef $ hashObject k) <&> fmap coerce + + delRef sto k = + ncqStorageDelRef sto (HashRef $ hashObject k) + + getChunk sto h off size = runMaybeT $ do + bs <- MaybeT (ncqStorageGetBlock sto (coerce h)) + let lbs = LBS.fromStrict bs + chunk = LBS.take (fromIntegral size) $ LBS.drop (fromIntegral off) lbs + pure chunk + + +ncqStorageHasBlock :: MonadUnliftIO m + => NCQStorage + -> HashRef + -> m (Maybe Integer) +ncqStorageHasBlock sto h = ncqLocate sto h >>= \case + Nothing -> pure Nothing + Just (InMemory bs) -> blockSize bs + Just (InFossil _ _ size) | ncqIsTombEntrySize size -> pure Nothing + Just (InFossil _ _ size) -> do + pure $ Just (ncqEntryPayloadSize (fromIntegral size)) + + where + {-# INLINE blockSize #-} + blockSize bs = case ncqEntryUnwrap bs of + (_, Left _) -> pure Nothing + (_, Right (M, val)) -> pure (Just (fromIntegral $ BS.length val)) + (_, Right (T, _)) -> pure Nothing + (_, Right (R, val)) -> pure (Just (fromIntegral $ BS.length val)) + (_, Right (B, val)) -> pure (Just (fromIntegral $ BS.length val)) +{-# INLINE ncqStorageHasBlock #-} + + +-- | Returns strict ByteString +-- | It's up to user to perform +-- | in order to free memory mapped file where located +-- | the found block. +-- | Dangling substrings prevent mmaped files from being released +ncqStorageGetBlock :: MonadUnliftIO m + => NCQStorage + -> HashRef + -> m (Maybe ByteString) + +ncqStorageGetBlock sto h = runMaybeT do + loc <- lift (ncqLocate sto h) >>= toMPlus + guard (not $ ncqIsTomb loc) + (_,what) <- lift (ncqGetEntryBS sto loc) + >>= toMPlus + <&> ncqEntryUnwrap + + case what of + Left _ -> mzero + Right (T, _) -> mzero + Right (_, ebs) -> pure ebs + +{-# INLINE ncqStorageGetBlock #-} + +-- | Logically delete entry by hash (writes a tomb if present and not already tomb). +-- No-op if entry doesn't exist. +ncqStorageDelBlock :: MonadUnliftIO m + => NCQStorage + -> HashRef + -> m () +ncqStorageDelBlock = ncqDelEntry +{-# INLINE ncqStorageDelBlock #-} + +-- | Salted ref hash: H( ref || ncqSalt ) +ncqRefHash :: NCQStorage -> HashRef -> HashRef +ncqRefHash NCQStorage{..} h = + HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt)) +{-# INLINE ncqRefHash #-} + +-- | Get ref value (hash) by logical ref key. +-- Returns Nothing for tomb/absent/invalid. +ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef) +ncqStorageGetRef ncq ref = runMaybeT $ do + let rkey = ncqRefHash ncq ref + loc <- lift (ncqLocate ncq rkey) >>= toMPlus + guard (not $ ncqIsTomb loc) + bs <- lift (ncqGetEntryBS ncq loc) >>= toMPlus + case snd (ncqEntryUnwrap bs) of + Right (R, payload) | BS.length payload == ncqKeyLen + -> pure (coerce payload) + _ -> mzero +{-# INLINE ncqStorageGetRef #-} + +-- | Set ref value if changed. Writes section of type R with fixed key = rkey. +ncqStorageSetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> HashRef -> m () +ncqStorageSetRef ncq ref val = do + cur <- ncqStorageGetRef ncq ref + unless (cur == Just val) $ do + let rkey = ncqRefHash ncq ref + payload = coerce @_ @ByteString val + -- Section type R, fixed key = rkey, payload = value hash bytes + void $ ncqPutBS ncq (Just R) (Just rkey) payload +{-# INLINE ncqStorageSetRef #-} + +-- | Delete ref (write tomb for ref key), no-op if absent. +ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m () +ncqStorageDelRef ncq ref = + ncqDelEntry ncq (ncqRefHash ncq ref) +{-# INLINE ncqStorageDelRef #-} + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs index 4c7187db..e7cec949 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs @@ -12,6 +12,7 @@ module HBS2.Storage.NCQ3.Internal.Prelude , ncqMetaPrefix , ncqIsMeta , ncqFullDataLen + , ncqEntryPayloadSize , NCQFullRecordLen(..) , ToFileName(..) , IndexFile(..) diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index bcd10e88..b6662355 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -10,6 +10,7 @@ import HBS2.Misc.PrettyStuff import HBS2.Clock import HBS2.Merkle import HBS2.Polling +import HBS2.Peer.Proto.AnyRef import HBS2.Storage import HBS2.Storage.Simple @@ -44,6 +45,7 @@ import Data.HashSet qualified as HS import Data.HashMap.Strict qualified as HM import Test.Tasty.HUnit import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS import Data.Ord import Data.Set qualified as Set import System.Random.MWC as MWC @@ -61,6 +63,8 @@ import UnliftIO.Directory {-HLINT ignore "Functor law"-} + + ncq3Tests :: forall m . MonadUnliftIO m => MakeDictM C m () ncq3Tests = do entry $ bindMatch "test:ncq3:start-stop" $ nil_ $ \e ->do @@ -582,6 +586,116 @@ ncq3Tests = do notice $ "second must fail" <+> pretty wx <+> "=>" <+> viaShow r + + entry $ bindMatch "test:ncq3:storage:basic" $ nil_ $ \e -> do + let (opts,args) = splitOpts [] e + let n = headDef 100000 [ fromIntegral x | LitIntVal x <- args ] + let pD = headDef 0.10 [ realToFrac x | LitScientificVal x <- drop 1 args ] + let pR = 0.01 + let kN = headDef 1000 [ fromIntegral x | LitIntVal x <- drop 2 args ] + + blkz <- newTVarIO (mempty :: HashMap (Hash HbSync) (Maybe LBS.ByteString)) + refz <- newTVarIO (mempty :: HashMap (SomeRefKey HashRef) (Maybe (Hash HbSync))) + + runTest $ \TestEnv{..} -> do + g <- liftIO MWC.createSystemRandom + + ncqWithStorage testEnvDir $ \sto -> do + + replicateM_ n $ liftIO do + sz <- uniformRM (1, 64*1024) g + bs <- genRandomBS g sz <&> LBS.fromStrict + ha <- putBlock sto bs `orDie` "Block not stored" + mb <- getBlock sto ha + + when (mb /= Just bs) do + assertFailure ("getBlock mismatch for " <> show (pretty ha)) + + sz <- hasBlock sto ha `orDie` "block not found" + + assertBool ("hasBlock size mismatch for " <> show (pretty ha)) (sz == fromIntegral (LBS.length bs)) + + atomically $ modifyTVar blkz (HM.insert ha (Just bs)) + + pd <- uniformRM (0, 1.0) g + + when (pd < pD) do + delBlock sto ha + atomically $ modifyTVar blkz (HM.insert ha Nothing) + found <- hasBlock sto ha + assertBool (show $ "not deleted" <+> pretty ha) (isNothing found) + + pr <- uniformRM (0, 1.0) g + + when (pr < pR) do + k <- uniformRM (1,10) g + replicateM_ k do + ref <- SomeRefKey . HashRef . coerce <$> genRandomBS g 32 + updateRef sto ref ha + atomically $ modifyTVar refz (HM.insert ref (Just ha)) + what <- getRef sto ref + assertBool (show $ "ref not found" <+> pretty ref) (what == Just ha) + + prd <- uniformRM (0, 1.0) g + + when (prd < 0.10) do + delRef sto ref + atomically $ modifyTVar refz (HM.insert ref Nothing) + + notice "immediate test done" + + ncqWithStorage testEnvDir $ \sto -> flip runContT pure do + + p <- newTVarIO (0,0) + + void $ ContT $ withAsync $ forever do + (b,r) <- readTVarIO p + ema <- readTVarIO (ncqWriteEMA sto) + pause @'Seconds 2 + notice $ "progress" <+> pretty ema <+> pretty b <+> pretty r + + fix \next -> do + + blokz <- readTVarIO blkz <&> HM.toList + for_ blokz $ \b -> do + atomically $ modifyTVar p (over _1 succ) + case b of + (h,Nothing) -> liftIO do + found <- hasBlock sto h + assertBool (show $ "not deleted" <+> pretty h) (isNothing found) + + (h,Just bs) -> liftIO do + size <- hasBlock sto h >>= orThrowUser ("not found" <+> pretty h) + + assertBool (show $ "size mismatch" <+> pretty h <+> pretty size <+> pretty (LBS.length bs)) + (size == fromIntegral (LBS.length bs)) + + bs1 <- getBlock sto h >>= orThrowUser ("not found data for" <+> pretty h) + assertBool (show $ "data mismatch" <+> pretty h) (bs1 == bs) + + refsz <- readTVarIO refz <&> HM.toList + for_ refsz \r -> do + atomically $ modifyTVar p (over _2 succ) + case r of + (ref, Nothing) -> liftIO do + what <- getRef sto ref + assertBool (show $ "ref resurrected" <+> pretty ref) (isNothing what) + + (ref, Just hv) -> liftIO do + what <- getRef sto ref + assertBool (show $ "ref mismatch" <+> pretty ref <+> pretty what <+> pretty hv) + (what == Just hv) + + noone <- lift (ncqFossilMergeStep sto) <&> not + + if noone then + none + else do + notice "again" + next + + notice "re-opened storage test done" + testNCQ3Concurrent1 :: MonadUnliftIO m => Bool -> Int @@ -754,3 +868,5 @@ testNCQ3Lookup1 syn TestEnv{..} = do + +