From c67ffc2679391003be421e6d1bcfc23e53027dda Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 23 Jul 2025 14:26:24 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/hbs2-storage-ncq.cabal | 4 + hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 1 - .../lib/HBS2/Storage/NCQ/Types.hs | 3 +- hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 255 +--------------- .../lib/HBS2/Storage/NCQ2/Internal.hs | 11 + .../lib/HBS2/Storage/NCQ2/Internal/Probes.hs | 68 +++++ .../lib/HBS2/Storage/NCQ2/Internal/Types.hs | 275 ++++++++++++++++++ hbs2-tests/test/TestNCQ.hs | 2 +- 8 files changed, 374 insertions(+), 245 deletions(-) create mode 100644 hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal.hs create mode 100644 hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs create mode 100644 hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index e0a5d0e0..baa2a0f2 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -61,6 +61,9 @@ library exposed-modules: HBS2.Storage.NCQ HBS2.Storage.NCQ2 + HBS2.Storage.NCQ2.Internal + HBS2.Storage.NCQ2.Internal.Types + HBS2.Storage.NCQ2.Internal.Probes HBS2.Storage.NCQ.Types -- other-modules: -- other-extensions: @@ -79,6 +82,7 @@ library , microlens-platform , mmap , mtl + , mwc-random , network-byte-order , prettyprinter , psqueues diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index ce5dbe2d..242f30c4 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -16,7 +16,6 @@ import HBS2.Storage.NCQ.Types import HBS2.Misc.PrettyStuff import HBS2.System.Logger.Simple.ANSI - import HBS2.Data.Log.Structured.NCQ import HBS2.Data.Log.Structured.SD diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index 7703fd5f..5bc933d4 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -1,3 +1,4 @@ +{-# Language UndecidableInstances #-} module HBS2.Storage.NCQ.Types where import HBS2.Prelude @@ -43,7 +44,7 @@ data NCQStorageException = instance Exception NCQStorageException newtype FileKey = FileKey ByteString - deriving newtype (Eq,Ord,Hashable,Show) + deriving newtype (Eq,Ord,Hashable,Show,Serialise) instance IsString FileKey where fromString = FileKey . BS8.pack . dropExtension . takeFileName diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 6e3d04bd..dfb1827d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -4,6 +4,7 @@ module HBS2.Storage.NCQ2 ( module HBS2.Storage.NCQ2 , module HBS2.Storage.NCQ.Types + , module HBS2.Storage.NCQ2.Internal ) where @@ -11,62 +12,38 @@ import HBS2.Prelude.Plated import HBS2.Hash import HBS2.OrDie import HBS2.Data.Types.Refs -import HBS2.Base58 -import HBS2.Net.Auth.Credentials -import HBS2.Storage import HBS2.Misc.PrettyStuff import HBS2.System.Logger.Simple.ANSI import HBS2.Data.Log.Structured.NCQ -import HBS2.Data.Log.Structured.SD import HBS2.Storage.NCQ.Types +import HBS2.Storage.NCQ2.Internal import Data.Config.Suckless.System -import Data.Config.Suckless.Script hiding (void) -import Codec.Compression.Zstd qualified as Zstd -import Codec.Compression.Zstd.Lazy as ZstdL -import Codec.Compression.Zstd.Streaming qualified as ZstdS -import Codec.Compression.Zstd.Streaming (Result(..)) import Numeric (showHex) -import Control.Applicative import Data.ByteString.Builder import Network.ByteOrder qualified as N -import Data.Bit.ThreadSafe qualified as BV import Data.HashMap.Strict (HashMap) -import Control.Monad.Except import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe -import Data.Time.Clock.POSIX import Data.Ord (Down(..),comparing) import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TSem -import Data.Hashable (hash) -import Data.HashPSQ qualified as HPSQ -import Data.HashPSQ (HashPSQ) import Data.IntMap qualified as IntMap -import Data.IntMap (IntMap) -import Data.IntSet qualified as IntSet -import Data.IntSet (IntSet) import Data.Sequence qualified as Seq -import Data.Sequence (Seq(..), (|>),(<|)) import Data.List qualified as List import Data.ByteString.Lazy qualified as LBS -import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.ByteString (ByteString) import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 -import Data.Char (isDigit) -import Data.Fixed import Data.Coerce +import Data.Sequence ((|>)) import Data.Word import Data.Either import Data.Maybe -import Data.Text qualified as Text -import Data.Text.IO qualified as Text -import Data.Int import Data.Vector qualified as V import Data.Vector (Vector, (!)) import Lens.Micro.Platform @@ -74,7 +51,6 @@ import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.HashMap.Strict qualified as HM import System.FilePath.Posix -import System.Posix.Fcntl import System.Posix.Files qualified as Posix import System.Posix.IO as PosixBase import System.Posix.Types as Posix @@ -88,15 +64,12 @@ import System.Posix.Files ( getFileStatus , setFileMode ) import System.Posix.Files qualified as PFS -import System.IO.Error (catchIOError) import System.IO.MMap as MMap import System.IO.Temp (emptyTempFile) -import System.Mem --- import Foreign.Ptr --- import Foreign di -import qualified Data.ByteString.Internal as BSI import Streaming.Prelude qualified as S +import System.Random.MWC as MWC + import UnliftIO import UnliftIO.Concurrent(getNumCapabilities) import UnliftIO.IO.File @@ -104,113 +77,6 @@ import UnliftIO.IO.File -- FIXME: ASAP-USE-FILE-LOCK import System.FileLock as FL -type FOff = Word64 - -data NCQEntry = - NCQEntry - { ncqEntryData :: !ByteString - , ncqDumped :: !(TVar (Maybe FileKey)) - } - -type Shard = TVar (HashMap HashRef NCQEntry) - -type NCQOffset = Word64 -type NCQSize = Word32 - -type StateVersion = Word64 - -data NCQIdxEntry = - NCQIdxEntry {-# UNPACK#-} !NCQOffset !NCQSize - -data StateOP = D FileKey | F TimeSpec FileKey | P FileKey - deriving (Eq,Ord,Show) - -data NCQFlag = - NCQMergeNow | NCQCompactNow - deriving (Eq,Ord,Generic) - -data Location = - InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize - | InMemory {-# UNPACK #-} !ByteString - -instance Pretty Location where - pretty = \case - InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s - InMemory _ -> "in-memory" - -data TrackedFile = - TrackedFile - { tfTime :: FilePrio - , tfKey :: FileKey - , tfCached :: TVar (Maybe CachedEntry) - } - -data FactE = EmptyFact - deriving (Eq,Ord,Show,Data,Generic) - -type FactSeq = POSIXTime - -data Fact = - Fact - { factWritten :: Maybe FactSeq - , factE :: FactE - } - deriving (Eq,Ord,Show,Data,Generic) - -instance Hashable FactE -instance Hashable Fact - -type TrackedFiles = Vector TrackedFile - -data NCQStorage2 = - NCQStorage2 - { ncqRoot :: FilePath - , ncqGen :: Int - , ncqSalt :: HashRef - , ncqPostponeMerge :: Timeout 'Seconds - , ncqPostponeSweep :: Timeout 'Seconds - , ncqLuckyNum :: Int - , ncqFsync :: Int - , ncqWriteQLen :: Int - , ncqWriteBlock :: Int - , ncqMinLog :: Int - , ncqMaxLog :: Int - , ncqMaxCached :: Int - , ncqIdleThrsh :: Double - , ncqMemTable :: Vector Shard - , ncqWriteQ :: TVar (Seq HashRef) - , ncqWriteOps :: Vector (TQueue (IO ())) - , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) - , ncqStorageTasks :: TVar Int - , ncqStorageStopReq :: TVar Bool - , ncqStorageSyncReq :: TVar Bool - , ncqMergeReq :: TVar Bool - , ncqMergeSem :: TSem - , ncqSyncNo :: TVar Int - , ncqCurrentFiles :: TVar (HashSet FileKey) - , ncqTrackedFiles :: TVar TrackedFiles - , ncqStateVersion :: TVar StateVersion - , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) - , ncqStateName :: TVar (Maybe StateFile) - , ncqStateSem :: TSem - , ncqCachedEntries :: TVar Int - , ncqWrites :: TVar Int - , ncqWriteEMA :: TVar Double -- for writes-per-seconds - , ncqJobQ :: TQueue (IO ()) - , ncqMiscSem :: TSem - , ncqSweepSem :: TSem - , ncqMergeTasks :: TVar Int - , ncqOnRunWriteIdle :: TVar (IO ()) - - , ncqFactFiles :: TVar (HashSet FileKey) - , ncqFacts :: TVar (HashSet Fact) - } - -megabytes :: forall a . Integral a => a -megabytes = 1024 ^ 2 - -gigabytes :: forall a . Integral a => a -gigabytes = 1024 ^ 3 ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 ncqStorageOpen2 fp upd = do @@ -260,6 +126,8 @@ ncqStorageOpen2 fp upd = do ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList + ncqRndGen <- liftIO MWC.createSystemRandom + let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" let ncq = NCQStorage2{..} & upd @@ -284,61 +152,6 @@ ncqWithStorage fp action = flip runContT pure do wait w pure r -ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath -ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp - -ncqGetWorkDir :: NCQStorage2 -> FilePath -ncqGetWorkDir NCQStorage2{..} = ncqRoot show ncqGen - -ncqGetLockFileName :: NCQStorage2 -> FilePath -ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" - -ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath -ncqNewUniqFileName me@NCQStorage2{..} pref suff = liftIO $ withSem ncqMiscSem do - flip fix 0 $ \next i -> do - t <- round @_ @Integer . (* 1e9) <$> getPOSIXTime - let v = show $ pretty (showHex t "") <> "-" <> pretty (showHex i "") - let n = ncqGetFileName me (pref <> v <> suff) - doesFileExist n >>= \case - False -> pure n - True -> next (succ i) - -ncqEmptyKey :: ByteString -ncqEmptyKey = BS.replicate ncqKeyLen 0 - -ncqGetFactsDir :: NCQStorage2 -> FilePath -ncqGetFactsDir me = ncqGetWorkDir me ".facts" - -ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data" - -ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewStateName me = ncqNewUniqFileName me "state-" "" - -ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data" - -ncqGetNewFactFileName :: MonadIO m => NCQStorage2 -> m FilePath -ncqGetNewFactFileName me = do - ncqNewUniqFileName me (d "fact-") ".f" - where d = ncqGetFactsDir me - -ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () -ncqStorageStop2 NCQStorage2{..} = do - atomically $ writeTVar ncqStorageStopReq True - -ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m () -ncqStorageSync2 NCQStorage2{..} = do - atomically $ writeTVar ncqStorageSyncReq True - -ncqShardIdx :: NCQStorage2 -> HashRef -> Int -ncqShardIdx NCQStorage2{..} h = - fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable -{-# INLINE ncqShardIdx #-} - -ncqGetShard :: NCQStorage2 -> HashRef -> Shard -ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h -{-# INLINE ncqGetShard #-} ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry) ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h @@ -445,19 +258,7 @@ ncqEntrySize = \case InFossil _ _ _ size -> fromIntegral size InMemory bs -> fromIntegral (BS.length bs) -useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a -useVersion ncq m = bracket succV predV m - where - succV = atomically (ncqStateUseSTM ncq) - predV = const $ atomically (ncqStateUnuseSTM ncq) -ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) -ncqListTrackedFilesSTM NCQStorage2{..} = do - fs <- readTVar ncqTrackedFiles - for fs $ \TrackedFile{..} -> (tfKey,,) <$> readTVar tfCached <*> pure tfCached - -ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) -ncqListTrackedFiles = atomically . ncqListTrackedFilesSTM ncqPreloadIndexes :: MonadUnliftIO m @@ -551,21 +352,6 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do go 0 0 mempty -ncqLookupIndex :: MonadUnliftIO m - => HashRef - -> (ByteString, NWayHash) - -> m (Maybe NCQIdxEntry ) -ncqLookupIndex hx (mmaped, nway) = do - fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) -{-# INLINE ncqLookupIndex #-} - -decodeEntry :: ByteString -> NCQIdxEntry -decodeEntry entryBs = do - let (p,r) = BS.splitAt 8 entryBs - let off = fromIntegral (N.word64 p) - let size = fromIntegral (N.word32 (BS.take 4 r)) - NCQIdxEntry off size -{-# INLINE decodeEntry #-} ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) ncqLocate2 NCQStorage2{..} href = do @@ -704,6 +490,12 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do spawnActivity factsDB + spawnActivity $ postponed 20 $ forever do + ema <- readTVarIO ncqWriteEMA + when (ema < 50 ) do + ncqKeyNumIntersectionProbe ncq + + pause @'Seconds 10 ContT $ bracket none $ const $ liftIO do fhh <- atomically (STM.flushTQueue closeQ) @@ -1089,18 +881,6 @@ ncqWaitTasks NCQStorage2{..} = atomically do tno <- readTVar ncqStorageTasks when (tno > 0) STM.retry -ncqStateUseSTM :: NCQStorage2 -> STM () -ncqStateUseSTM NCQStorage2{..} = do - k <- readTVar ncqStateVersion <&> fromIntegral - modifyTVar ncqStateUsage (IntMap.update (Just . over _1 succ) k) -{-# INLINE ncqStateUseSTM #-} - -ncqStateUnuseSTM :: NCQStorage2 -> STM () -ncqStateUnuseSTM NCQStorage2{..} = do - k <- readTVar ncqStateVersion <&> fromIntegral - -- TODO: remove when n <= 0 - modifyTVar ncqStateUsage (IntMap.update (Just . over _1 pred) k) -{-# INLINE ncqStateUnuseSTM #-} ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool ncqStateUpdate me@NCQStorage2{..} ops' = withSem ncqStateSem $ flip runContT pure $ callCC \exit -> do @@ -1633,14 +1413,5 @@ appendTailSection fh = liftIO do {-# INLINE appendTailSection #-} -withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a -withSem sem m = bracket enter leave (const m) - where enter = atomically (waitTSem sem) - leave = const $ atomically (signalTSem sem) - -isNotPending :: Maybe CachedEntry -> Bool -isNotPending = \case - Just (PendingEntry {}) -> False - _ -> True diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal.hs new file mode 100644 index 00000000..5c2e933d --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal.hs @@ -0,0 +1,11 @@ +module HBS2.Storage.NCQ2.Internal + ( module HBS2.Storage.NCQ2.Internal + , module Export + )where + +import HBS2.Storage.NCQ2.Internal.Types as Export +import HBS2.Storage.NCQ2.Internal.Probes as Export + + + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs new file mode 100644 index 00000000..e5c53f32 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs @@ -0,0 +1,68 @@ +{-# Language RecordWildCards #-} +module HBS2.Storage.NCQ2.Internal.Probes where + +import HBS2.Prelude +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.System.Logger.Simple.ANSI +import HBS2.Misc.PrettyStuff + +import HBS2.Data.Log.Structured.NCQ + +import HBS2.Storage.NCQ2.Internal.Types +import HBS2.Storage.NCQ.Types + +import Control.Monad.Trans.Maybe +import Data.Coerce +import Data.HashMap.Strict qualified as HM +import Data.List qualified as List +import Data.Maybe +import Data.Vector ((!)) +import Data.Vector qualified as V +import Lens.Micro.Platform +import System.Random.MWC qualified as MWC +import UnliftIO + +ncqKeyNumIntersectionProbe :: MonadUnliftIO m => NCQStorage2 -> m () +ncqKeyNumIntersectionProbe me@NCQStorage2{..} = useVersion me $ const $ void $ runMaybeT do + + -- Фильтруем pending + files0 <- lift (ncqListTrackedFiles me) + let files = V.toList $ V.filter (isNotPending . view _2) files0 + + when (length files < 2) mzero + + (a,b) <- liftIO $ fix \next -> do + i <- MWC.uniformRM (0, length files - 1) ncqRndGen + j <- MWC.uniformRM (0, length files - 1) ncqRndGen + if i == j then next else pure (files !! min i j, files !! max i j) + + let fka = view _1 a + let fkb = view _1 b + let key = FactKey $ coerce $ hashObject @HbSync $ serialise $ List.sort [fka, fkb] + + known <- lift (readTVarIO ncqFacts <&> HM.member key) + when known mzero + + let fIndexA = ncqGetFileName me (toFileName (IndexFile fka)) + let fIndexB = ncqGetFileName me (toFileName (IndexFile fkb)) + + idxPair' <- liftIO $ try @_ @IOException do + (,) <$> nwayHashMMapReadOnly fIndexA + <*> nwayHashMMapReadOnly fIndexB + + ((bs1,n1),(bs2,n2)) <- case idxPair' of + Right (Just x, Just y) -> pure (x,y) + _ -> warn ("can't load index pair" <+> pretty (fka, fkb)) >> mzero + + n <- liftIO $ do + ref <- newTVarIO 0 + nwayHashScanAll n1 bs1 $ \_ k _ -> do + here <- ncqLookupIndex (coerce k) (bs2,n2) + when (isJust here) $ atomically $ modifyTVar' ref (+1) + readTVarIO ref + + debug $ yellow "ncqKeyNumIntersectionProbe" + <+> pretty fka <+> pretty fkb <+> pretty n + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs new file mode 100644 index 00000000..f7ced96d --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs @@ -0,0 +1,275 @@ +{-# Language RecordWildCards #-} +module HBS2.Storage.NCQ2.Internal.Types where + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Base58 +import HBS2.Net.Auth.Credentials +import HBS2.Storage +import HBS2.Misc.PrettyStuff +import HBS2.System.Logger.Simple.ANSI + +import HBS2.Data.Log.Structured.SD +import HBS2.Data.Log.Structured.NCQ + +import HBS2.Storage.NCQ.Types + +import Data.Config.Suckless.System + +import Numeric (showHex) +import Network.ByteOrder qualified as N +import Data.HashMap.Strict (HashMap) +import Control.Concurrent.STM.TSem +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.Sequence qualified as Seq +import Data.Sequence (Seq(..), (|>),(<|)) +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.Coerce +import Data.Word +import Data.Vector qualified as V +import Data.Vector (Vector, (!)) +import Lens.Micro.Platform +import Data.HashSet (HashSet) +import System.FilePath.Posix + +import Control.Monad.ST +import System.Random.MWC as MWC + +import UnliftIO + + +type FOff = Word64 + +data NCQEntry = + NCQEntry + { ncqEntryData :: !ByteString + , ncqDumped :: !(TVar (Maybe FileKey)) + } + +type Shard = TVar (HashMap HashRef NCQEntry) + +type NCQOffset = Word64 +type NCQSize = Word32 + +type StateVersion = Word64 + +data NCQIdxEntry = + NCQIdxEntry {-# UNPACK#-} !NCQOffset !NCQSize + +data StateOP = D FileKey | F TimeSpec FileKey | P FileKey + deriving (Eq,Ord,Show) + +data NCQFlag = + NCQMergeNow | NCQCompactNow + deriving (Eq,Ord,Generic) + +data Location = + InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize + | InMemory {-# UNPACK #-} !ByteString + +instance Pretty Location where + pretty = \case + InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s + InMemory _ -> "in-memory" + +data TrackedFile = + TrackedFile + { tfTime :: FilePrio + , tfKey :: FileKey + , tfCached :: TVar (Maybe CachedEntry) + } + +data FactE = KeyIntersection FileKey FileKey Int + deriving (Eq,Ord,Show,Generic) + +type FactSeq = POSIXTime + +newtype FactKey = + FactKey ByteString + deriving newtype (Eq,Ord,Hashable) + +data Fact = + Facot + { factWritten :: Maybe FactSeq + , factE :: FactE + } + deriving (Eq,Ord,Show,Generic) + +instance Hashable FactE +instance Hashable Fact + +type TrackedFiles = Vector TrackedFile + +data NCQStorage2 = + NCQStorage2 + { ncqRoot :: FilePath + , ncqGen :: Int + , ncqSalt :: HashRef + , ncqPostponeMerge :: Timeout 'Seconds + , ncqPostponeSweep :: Timeout 'Seconds + , ncqLuckyNum :: Int + , ncqFsync :: Int + , ncqWriteQLen :: Int + , ncqWriteBlock :: Int + , ncqMinLog :: Int + , ncqMaxLog :: Int + , ncqMaxCached :: Int + , ncqIdleThrsh :: Double + , ncqMemTable :: Vector Shard + , ncqWriteQ :: TVar (Seq HashRef) + , ncqWriteOps :: Vector (TQueue (IO ())) + , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) + , ncqStorageTasks :: TVar Int + , ncqStorageStopReq :: TVar Bool + , ncqStorageSyncReq :: TVar Bool + , ncqMergeReq :: TVar Bool + , ncqMergeSem :: TSem + , ncqSyncNo :: TVar Int + , ncqCurrentFiles :: TVar (HashSet FileKey) + , ncqTrackedFiles :: TVar TrackedFiles + , ncqStateVersion :: TVar StateVersion + , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) + , ncqStateName :: TVar (Maybe StateFile) + , ncqStateSem :: TSem + , ncqCachedEntries :: TVar Int + , ncqWrites :: TVar Int + , ncqWriteEMA :: TVar Double -- for writes-per-seconds + , ncqJobQ :: TQueue (IO ()) + , ncqMiscSem :: TSem + , ncqSweepSem :: TSem + , ncqMergeTasks :: TVar Int + , ncqOnRunWriteIdle :: TVar (IO ()) + + , ncqFactFiles :: TVar (HashSet FileKey) + , ncqFacts :: TVar (HashMap FactKey Fact) + , ncqRndGen :: Gen RealWorld + } + +megabytes :: forall a . Integral a => a +megabytes = 1024 ^ 2 + +gigabytes :: forall a . Integral a => a +gigabytes = 1024 ^ 3 + + +ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath +ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp + +ncqGetWorkDir :: NCQStorage2 -> FilePath +ncqGetWorkDir NCQStorage2{..} = ncqRoot show ncqGen + +ncqGetLockFileName :: NCQStorage2 -> FilePath +ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" + +ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath +ncqNewUniqFileName me@NCQStorage2{..} pref suff = liftIO $ withSem ncqMiscSem do + flip fix 0 $ \next i -> do + t <- round @_ @Integer . (* 1e9) <$> getPOSIXTime + let v = show $ pretty (showHex t "") <> "-" <> pretty (showHex i "") + let n = ncqGetFileName me (pref <> v <> suff) + doesFileExist n >>= \case + False -> pure n + True -> next (succ i) + +ncqEmptyKey :: ByteString +ncqEmptyKey = BS.replicate ncqKeyLen 0 + +ncqGetFactsDir :: NCQStorage2 -> FilePath +ncqGetFactsDir me = ncqGetWorkDir me ".facts" + +ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath +ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data" + +ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath +ncqGetNewStateName me = ncqNewUniqFileName me "state-" "" + +ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath +ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data" + +ncqGetNewFactFileName :: MonadIO m => NCQStorage2 -> m FilePath +ncqGetNewFactFileName me = do + ncqNewUniqFileName me (d "fact-") ".f" + where d = ncqGetFactsDir me + +ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () +ncqStorageStop2 NCQStorage2{..} = do + atomically $ writeTVar ncqStorageStopReq True + +ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m () +ncqStorageSync2 NCQStorage2{..} = do + atomically $ writeTVar ncqStorageSyncReq True + +ncqShardIdx :: NCQStorage2 -> HashRef -> Int +ncqShardIdx NCQStorage2{..} h = + fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable +{-# INLINE ncqShardIdx #-} + +ncqGetShard :: NCQStorage2 -> HashRef -> Shard +ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h +{-# INLINE ncqGetShard #-} + + +useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a +useVersion ncq m = bracket succV predV m + where + succV = atomically (ncqStateUseSTM ncq) + predV = const $ atomically (ncqStateUnuseSTM ncq) + + +ncqStateUseSTM :: NCQStorage2 -> STM () +ncqStateUseSTM NCQStorage2{..} = do + k <- readTVar ncqStateVersion <&> fromIntegral + modifyTVar ncqStateUsage (IntMap.update (Just . over _1 succ) k) +{-# INLINE ncqStateUseSTM #-} + +ncqStateUnuseSTM :: NCQStorage2 -> STM () +ncqStateUnuseSTM NCQStorage2{..} = do + k <- readTVar ncqStateVersion <&> fromIntegral + -- TODO: remove when n <= 0 + modifyTVar ncqStateUsage (IntMap.update (Just . over _1 pred) k) +{-# INLINE ncqStateUnuseSTM #-} + +withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a +withSem sem m = bracket enter leave (const m) + where enter = atomically (waitTSem sem) + leave = const $ atomically (signalTSem sem) + + + +ncqLookupIndex :: MonadUnliftIO m + => HashRef + -> (ByteString, NWayHash) + -> m (Maybe NCQIdxEntry ) +ncqLookupIndex hx (mmaped, nway) = do + fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) +{-# INLINE ncqLookupIndex #-} + +decodeEntry :: ByteString -> NCQIdxEntry +decodeEntry entryBs = do + let (p,r) = BS.splitAt 8 entryBs + let off = fromIntegral (N.word64 p) + let size = fromIntegral (N.word32 (BS.take 4 r)) + NCQIdxEntry off size +{-# INLINE decodeEntry #-} + + +isNotPending :: Maybe CachedEntry -> Bool +isNotPending = \case + Just (PendingEntry {}) -> False + _ -> True + +isPending :: Maybe CachedEntry -> Bool +isPending = not . isNotPending + + +ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) +ncqListTrackedFilesSTM NCQStorage2{..} = do + fs <- readTVar ncqTrackedFiles + for fs $ \TrackedFile{..} -> (tfKey,,) <$> readTVar tfCached <*> pure tfCached + +ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) +ncqListTrackedFiles = atomically . ncqListTrackedFilesSTM + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 1fc5a5ce..0b071205 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -1788,7 +1788,7 @@ main = do g <- liftIO MWC.createSystemRandom let dir = testEnvDir let n = 30000 - let p = 0.15 + let p = 0.25 sizes <- replicateM n (uniformRM (4096, 256*1024) g)