mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
1f589cfe55
commit
c67ffc2679
|
@ -61,6 +61,9 @@ library
|
|||
exposed-modules:
|
||||
HBS2.Storage.NCQ
|
||||
HBS2.Storage.NCQ2
|
||||
HBS2.Storage.NCQ2.Internal
|
||||
HBS2.Storage.NCQ2.Internal.Types
|
||||
HBS2.Storage.NCQ2.Internal.Probes
|
||||
HBS2.Storage.NCQ.Types
|
||||
-- other-modules:
|
||||
-- other-extensions:
|
||||
|
@ -79,6 +82,7 @@ library
|
|||
, microlens-platform
|
||||
, mmap
|
||||
, mtl
|
||||
, mwc-random
|
||||
, network-byte-order
|
||||
, prettyprinter
|
||||
, psqueues
|
||||
|
|
|
@ -16,7 +16,6 @@ import HBS2.Storage.NCQ.Types
|
|||
import HBS2.Misc.PrettyStuff
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
import HBS2.Data.Log.Structured.SD
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
{-# Language UndecidableInstances #-}
|
||||
module HBS2.Storage.NCQ.Types where
|
||||
|
||||
import HBS2.Prelude
|
||||
|
@ -43,7 +44,7 @@ data NCQStorageException =
|
|||
instance Exception NCQStorageException
|
||||
|
||||
newtype FileKey = FileKey ByteString
|
||||
deriving newtype (Eq,Ord,Hashable,Show)
|
||||
deriving newtype (Eq,Ord,Hashable,Show,Serialise)
|
||||
|
||||
instance IsString FileKey where
|
||||
fromString = FileKey . BS8.pack . dropExtension . takeFileName
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
module HBS2.Storage.NCQ2
|
||||
( module HBS2.Storage.NCQ2
|
||||
, module HBS2.Storage.NCQ.Types
|
||||
, module HBS2.Storage.NCQ2.Internal
|
||||
)
|
||||
where
|
||||
|
||||
|
@ -11,62 +12,38 @@ import HBS2.Prelude.Plated
|
|||
import HBS2.Hash
|
||||
import HBS2.OrDie
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Base58
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Storage
|
||||
import HBS2.Misc.PrettyStuff
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
import HBS2.Data.Log.Structured.SD
|
||||
|
||||
import HBS2.Storage.NCQ.Types
|
||||
import HBS2.Storage.NCQ2.Internal
|
||||
|
||||
import Data.Config.Suckless.System
|
||||
import Data.Config.Suckless.Script hiding (void)
|
||||
|
||||
import Codec.Compression.Zstd qualified as Zstd
|
||||
import Codec.Compression.Zstd.Lazy as ZstdL
|
||||
import Codec.Compression.Zstd.Streaming qualified as ZstdS
|
||||
import Codec.Compression.Zstd.Streaming (Result(..))
|
||||
|
||||
import Numeric (showHex)
|
||||
import Control.Applicative
|
||||
import Data.ByteString.Builder
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.Bit.ThreadSafe qualified as BV
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Trans.Cont
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.Time.Clock.POSIX
|
||||
import Data.Ord (Down(..),comparing)
|
||||
import Control.Concurrent.STM qualified as STM
|
||||
import Control.Concurrent.STM.TSem
|
||||
import Data.Hashable (hash)
|
||||
import Data.HashPSQ qualified as HPSQ
|
||||
import Data.HashPSQ (HashPSQ)
|
||||
import Data.IntMap qualified as IntMap
|
||||
import Data.IntMap (IntMap)
|
||||
import Data.IntSet qualified as IntSet
|
||||
import Data.IntSet (IntSet)
|
||||
import Data.Sequence qualified as Seq
|
||||
import Data.Sequence (Seq(..), (|>),(<|))
|
||||
import Data.List qualified as List
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.ByteString.Lazy.Char8 qualified as LBS8
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Char8 qualified as BS8
|
||||
import Data.Char (isDigit)
|
||||
import Data.Fixed
|
||||
import Data.Coerce
|
||||
import Data.Sequence ((|>))
|
||||
import Data.Word
|
||||
import Data.Either
|
||||
import Data.Maybe
|
||||
import Data.Text qualified as Text
|
||||
import Data.Text.IO qualified as Text
|
||||
import Data.Int
|
||||
import Data.Vector qualified as V
|
||||
import Data.Vector (Vector, (!))
|
||||
import Lens.Micro.Platform
|
||||
|
@ -74,7 +51,6 @@ import Data.HashSet (HashSet)
|
|||
import Data.HashSet qualified as HS
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import System.FilePath.Posix
|
||||
import System.Posix.Fcntl
|
||||
import System.Posix.Files qualified as Posix
|
||||
import System.Posix.IO as PosixBase
|
||||
import System.Posix.Types as Posix
|
||||
|
@ -88,15 +64,12 @@ import System.Posix.Files ( getFileStatus
|
|||
, setFileMode
|
||||
)
|
||||
import System.Posix.Files qualified as PFS
|
||||
import System.IO.Error (catchIOError)
|
||||
import System.IO.MMap as MMap
|
||||
import System.IO.Temp (emptyTempFile)
|
||||
import System.Mem
|
||||
-- import Foreign.Ptr
|
||||
-- import Foreign di
|
||||
import qualified Data.ByteString.Internal as BSI
|
||||
import Streaming.Prelude qualified as S
|
||||
|
||||
import System.Random.MWC as MWC
|
||||
|
||||
import UnliftIO
|
||||
import UnliftIO.Concurrent(getNumCapabilities)
|
||||
import UnliftIO.IO.File
|
||||
|
@ -104,113 +77,6 @@ import UnliftIO.IO.File
|
|||
-- FIXME: ASAP-USE-FILE-LOCK
|
||||
import System.FileLock as FL
|
||||
|
||||
type FOff = Word64
|
||||
|
||||
data NCQEntry =
|
||||
NCQEntry
|
||||
{ ncqEntryData :: !ByteString
|
||||
, ncqDumped :: !(TVar (Maybe FileKey))
|
||||
}
|
||||
|
||||
type Shard = TVar (HashMap HashRef NCQEntry)
|
||||
|
||||
type NCQOffset = Word64
|
||||
type NCQSize = Word32
|
||||
|
||||
type StateVersion = Word64
|
||||
|
||||
data NCQIdxEntry =
|
||||
NCQIdxEntry {-# UNPACK#-} !NCQOffset !NCQSize
|
||||
|
||||
data StateOP = D FileKey | F TimeSpec FileKey | P FileKey
|
||||
deriving (Eq,Ord,Show)
|
||||
|
||||
data NCQFlag =
|
||||
NCQMergeNow | NCQCompactNow
|
||||
deriving (Eq,Ord,Generic)
|
||||
|
||||
data Location =
|
||||
InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize
|
||||
| InMemory {-# UNPACK #-} !ByteString
|
||||
|
||||
instance Pretty Location where
|
||||
pretty = \case
|
||||
InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s
|
||||
InMemory _ -> "in-memory"
|
||||
|
||||
data TrackedFile =
|
||||
TrackedFile
|
||||
{ tfTime :: FilePrio
|
||||
, tfKey :: FileKey
|
||||
, tfCached :: TVar (Maybe CachedEntry)
|
||||
}
|
||||
|
||||
data FactE = EmptyFact
|
||||
deriving (Eq,Ord,Show,Data,Generic)
|
||||
|
||||
type FactSeq = POSIXTime
|
||||
|
||||
data Fact =
|
||||
Fact
|
||||
{ factWritten :: Maybe FactSeq
|
||||
, factE :: FactE
|
||||
}
|
||||
deriving (Eq,Ord,Show,Data,Generic)
|
||||
|
||||
instance Hashable FactE
|
||||
instance Hashable Fact
|
||||
|
||||
type TrackedFiles = Vector TrackedFile
|
||||
|
||||
data NCQStorage2 =
|
||||
NCQStorage2
|
||||
{ ncqRoot :: FilePath
|
||||
, ncqGen :: Int
|
||||
, ncqSalt :: HashRef
|
||||
, ncqPostponeMerge :: Timeout 'Seconds
|
||||
, ncqPostponeSweep :: Timeout 'Seconds
|
||||
, ncqLuckyNum :: Int
|
||||
, ncqFsync :: Int
|
||||
, ncqWriteQLen :: Int
|
||||
, ncqWriteBlock :: Int
|
||||
, ncqMinLog :: Int
|
||||
, ncqMaxLog :: Int
|
||||
, ncqMaxCached :: Int
|
||||
, ncqIdleThrsh :: Double
|
||||
, ncqMemTable :: Vector Shard
|
||||
, ncqWriteQ :: TVar (Seq HashRef)
|
||||
, ncqWriteOps :: Vector (TQueue (IO ()))
|
||||
, ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location))
|
||||
, ncqStorageTasks :: TVar Int
|
||||
, ncqStorageStopReq :: TVar Bool
|
||||
, ncqStorageSyncReq :: TVar Bool
|
||||
, ncqMergeReq :: TVar Bool
|
||||
, ncqMergeSem :: TSem
|
||||
, ncqSyncNo :: TVar Int
|
||||
, ncqCurrentFiles :: TVar (HashSet FileKey)
|
||||
, ncqTrackedFiles :: TVar TrackedFiles
|
||||
, ncqStateVersion :: TVar StateVersion
|
||||
, ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey))
|
||||
, ncqStateName :: TVar (Maybe StateFile)
|
||||
, ncqStateSem :: TSem
|
||||
, ncqCachedEntries :: TVar Int
|
||||
, ncqWrites :: TVar Int
|
||||
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
|
||||
, ncqJobQ :: TQueue (IO ())
|
||||
, ncqMiscSem :: TSem
|
||||
, ncqSweepSem :: TSem
|
||||
, ncqMergeTasks :: TVar Int
|
||||
, ncqOnRunWriteIdle :: TVar (IO ())
|
||||
|
||||
, ncqFactFiles :: TVar (HashSet FileKey)
|
||||
, ncqFacts :: TVar (HashSet Fact)
|
||||
}
|
||||
|
||||
megabytes :: forall a . Integral a => a
|
||||
megabytes = 1024 ^ 2
|
||||
|
||||
gigabytes :: forall a . Integral a => a
|
||||
gigabytes = 1024 ^ 3
|
||||
|
||||
ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2
|
||||
ncqStorageOpen2 fp upd = do
|
||||
|
@ -260,6 +126,8 @@ ncqStorageOpen2 fp upd = do
|
|||
|
||||
ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList
|
||||
|
||||
ncqRndGen <- liftIO MWC.createSystemRandom
|
||||
|
||||
let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk"
|
||||
|
||||
let ncq = NCQStorage2{..} & upd
|
||||
|
@ -284,61 +152,6 @@ ncqWithStorage fp action = flip runContT pure do
|
|||
wait w
|
||||
pure r
|
||||
|
||||
ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath
|
||||
ncqGetFileName ncq fp = ncqGetWorkDir ncq </> takeFileName fp
|
||||
|
||||
ncqGetWorkDir :: NCQStorage2 -> FilePath
|
||||
ncqGetWorkDir NCQStorage2{..} = ncqRoot </> show ncqGen
|
||||
|
||||
ncqGetLockFileName :: NCQStorage2 -> FilePath
|
||||
ncqGetLockFileName ncq = ncqGetFileName ncq ".lock"
|
||||
|
||||
ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath
|
||||
ncqNewUniqFileName me@NCQStorage2{..} pref suff = liftIO $ withSem ncqMiscSem do
|
||||
flip fix 0 $ \next i -> do
|
||||
t <- round @_ @Integer . (* 1e9) <$> getPOSIXTime
|
||||
let v = show $ pretty (showHex t "") <> "-" <> pretty (showHex i "")
|
||||
let n = ncqGetFileName me (pref <> v <> suff)
|
||||
doesFileExist n >>= \case
|
||||
False -> pure n
|
||||
True -> next (succ i)
|
||||
|
||||
ncqEmptyKey :: ByteString
|
||||
ncqEmptyKey = BS.replicate ncqKeyLen 0
|
||||
|
||||
ncqGetFactsDir :: NCQStorage2 -> FilePath
|
||||
ncqGetFactsDir me = ncqGetWorkDir me </> ".facts"
|
||||
|
||||
ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data"
|
||||
|
||||
ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewStateName me = ncqNewUniqFileName me "state-" ""
|
||||
|
||||
ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data"
|
||||
|
||||
ncqGetNewFactFileName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewFactFileName me = do
|
||||
ncqNewUniqFileName me (d </> "fact-") ".f"
|
||||
where d = ncqGetFactsDir me
|
||||
|
||||
ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m ()
|
||||
ncqStorageStop2 NCQStorage2{..} = do
|
||||
atomically $ writeTVar ncqStorageStopReq True
|
||||
|
||||
ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m ()
|
||||
ncqStorageSync2 NCQStorage2{..} = do
|
||||
atomically $ writeTVar ncqStorageSyncReq True
|
||||
|
||||
ncqShardIdx :: NCQStorage2 -> HashRef -> Int
|
||||
ncqShardIdx NCQStorage2{..} h =
|
||||
fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable
|
||||
{-# INLINE ncqShardIdx #-}
|
||||
|
||||
ncqGetShard :: NCQStorage2 -> HashRef -> Shard
|
||||
ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h
|
||||
{-# INLINE ncqGetShard #-}
|
||||
|
||||
ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry)
|
||||
ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h
|
||||
|
@ -445,19 +258,7 @@ ncqEntrySize = \case
|
|||
InFossil _ _ _ size -> fromIntegral size
|
||||
InMemory bs -> fromIntegral (BS.length bs)
|
||||
|
||||
useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a
|
||||
useVersion ncq m = bracket succV predV m
|
||||
where
|
||||
succV = atomically (ncqStateUseSTM ncq)
|
||||
predV = const $ atomically (ncqStateUnuseSTM ncq)
|
||||
|
||||
ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry)))
|
||||
ncqListTrackedFilesSTM NCQStorage2{..} = do
|
||||
fs <- readTVar ncqTrackedFiles
|
||||
for fs $ \TrackedFile{..} -> (tfKey,,) <$> readTVar tfCached <*> pure tfCached
|
||||
|
||||
ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry)))
|
||||
ncqListTrackedFiles = atomically . ncqListTrackedFilesSTM
|
||||
|
||||
|
||||
ncqPreloadIndexes :: MonadUnliftIO m
|
||||
|
@ -551,21 +352,6 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do
|
|||
go 0 0 mempty
|
||||
|
||||
|
||||
ncqLookupIndex :: MonadUnliftIO m
|
||||
=> HashRef
|
||||
-> (ByteString, NWayHash)
|
||||
-> m (Maybe NCQIdxEntry )
|
||||
ncqLookupIndex hx (mmaped, nway) = do
|
||||
fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx)
|
||||
{-# INLINE ncqLookupIndex #-}
|
||||
|
||||
decodeEntry :: ByteString -> NCQIdxEntry
|
||||
decodeEntry entryBs = do
|
||||
let (p,r) = BS.splitAt 8 entryBs
|
||||
let off = fromIntegral (N.word64 p)
|
||||
let size = fromIntegral (N.word32 (BS.take 4 r))
|
||||
NCQIdxEntry off size
|
||||
{-# INLINE decodeEntry #-}
|
||||
|
||||
ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
|
||||
ncqLocate2 NCQStorage2{..} href = do
|
||||
|
@ -704,6 +490,12 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
|
|||
|
||||
spawnActivity factsDB
|
||||
|
||||
spawnActivity $ postponed 20 $ forever do
|
||||
ema <- readTVarIO ncqWriteEMA
|
||||
when (ema < 50 ) do
|
||||
ncqKeyNumIntersectionProbe ncq
|
||||
|
||||
pause @'Seconds 10
|
||||
|
||||
ContT $ bracket none $ const $ liftIO do
|
||||
fhh <- atomically (STM.flushTQueue closeQ)
|
||||
|
@ -1089,18 +881,6 @@ ncqWaitTasks NCQStorage2{..} = atomically do
|
|||
tno <- readTVar ncqStorageTasks
|
||||
when (tno > 0) STM.retry
|
||||
|
||||
ncqStateUseSTM :: NCQStorage2 -> STM ()
|
||||
ncqStateUseSTM NCQStorage2{..} = do
|
||||
k <- readTVar ncqStateVersion <&> fromIntegral
|
||||
modifyTVar ncqStateUsage (IntMap.update (Just . over _1 succ) k)
|
||||
{-# INLINE ncqStateUseSTM #-}
|
||||
|
||||
ncqStateUnuseSTM :: NCQStorage2 -> STM ()
|
||||
ncqStateUnuseSTM NCQStorage2{..} = do
|
||||
k <- readTVar ncqStateVersion <&> fromIntegral
|
||||
-- TODO: remove when n <= 0
|
||||
modifyTVar ncqStateUsage (IntMap.update (Just . over _1 pred) k)
|
||||
{-# INLINE ncqStateUnuseSTM #-}
|
||||
|
||||
ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool
|
||||
ncqStateUpdate me@NCQStorage2{..} ops' = withSem ncqStateSem $ flip runContT pure $ callCC \exit -> do
|
||||
|
@ -1633,14 +1413,5 @@ appendTailSection fh = liftIO do
|
|||
{-# INLINE appendTailSection #-}
|
||||
|
||||
|
||||
withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a
|
||||
withSem sem m = bracket enter leave (const m)
|
||||
where enter = atomically (waitTSem sem)
|
||||
leave = const $ atomically (signalTSem sem)
|
||||
|
||||
isNotPending :: Maybe CachedEntry -> Bool
|
||||
isNotPending = \case
|
||||
Just (PendingEntry {}) -> False
|
||||
_ -> True
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
module HBS2.Storage.NCQ2.Internal
|
||||
( module HBS2.Storage.NCQ2.Internal
|
||||
, module Export
|
||||
)where
|
||||
|
||||
import HBS2.Storage.NCQ2.Internal.Types as Export
|
||||
import HBS2.Storage.NCQ2.Internal.Probes as Export
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
{-# Language RecordWildCards #-}
|
||||
module HBS2.Storage.NCQ2.Internal.Probes where
|
||||
|
||||
import HBS2.Prelude
|
||||
import HBS2.Hash
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
import HBS2.Misc.PrettyStuff
|
||||
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
|
||||
import HBS2.Storage.NCQ2.Internal.Types
|
||||
import HBS2.Storage.NCQ.Types
|
||||
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.Coerce
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.List qualified as List
|
||||
import Data.Maybe
|
||||
import Data.Vector ((!))
|
||||
import Data.Vector qualified as V
|
||||
import Lens.Micro.Platform
|
||||
import System.Random.MWC qualified as MWC
|
||||
import UnliftIO
|
||||
|
||||
ncqKeyNumIntersectionProbe :: MonadUnliftIO m => NCQStorage2 -> m ()
|
||||
ncqKeyNumIntersectionProbe me@NCQStorage2{..} = useVersion me $ const $ void $ runMaybeT do
|
||||
|
||||
-- Фильтруем pending
|
||||
files0 <- lift (ncqListTrackedFiles me)
|
||||
let files = V.toList $ V.filter (isNotPending . view _2) files0
|
||||
|
||||
when (length files < 2) mzero
|
||||
|
||||
(a,b) <- liftIO $ fix \next -> do
|
||||
i <- MWC.uniformRM (0, length files - 1) ncqRndGen
|
||||
j <- MWC.uniformRM (0, length files - 1) ncqRndGen
|
||||
if i == j then next else pure (files !! min i j, files !! max i j)
|
||||
|
||||
let fka = view _1 a
|
||||
let fkb = view _1 b
|
||||
let key = FactKey $ coerce $ hashObject @HbSync $ serialise $ List.sort [fka, fkb]
|
||||
|
||||
known <- lift (readTVarIO ncqFacts <&> HM.member key)
|
||||
when known mzero
|
||||
|
||||
let fIndexA = ncqGetFileName me (toFileName (IndexFile fka))
|
||||
let fIndexB = ncqGetFileName me (toFileName (IndexFile fkb))
|
||||
|
||||
idxPair' <- liftIO $ try @_ @IOException do
|
||||
(,) <$> nwayHashMMapReadOnly fIndexA
|
||||
<*> nwayHashMMapReadOnly fIndexB
|
||||
|
||||
((bs1,n1),(bs2,n2)) <- case idxPair' of
|
||||
Right (Just x, Just y) -> pure (x,y)
|
||||
_ -> warn ("can't load index pair" <+> pretty (fka, fkb)) >> mzero
|
||||
|
||||
n <- liftIO $ do
|
||||
ref <- newTVarIO 0
|
||||
nwayHashScanAll n1 bs1 $ \_ k _ -> do
|
||||
here <- ncqLookupIndex (coerce k) (bs2,n2)
|
||||
when (isJust here) $ atomically $ modifyTVar' ref (+1)
|
||||
readTVarIO ref
|
||||
|
||||
debug $ yellow "ncqKeyNumIntersectionProbe"
|
||||
<+> pretty fka <+> pretty fkb <+> pretty n
|
||||
|
||||
|
|
@ -0,0 +1,275 @@
|
|||
{-# Language RecordWildCards #-}
|
||||
module HBS2.Storage.NCQ2.Internal.Types where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.Hash
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Base58
|
||||
import HBS2.Net.Auth.Credentials
|
||||
import HBS2.Storage
|
||||
import HBS2.Misc.PrettyStuff
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
import HBS2.Data.Log.Structured.SD
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
|
||||
import HBS2.Storage.NCQ.Types
|
||||
|
||||
import Data.Config.Suckless.System
|
||||
|
||||
import Numeric (showHex)
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Control.Concurrent.STM.TSem
|
||||
import Data.IntMap qualified as IntMap
|
||||
import Data.IntMap (IntMap)
|
||||
import Data.Sequence qualified as Seq
|
||||
import Data.Sequence (Seq(..), (|>),(<|))
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.Coerce
|
||||
import Data.Word
|
||||
import Data.Vector qualified as V
|
||||
import Data.Vector (Vector, (!))
|
||||
import Lens.Micro.Platform
|
||||
import Data.HashSet (HashSet)
|
||||
import System.FilePath.Posix
|
||||
|
||||
import Control.Monad.ST
|
||||
import System.Random.MWC as MWC
|
||||
|
||||
import UnliftIO
|
||||
|
||||
|
||||
type FOff = Word64
|
||||
|
||||
data NCQEntry =
|
||||
NCQEntry
|
||||
{ ncqEntryData :: !ByteString
|
||||
, ncqDumped :: !(TVar (Maybe FileKey))
|
||||
}
|
||||
|
||||
type Shard = TVar (HashMap HashRef NCQEntry)
|
||||
|
||||
type NCQOffset = Word64
|
||||
type NCQSize = Word32
|
||||
|
||||
type StateVersion = Word64
|
||||
|
||||
data NCQIdxEntry =
|
||||
NCQIdxEntry {-# UNPACK#-} !NCQOffset !NCQSize
|
||||
|
||||
data StateOP = D FileKey | F TimeSpec FileKey | P FileKey
|
||||
deriving (Eq,Ord,Show)
|
||||
|
||||
data NCQFlag =
|
||||
NCQMergeNow | NCQCompactNow
|
||||
deriving (Eq,Ord,Generic)
|
||||
|
||||
data Location =
|
||||
InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize
|
||||
| InMemory {-# UNPACK #-} !ByteString
|
||||
|
||||
instance Pretty Location where
|
||||
pretty = \case
|
||||
InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s
|
||||
InMemory _ -> "in-memory"
|
||||
|
||||
data TrackedFile =
|
||||
TrackedFile
|
||||
{ tfTime :: FilePrio
|
||||
, tfKey :: FileKey
|
||||
, tfCached :: TVar (Maybe CachedEntry)
|
||||
}
|
||||
|
||||
data FactE = KeyIntersection FileKey FileKey Int
|
||||
deriving (Eq,Ord,Show,Generic)
|
||||
|
||||
type FactSeq = POSIXTime
|
||||
|
||||
newtype FactKey =
|
||||
FactKey ByteString
|
||||
deriving newtype (Eq,Ord,Hashable)
|
||||
|
||||
data Fact =
|
||||
Facot
|
||||
{ factWritten :: Maybe FactSeq
|
||||
, factE :: FactE
|
||||
}
|
||||
deriving (Eq,Ord,Show,Generic)
|
||||
|
||||
instance Hashable FactE
|
||||
instance Hashable Fact
|
||||
|
||||
type TrackedFiles = Vector TrackedFile
|
||||
|
||||
data NCQStorage2 =
|
||||
NCQStorage2
|
||||
{ ncqRoot :: FilePath
|
||||
, ncqGen :: Int
|
||||
, ncqSalt :: HashRef
|
||||
, ncqPostponeMerge :: Timeout 'Seconds
|
||||
, ncqPostponeSweep :: Timeout 'Seconds
|
||||
, ncqLuckyNum :: Int
|
||||
, ncqFsync :: Int
|
||||
, ncqWriteQLen :: Int
|
||||
, ncqWriteBlock :: Int
|
||||
, ncqMinLog :: Int
|
||||
, ncqMaxLog :: Int
|
||||
, ncqMaxCached :: Int
|
||||
, ncqIdleThrsh :: Double
|
||||
, ncqMemTable :: Vector Shard
|
||||
, ncqWriteQ :: TVar (Seq HashRef)
|
||||
, ncqWriteOps :: Vector (TQueue (IO ()))
|
||||
, ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location))
|
||||
, ncqStorageTasks :: TVar Int
|
||||
, ncqStorageStopReq :: TVar Bool
|
||||
, ncqStorageSyncReq :: TVar Bool
|
||||
, ncqMergeReq :: TVar Bool
|
||||
, ncqMergeSem :: TSem
|
||||
, ncqSyncNo :: TVar Int
|
||||
, ncqCurrentFiles :: TVar (HashSet FileKey)
|
||||
, ncqTrackedFiles :: TVar TrackedFiles
|
||||
, ncqStateVersion :: TVar StateVersion
|
||||
, ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey))
|
||||
, ncqStateName :: TVar (Maybe StateFile)
|
||||
, ncqStateSem :: TSem
|
||||
, ncqCachedEntries :: TVar Int
|
||||
, ncqWrites :: TVar Int
|
||||
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
|
||||
, ncqJobQ :: TQueue (IO ())
|
||||
, ncqMiscSem :: TSem
|
||||
, ncqSweepSem :: TSem
|
||||
, ncqMergeTasks :: TVar Int
|
||||
, ncqOnRunWriteIdle :: TVar (IO ())
|
||||
|
||||
, ncqFactFiles :: TVar (HashSet FileKey)
|
||||
, ncqFacts :: TVar (HashMap FactKey Fact)
|
||||
, ncqRndGen :: Gen RealWorld
|
||||
}
|
||||
|
||||
megabytes :: forall a . Integral a => a
|
||||
megabytes = 1024 ^ 2
|
||||
|
||||
gigabytes :: forall a . Integral a => a
|
||||
gigabytes = 1024 ^ 3
|
||||
|
||||
|
||||
ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath
|
||||
ncqGetFileName ncq fp = ncqGetWorkDir ncq </> takeFileName fp
|
||||
|
||||
ncqGetWorkDir :: NCQStorage2 -> FilePath
|
||||
ncqGetWorkDir NCQStorage2{..} = ncqRoot </> show ncqGen
|
||||
|
||||
ncqGetLockFileName :: NCQStorage2 -> FilePath
|
||||
ncqGetLockFileName ncq = ncqGetFileName ncq ".lock"
|
||||
|
||||
ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath
|
||||
ncqNewUniqFileName me@NCQStorage2{..} pref suff = liftIO $ withSem ncqMiscSem do
|
||||
flip fix 0 $ \next i -> do
|
||||
t <- round @_ @Integer . (* 1e9) <$> getPOSIXTime
|
||||
let v = show $ pretty (showHex t "") <> "-" <> pretty (showHex i "")
|
||||
let n = ncqGetFileName me (pref <> v <> suff)
|
||||
doesFileExist n >>= \case
|
||||
False -> pure n
|
||||
True -> next (succ i)
|
||||
|
||||
ncqEmptyKey :: ByteString
|
||||
ncqEmptyKey = BS.replicate ncqKeyLen 0
|
||||
|
||||
ncqGetFactsDir :: NCQStorage2 -> FilePath
|
||||
ncqGetFactsDir me = ncqGetWorkDir me </> ".facts"
|
||||
|
||||
ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data"
|
||||
|
||||
ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewStateName me = ncqNewUniqFileName me "state-" ""
|
||||
|
||||
ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data"
|
||||
|
||||
ncqGetNewFactFileName :: MonadIO m => NCQStorage2 -> m FilePath
|
||||
ncqGetNewFactFileName me = do
|
||||
ncqNewUniqFileName me (d </> "fact-") ".f"
|
||||
where d = ncqGetFactsDir me
|
||||
|
||||
ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m ()
|
||||
ncqStorageStop2 NCQStorage2{..} = do
|
||||
atomically $ writeTVar ncqStorageStopReq True
|
||||
|
||||
ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m ()
|
||||
ncqStorageSync2 NCQStorage2{..} = do
|
||||
atomically $ writeTVar ncqStorageSyncReq True
|
||||
|
||||
ncqShardIdx :: NCQStorage2 -> HashRef -> Int
|
||||
ncqShardIdx NCQStorage2{..} h =
|
||||
fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable
|
||||
{-# INLINE ncqShardIdx #-}
|
||||
|
||||
ncqGetShard :: NCQStorage2 -> HashRef -> Shard
|
||||
ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h
|
||||
{-# INLINE ncqGetShard #-}
|
||||
|
||||
|
||||
useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a
|
||||
useVersion ncq m = bracket succV predV m
|
||||
where
|
||||
succV = atomically (ncqStateUseSTM ncq)
|
||||
predV = const $ atomically (ncqStateUnuseSTM ncq)
|
||||
|
||||
|
||||
ncqStateUseSTM :: NCQStorage2 -> STM ()
|
||||
ncqStateUseSTM NCQStorage2{..} = do
|
||||
k <- readTVar ncqStateVersion <&> fromIntegral
|
||||
modifyTVar ncqStateUsage (IntMap.update (Just . over _1 succ) k)
|
||||
{-# INLINE ncqStateUseSTM #-}
|
||||
|
||||
ncqStateUnuseSTM :: NCQStorage2 -> STM ()
|
||||
ncqStateUnuseSTM NCQStorage2{..} = do
|
||||
k <- readTVar ncqStateVersion <&> fromIntegral
|
||||
-- TODO: remove when n <= 0
|
||||
modifyTVar ncqStateUsage (IntMap.update (Just . over _1 pred) k)
|
||||
{-# INLINE ncqStateUnuseSTM #-}
|
||||
|
||||
withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a
|
||||
withSem sem m = bracket enter leave (const m)
|
||||
where enter = atomically (waitTSem sem)
|
||||
leave = const $ atomically (signalTSem sem)
|
||||
|
||||
|
||||
|
||||
ncqLookupIndex :: MonadUnliftIO m
|
||||
=> HashRef
|
||||
-> (ByteString, NWayHash)
|
||||
-> m (Maybe NCQIdxEntry )
|
||||
ncqLookupIndex hx (mmaped, nway) = do
|
||||
fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx)
|
||||
{-# INLINE ncqLookupIndex #-}
|
||||
|
||||
decodeEntry :: ByteString -> NCQIdxEntry
|
||||
decodeEntry entryBs = do
|
||||
let (p,r) = BS.splitAt 8 entryBs
|
||||
let off = fromIntegral (N.word64 p)
|
||||
let size = fromIntegral (N.word32 (BS.take 4 r))
|
||||
NCQIdxEntry off size
|
||||
{-# INLINE decodeEntry #-}
|
||||
|
||||
|
||||
isNotPending :: Maybe CachedEntry -> Bool
|
||||
isNotPending = \case
|
||||
Just (PendingEntry {}) -> False
|
||||
_ -> True
|
||||
|
||||
isPending :: Maybe CachedEntry -> Bool
|
||||
isPending = not . isNotPending
|
||||
|
||||
|
||||
ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry)))
|
||||
ncqListTrackedFilesSTM NCQStorage2{..} = do
|
||||
fs <- readTVar ncqTrackedFiles
|
||||
for fs $ \TrackedFile{..} -> (tfKey,,) <$> readTVar tfCached <*> pure tfCached
|
||||
|
||||
ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry)))
|
||||
ncqListTrackedFiles = atomically . ncqListTrackedFilesSTM
|
||||
|
|
@ -1788,7 +1788,7 @@ main = do
|
|||
g <- liftIO MWC.createSystemRandom
|
||||
let dir = testEnvDir
|
||||
let n = 30000
|
||||
let p = 0.15
|
||||
let p = 0.25
|
||||
|
||||
sizes <- replicateM n (uniformRM (4096, 256*1024) g)
|
||||
|
||||
|
|
Loading…
Reference in New Issue