This commit is contained in:
voidlizard 2025-07-28 16:47:38 +03:00
parent a8051ca302
commit 7365aa3813
10 changed files with 259 additions and 70 deletions

View File

@ -69,6 +69,7 @@ library
HBS2.Storage.NCQ3.Internal.Memtable
HBS2.Storage.NCQ3.Internal.Index
HBS2.Storage.NCQ3.Internal.MMapCache
HBS2.Storage.NCQ3.Internal.Files
HBS2.Storage.NCQ
HBS2.Storage.NCQ2
HBS2.Storage.NCQ2.Internal

View File

@ -6,10 +6,11 @@ import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Run
import HBS2.Storage.NCQ3.Internal.Memtable
import HBS2.Storage.NCQ3.Internal.Files
import Control.Monad.Trans.Cont
import Network.ByteOrder qualified as N
import Data.HashPSQ qualified as PSQ
import Data.HashPSQ qualified as HPSQ
import Data.Vector qualified as V
import Data.HashMap.Strict qualified as HM
import Data.ByteString qualified as BS
@ -40,6 +41,7 @@ ncqStorageOpen3 fp upd = do
let ncqMaxLog = 2 * ncqMinLog
let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2
let ncqMaxCachedIndex = 16
let ncqMaxCachedData = 64
let ncqIdleThrsh = 50.0
let ncqPostponeMerge = 300.0
let ncqPostponeSweep = 2 * ncqPostponeMerge
@ -50,27 +52,32 @@ ncqStorageOpen3 fp upd = do
let shardNum = fromIntegral cap
let wopNum = 2
ncqWriteQ <- newTVarIO mempty
ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty)
ncqMMapCachedIdx <- newTVarIO PSQ.empty
ncqStateFiles <- newTVarIO mempty
ncqStateIndex <- newTVarIO mempty
ncqStateFileSeq <- newTVarIO 0
ncqStateVersion <- newTVarIO 0
ncqStateUsage <- newTVarIO mempty
ncqWrites <- newTVarIO 0
ncqWriteEMA <- newTVarIO 0.0
ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO
ncqReadReq <- newTQueueIO
ncqAlive <- newTVarIO False
ncqStopReq <- newTVarIO False
ncqSyncReq <- newTVarIO False
ncqWriteQ <- newTVarIO mempty
ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty)
ncqMMapCachedIdx <- newTVarIO HPSQ.empty
ncqMMapCachedData <- newTVarIO HPSQ.empty
ncqStateFiles <- newTVarIO mempty
ncqStateIndex <- newTVarIO mempty
ncqStateFileSeq <- newTVarIO 0
ncqStateVersion <- newTVarIO 0
ncqStateUsage <- newTVarIO mempty
ncqStateFacts <- newTVarIO mempty
ncqWrites <- newTVarIO 0
ncqWriteEMA <- newTVarIO 0.0
ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO
ncqReadReq <- newTQueueIO
ncqAlive <- newTVarIO False
ncqStopReq <- newTVarIO False
ncqSyncReq <- newTVarIO False
ncqOnRunWriteIdle <- newTVarIO none
ncqSyncNo <- newTVarIO 0
ncqSyncNo <- newTVarIO 0
let ncq = NCQStorage3{..} & upd
mkdir (ncqGetWorkDir ncq)
liftIO (ncqTryLoadState ncq)
pure ncq
ncqWithStorage3 :: MonadUnliftIO m => FilePath -> (NCQStorage3 -> m a) -> m a

View File

@ -0,0 +1,40 @@
module HBS2.Storage.NCQ3.Internal.Files where
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import System.Posix.Files qualified as PFS
import Data.List qualified as List
-- | Place a file name inside the storage working directory.
--
-- Only the last path component of the argument is kept ('takeFileName'),
-- so any directory part passed by the caller is discarded.
ncqGetFileName :: NCQStorage3 -> FilePath -> FilePath
ncqGetFileName ncq = (ncqGetWorkDir ncq </>) . takeFileName
-- | Working directory of the storage: 'ncqRoot' joined with the shown
-- value of 'ncqGen'.
ncqGetWorkDir :: NCQStorage3 -> FilePath
ncqGetWorkDir NCQStorage3{ncqRoot = root, ncqGen = gen} = root </> show gen
-- | Path of the @.lock@ file inside the storage working directory.
ncqGetLockFileName :: NCQStorage3 -> FilePath
ncqGetLockFileName = (`ncqGetFileName` ".lock")
-- | Allocate a file key whose file does not exist yet.
--
-- Each attempt atomically takes the current value of 'ncqStateFileSeq'
-- and advances the counter by one. The key is wrapped with @fnameOf@ to
-- obtain the file name to probe in the working directory; when that file
-- is already present the next key is tried.
ncqGetNewFileKey :: forall f m . (ToFileName f, MonadIO m)
                 => NCQStorage3
                 -> ( FileKey -> f )
                 -> m FileKey
ncqGetNewFileKey me@NCQStorage3{..} fnameOf = allocate
  where
    allocate = do
      candidate <- atomically (stateTVar ncqStateFileSeq bump)
      taken     <- doesFileExist (pathFor candidate)
      if taken then allocate else pure candidate

    -- hand out the current counter value, store its successor
    bump current = (current, succ current)

    pathFor key = ncqGetFileName me (toFileName (fnameOf key))
-- | List files of the working directory whose file name satisfies @filt@.
--
-- Every match is paired with its high-resolution modification time and
-- with the 'FileKey' obtained by 'fromString' from its base name. The
-- result is sorted by modification time, newest first.
ncqListFilesBy :: forall m . MonadUnliftIO m => NCQStorage3 -> (FilePath -> Bool) -> m [(POSIXTime, FileKey)]
ncqListFilesBy me filt = do
  candidates <- filter (filt . takeFileName) <$> dirFiles (ncqGetWorkDir me)
  stamped    <- traverse stamp candidates
  pure (List.sortOn (Down . fst) stamped)
  where
    stamp path = do
      status <- liftIO (PFS.getFileStatus path)
      pure (PFS.modificationTimeHiRes status, fromString (takeBaseName path))

View File

@ -4,6 +4,7 @@ import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Memtable
import HBS2.Storage.NCQ3.Internal.Files
import System.Posix.Files qualified as PFS
import Streaming.Prelude qualified as S
@ -79,6 +80,7 @@ ncqIndexFile n@NCQStorage3{..} fk = runMaybeT do
ncqStateUpdate n do
ncqStateAddIndexFile ts fki
ncqStateAddDataFile (coerce fk)
ncqStateAddFact (FI fk (IndexFile fki))
(bs,nw) <- toMPlus midx
@ -91,7 +93,6 @@ ncqIndexFile n@NCQStorage3{..} fk = runMaybeT do
pure dest
ncqStorageScanDataFile :: MonadIO m
=> NCQStorage3
-> FilePath

View File

@ -2,38 +2,55 @@ module HBS2.Storage.NCQ3.Internal.MMapCache where
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Files
import Data.HashPSQ as HPSQ
import System.IO.MMap
ncqGetCachedIndex :: forall m . MonadUnliftIO m
=> NCQStorage3
-> FileKey
-> m CachedIndex
ncqGetCachedIndex ncq@NCQStorage3{..} fk = do
-- | Generic lookup-or-load for the priority-queue backed caches.
--
-- The priority of an entry is the timestamp of its last access (taken
-- with 'getTimeCoarse'), so 'HPSQ.deleteMin' evicts the entry with the
-- smallest timestamp.
--
-- * Hit: the entry is looked up and re-stamped inside a single
--   transaction, so an entry removed concurrently (for example by one of
--   the @ncqDelCached*@ functions) is never put back from a stale lookup.
--
-- * Miss: the value is produced by the loader outside of any transaction
--   and then inserted. One entry is evicted only when the insertion would
--   actually grow a cache that already holds @maxSize@ entries; if another
--   thread has inserted the same key in the meantime the entry is just
--   replaced.
--
-- Exceptions thrown by the loader propagate to the caller and leave the
-- cache untouched.
cacheLookupOrInsert :: forall m val.
                       MonadUnliftIO m
                    => Int                                   -- ^ max size
                    -> (FileKey -> m val)                    -- ^ loader
                    -> TVar (HashPSQ FileKey CachePrio val)  -- ^ the cache
                    -> FileKey
                    -> m val
cacheLookupOrInsert maxSize load cacheTVar fk = do
  now <- getTimeCoarse

  cached <- atomically do
    cache <- readTVar cacheTVar
    case HPSQ.lookup fk cache of
      Nothing       -> pure Nothing
      Just (_, val) -> do
        -- refresh the access time in the same transaction as the lookup
        writeTVar cacheTVar $! HPSQ.insert fk now val cache
        pure (Just val)

  case cached of
    Just val -> pure val
    Nothing  -> do
      val <- load fk
      atomically do
        cache <- readTVar cacheTVar
        let trimmed
              | HPSQ.member fk cache       = cache   -- replacing: size does not grow
              | HPSQ.size cache >= maxSize = HPSQ.deleteMin cache
              | otherwise                  = cache
        writeTVar cacheTVar $! HPSQ.insert fk now val trimmed
      pure val
-- | Return the cached memory mapping of the data file with key @fk@,
-- mapping the file first when it is not in 'ncqMMapCachedData' yet.
--
-- The cache holds up to 'ncqMaxCachedData' entries; lookup, insertion and
-- eviction are delegated to 'cacheLookupOrInsert'.
ncqGetCachedData :: MonadUnliftIO m => NCQStorage3 -> FileKey -> m CachedData
ncqGetCachedData ncq@NCQStorage3{..} fk =
  cacheLookupOrInsert ncqMaxCachedData mapDataFile ncqMMapCachedData fk
  where
    -- no explicit range is passed to the mmap call
    mapDataFile key =
      CachedData <$> liftIO (mmapFileByteString (dataPath key) Nothing)

    dataPath key = ncqGetFileName ncq (toFileName (DataFile key))
-- | Return the cached memory mapping of the index file with key @fk@,
-- mapping the file first when it is not in 'ncqMMapCachedIdx' yet.
--
-- The cache holds up to 'ncqMaxCachedIndex' entries. Lookup, insertion and
-- eviction are done by 'cacheLookupOrInsert'; the loader below only maps
-- the file and must not touch the cache itself.
--
-- Throws 'NCQStorageCantMapFile' when the index file cannot be mapped.
ncqGetCachedIndex :: MonadUnliftIO m => NCQStorage3 -> FileKey -> m CachedIndex
ncqGetCachedIndex ncq@NCQStorage3{..} =
  cacheLookupOrInsert ncqMaxCachedIndex load ncqMMapCachedIdx
  where
    load fk = do
      let path = ncqGetFileName ncq (toFileName (IndexFile fk))
      nwayHashMMapReadOnly path >>= \case
        Nothing         -> throwIO $ NCQStorageCantMapFile path
        Just (bs, nway) -> pure (CachedIndex bs nway)
ncqDelCachedIndex :: forall m . MonadUnliftIO m
=> NCQStorage3
@ -44,3 +61,11 @@ ncqDelCachedIndex NCQStorage3{..} fk =
atomically (modifyTVar ncqMMapCachedIdx$ HPSQ.delete fk)
-- | Remove the entry for data file @fk@ from 'ncqMMapCachedData'.
-- Does nothing when the key is not cached.
ncqDelCachedData :: forall m . MonadUnliftIO m
                 => NCQStorage3
                 -> FileKey
                 -> m ()
ncqDelCachedData NCQStorage3{..} fk =
  -- Strict modify: the lazy variant would store an unevaluated
  -- @HPSQ.delete@ thunk that still references the old queue, keeping the
  -- deleted mapping reachable until something forces the TVar contents.
  atomically (modifyTVar' ncqMMapCachedData (HPSQ.delete fk))

View File

@ -19,6 +19,9 @@ module HBS2.Storage.NCQ3.Internal.Prelude
, StateFile(..)
, FilePrio(..)
, NCQStorageException(..)
, NCQFsckException(..)
, NCQFsckIssue(..)
, NCQFsckIssueType(..)
, ByteString
, Vector, (!)
, Seq(..), (|>),(<|)
@ -53,7 +56,7 @@ import Data.HashPSQ (HashPSQ)
import Data.IntMap (IntMap)
import Data.Set (Set)
import Data.Ord (Down(..))
import System.IO.MMap as Exported
import UnliftIO as Exported
import UnliftIO.Concurrent as Exported

View File

@ -4,7 +4,7 @@ module HBS2.Storage.NCQ3.Internal.Run where
import HBS2.Storage.NCQ.Types hiding (FileKey)
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Files
import HBS2.Storage.NCQ3.Internal.Memtable
import HBS2.Storage.NCQ3.Internal.Index
import HBS2.Storage.NCQ3.Internal.MMapCache
@ -79,12 +79,11 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
for_ tracked $ \(_, fk) -> do
CachedIndex bs nw <- ncqGetCachedIndex ncq fk
ncqLookupIndex h (bs, nw) >>= \case
Just (IndexEntry fk o s) -> undefined >> next
Just (IndexEntry fk o s) -> answer (Just (InFossil fk o s)) >> next
Nothing -> none
answer Nothing >> next
spawnActivity measureWPS
flip fix RunNew $ \loop -> \case

View File

@ -2,12 +2,18 @@ module HBS2.Storage.NCQ3.Internal.State where
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.Files
import Data.Config.Suckless.Script
import Data.List qualified as List
import Control.Monad.Reader
import Control.Monad.Trans.Cont
import Data.HashSet qualified as HS
import Data.Set qualified as Set
import Data.ByteString qualified as BS
import UnliftIO.IO.File
import Network.ByteOrder qualified as N
import UnliftIO.IO
import System.IO qualified as IO
@ -15,24 +21,6 @@ newtype StateOP a =
StateOP { fromStateOp :: ReaderT NCQStorage3 STM a }
deriving newtype (Functor,Applicative,Monad,MonadReader NCQStorage3)
ncqGetFileName :: NCQStorage3 -> FilePath -> FilePath
ncqGetFileName ncq fp = ncqGetWorkDir ncq </> takeFileName fp
ncqGetWorkDir :: NCQStorage3 -> FilePath
ncqGetWorkDir NCQStorage3{..} = ncqRoot </> show ncqGen
ncqGetLockFileName :: NCQStorage3 -> FilePath
ncqGetLockFileName ncq = ncqGetFileName ncq ".lock"
ncqGetNewFileKey :: forall f m . (ToFileName f, MonadIO m)
=> NCQStorage3
-> ( FileKey -> f )
-> m FileKey
ncqGetNewFileKey me@NCQStorage3{..} fnameOf = fix \next -> do
n <- atomically $ stateTVar ncqStateFileSeq (\x -> (x, succ x))
here <- doesFileExist (ncqGetFileName me (toFileName $ fnameOf n))
if here then next else pure n
{- HLINT ignore "Eta reduce"-}
ncqStateUpdate :: MonadIO m
@ -41,12 +29,13 @@ ncqStateUpdate :: MonadIO m
-> m ()
ncqStateUpdate ncq@NCQStorage3{..} action = do
snkFile <- ncqGetNewFileKey ncq StateFile <&> ncqGetFileName ncq . toFileName . StateFile
(n,i,f) <- atomically do
runReaderT (fromStateOp action) ncq
n <- readTVar ncqStateFileSeq
i <- readTVar ncqStateIndex
f <- readTVar ncqStateFiles
pure (n,i,f)
(n,i,f,facts) <- atomically do
runReaderT (fromStateOp action) ncq
n <- readTVar ncqStateFileSeq
i <- readTVar ncqStateIndex
f <- readTVar ncqStateFiles
fa <- readTVar ncqStateFacts
pure (n,i,f,fa)
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
for_ i $ \(Down p, fk) -> do
@ -55,6 +44,9 @@ ncqStateUpdate ncq@NCQStorage3{..} action = do
for_ f $ \fk -> do
IO.hPrint fh $ "f" <+> pretty fk
for_ facts $ \(FI (DataFile a) (IndexFile b)) -> do
IO.hPrint fh $ "fi" <+> pretty a <+> pretty b
IO.hPrint fh $ "n" <+> pretty n
ncqStateAddDataFile :: FileKey -> StateOP ()
@ -63,6 +55,12 @@ ncqStateAddDataFile fk = do
StateOP $ lift do
modifyTVar ncqStateFiles (HS.insert fk)
-- | Add a 'Fact' to the in-memory fact set ('ncqStateFacts').
--
-- Runs inside 'StateOP', i.e. as part of the enclosing state transaction.
ncqStateAddFact :: Fact -> StateOP ()
ncqStateAddFact fact = do
  NCQStorage3{..} <- ask
  StateOP $ lift do
    -- strict modify, so no chain of pending inserts builds up in the TVar
    modifyTVar' ncqStateFacts (Set.insert fact)
ncqStateAddIndexFile :: POSIXTime
-> FileKey
-> StateOP ()
@ -73,3 +71,87 @@ ncqStateAddIndexFile ts fk = do
modifyTVar' ncqStateIndex $ \xs ->
List.sortOn fst ((Down ts, fk) : xs)
-- | Cheap consistency check of a file.
--
-- The file is memory-mapped and its last eight bytes are decoded with
-- 'N.word64'; the decoded value must equal the length of the file.
--
-- Throws 'NCQFsckIssueExt' with 'FsckInvalidFileSize' when the file is
-- shorter than eight bytes (carrying the actual length) or when the
-- recorded size does not match (carrying the recorded size).
ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m ()
ncqFileFastCheck fp = do
  -- debug $ "ncqFileFastCheck" <+> pretty fp
  mmaped <- liftIO $ mmapFileByteString fp Nothing
  let size = BS.length mmaped

  -- A file shorter than the 8-byte trailer cannot be valid. Reject it
  -- before decoding: 'BS.drop' with a negative count returns the whole
  -- (too short) buffer, and the decoder is not known to bounds-check.
  unless ( size >= 8 ) do
    throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral size))

  let s = BS.drop (size - 8) mmaped & N.word64
  unless ( size == fromIntegral s ) do
    throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s))
-- | Try to restore the storage state from the state files on disk.
--
-- Currently a stub: the state files (names starting with @s-@) are
-- listed, newest first as returned by 'ncqListFilesBy', and visited one
-- by one, but nothing is loaded from them yet.
ncqTryLoadState :: forall m. MonadUnliftIO m
                => NCQStorage3
                -> m ()
ncqTryLoadState me = do
  stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" )
  flip runContT pure $ callCC \_exit -> do
    -- the escape continuation is kept for the loading logic to come
    for_ stateFiles $ \(_, _stateKey) -> none
    none
-- for_ stateFiles $ \(d,f) -> do
-- notice $ "state-file" <+> pretty (toFileName (StateFile f))
-- tryLoadState :: forall m. MonadUnliftIO m
-- => NCQStorage3
-- -> StateFile FileKey
-- -> m (Maybe (HashSet FileKey, [(Down POSIXTime, FileKey)], FileKey))
-- tryLoadState me@NCQStorage3{..} fk = do
-- debug $ "tryLoadState" <+> pretty fk
-- (fset, idxList, n) <- liftIO (readState fk)
-- let checkFile :: DataFile FileKey -> m Bool
-- checkFile fo = flip fix 0 \next (i :: Int) -> do
-- let dataFile = ncqGetFileName me (toFileName fo)
-- let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce fo)))
-- doesFileExist dataFile >>= \case
-- False -> do
-- rm indexFile
-- pure False
-- True -> do
-- try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case
-- Left e -> do
-- err (viaShow e)
-- stillThere <- doesFileExist dataFile
-- when stillThere do
-- let broken = dropExtension dataFile `addExtension` ".broken"
-- mv dataFile broken
-- rm indexFile
-- warn $ red "renamed" <+> pretty dataFile <+> pretty broken
-- pure False
-- Right{} | i > 1 -> pure False
-- Right{} -> do
-- exists <- doesFileExist indexFile
-- if exists
-- then pure True
-- else do
-- debug $ "indexing" <+> pretty (toFileName fo)
-- _ <- ncqIndexFile me fo
-- debug $ "indexed" <+> pretty indexFile
-- next (i + 1)
-- results <- forM (HS.toList fset) (checkFile . DataFile)
-- pure $
-- if and results
-- then Just (fset, idxList, n)
-- else Nothing

View File

@ -2,6 +2,7 @@ module HBS2.Storage.NCQ3.Internal.Types where
import HBS2.Storage.NCQ3.Internal.Prelude
import Numeric (readHex)
import Text.Printf
-- | Contents of a data file as held by the data-file cache
-- ('ncqMMapCachedData'); the payload is kept strict.
data CachedData = CachedData !ByteString
@ -17,8 +18,13 @@ type StateVersion = Word64
-- | Key identifying a storage file; a wrapper around 'Word32'.
-- The listed instances are reused from the underlying type.
newtype FileKey = FileKey Word32
deriving newtype (Eq,Ord,Show,Num,Enum,Pretty,Hashable)
-- Equality and ordering of the file-kind wrappers at 'FileKey'.
-- These are required by the derived 'Eq' and 'Ord' instances of 'Fact',
-- whose constructor carries a 'DataFile' and an 'IndexFile'.
deriving stock instance Eq (DataFile FileKey)
deriving stock instance Ord (DataFile FileKey)
deriving stock instance Eq (IndexFile FileKey)
deriving stock instance Ord (IndexFile FileKey)
-- | Recover a 'FileKey' from a file name or path.
--
-- The directory and the last extension are stripped ('takeBaseName'),
-- everything up to and including the first @\'-\'@ is dropped, and the
-- leading hexadecimal digits of the remainder are parsed.
--
-- 'fromString' has no way to report failure: a name without a parsable
-- hex part yields @FileKey maxBound@.
instance IsString FileKey where
  fromString = FileKey . maybe maxBound fst . headMay . readHex . hexPart
    where
      -- text following the first '-' of the extension-less base name
      hexPart = drop 1 . dropWhile (/= '-') . takeBaseName
instance ToFileName (DataFile FileKey) where
toFileName (DataFile fk) = ncqMakeFossilName fk
@ -47,6 +53,10 @@ instance Pretty Location where
InFossil k o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s
InMemory _ -> "in-memory"
-- | A statement about storage files that is tracked in 'ncqStateFacts'.
-- 'Eq' and 'Ord' are derived so that facts can be stored in a 'Set'.
data Fact =
FI (DataFile FileKey) (IndexFile FileKey) -- ^ data file X has index file Y
deriving stock (Eq,Ord)
data NCQStorage3 =
NCQStorage3
{ ncqRoot :: FilePath
@ -60,13 +70,16 @@ data NCQStorage3 =
, ncqMinLog :: Int
, ncqMaxLog :: Int
, ncqMaxCachedIndex :: Int
, ncqMaxCachedData :: Int
, ncqIdleThrsh :: Double
, ncqMMapCachedIdx :: TVar (HashPSQ FileKey CachePrio CachedIndex)
, ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData)
, ncqStateFiles :: TVar (HashSet FileKey)
, ncqStateIndex :: TVar [(Down POSIXTime, FileKey)] -- backward timestamp order
, ncqStateFileSeq :: TVar FileKey
, ncqStateVersion :: TVar StateVersion
, ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey))
, ncqStateFacts :: TVar (Set Fact)
, ncqMemTable :: Vector Shard
, ncqWrites :: TVar Int
, ncqWriteEMA :: TVar Double -- for writes-per-seconds

View File

@ -46,3 +46,21 @@ ncq3Tests = do
bs <- liftIO $ genRandomBS g n
ncqPutBS sto (Just B) Nothing bs
-- test:ncq3:write-reopen
-- Writes a number of random blocks into a storage located in the test
-- directory and then opens a storage on the same directory again.
-- The number of blocks is the first integer literal among the arguments,
-- 1000 when none is given.
entry $ bindMatch "test:ncq3:write-reopen" $ nil_ $ \e ->do
-- NOTE(review): opts is bound but not used below
let (opts,args) = splitOpts [] e
let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ]
g <- liftIO MWC.createSystemRandom
runTest $ \TestEnv{..} -> do
-- write phase
ncqWithStorage3 testEnvDir $ \sto -> do
notice $ "write" <+> pretty num <+> "blocks"
replicateM_ num do
-- block size is drawn from the range (1024, 256*1024)
n <- liftIO $ uniformRM (1024, 256*1024) g
bs <- liftIO $ genRandomBS g n
ncqPutBS sto (Just B) Nothing bs
notice $ "reopen"
-- reopen phase
-- NOTE(review): this session only pauses for two seconds; nothing is read
-- back or asserted yet, so the test passes as long as opening succeeds
ncqWithStorage3 testEnvDir $ \sto -> do
pause @'Seconds 2
notice $ "done"