This commit is contained in:
voidlizard 2025-07-08 09:42:52 +03:00
parent a1e6ff50f9
commit b36cd7f667
5 changed files with 295 additions and 86 deletions

View File

@ -98,40 +98,8 @@ import System.FileLock as FL
type NCQPerks m = MonadIO m
data NCQStorageException =
NCQStorageAlreadyExist String
| NCQStorageSeedMissed
| NCQStorageTimeout
| NCQStorageCurrentAlreadyOpen
| NCQStorageCantOpenCurrent
| NCQStorageBrokenCurrent
| NCQMergeInvariantFailed String
| NCQStorageCantLock FilePath
deriving stock (Show,Typeable)
instance Exception NCQStorageException
newtype FilePrio = FilePrio (Down TimeSpec)
deriving newtype (Eq,Ord)
deriving stock (Generic,Show)
mkFilePrio :: TimeSpec -> FilePrio
mkFilePrio = FilePrio . Down
timeSpecFromFilePrio :: FilePrio -> TimeSpec
timeSpecFromFilePrio (FilePrio what) = getDown what
{-# INLINE timeSpecFromFilePrio #-}
data CachedEntry =
CachedEntry { cachedMmapedIdx :: ByteString
, cachedMmapedData :: ByteString
, cachedNway :: NWayHash
, cachedTs :: TVar TimeSpec
}
instance Show CachedEntry where
show _ = "CachedEntry{...}"
data WQItem =
WQItem { wqNew :: Bool
@ -1526,12 +1494,6 @@ ncqStorageMergeStep ncq@NCQStorage{..} = flip runContT pure do
unless r (throwIO (NCQMergeInvariantFailed (show e)))
posixToTimeSpec :: POSIXTime -> TimeSpec
posixToTimeSpec pt =
let (s, frac) = properFraction pt :: (Integer, POSIXTime)
ns = round (frac * 1e9)
in TimeSpec (fromIntegral s) ns
-- NOTE: incremental
-- now it may became incremental if we'll

View File

@ -4,16 +4,21 @@ import HBS2.Prelude
import HBS2.Data.Types.Refs
import HBS2.Hash
import HBS2.Data.Log.Structured.NCQ
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8
import Network.ByteOrder qualified as N
import Data.Ord (Down(..))
import Data.Coerce
import System.FilePath
import Data.Word
import Data.Data
import Control.Exception
import UnliftIO (TVar)
-- Log structure:
-- (SD)*
-- S ::= word32be, section prefix
@ -22,6 +27,20 @@ import Control.Exception
-- PREFIX ::= BYTESTRING(4)
-- DATA ::= BYTESTRING(n) | n == S - LEN(WORD32) - LEN(HASH) - LEN(PREFIX)
data NCQStorageException =
NCQStorageAlreadyExist String
| NCQStorageSeedMissed
| NCQStorageTimeout
| NCQStorageCurrentAlreadyOpen
| NCQStorageCantOpenCurrent
| NCQStorageBrokenCurrent
| NCQMergeInvariantFailed String
| NCQStorageCantLock FilePath
| NCQStorageCantMapFile FilePath
deriving stock (Show,Typeable)
instance Exception NCQStorageException
newtype FileKey = FileKey ByteString
deriving newtype (Eq,Ord,Hashable,Show)
@ -31,6 +50,50 @@ instance IsString FileKey where
instance Pretty FileKey where
pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s))
newtype DataFile a = DataFile a
newtype IndexFile a = IndexFile a
class ToFileName a where
toFileName :: a -> FilePath
instance ToFileName FileKey where
toFileName = BS8.unpack . coerce
instance ToFileName (DataFile FileKey) where
toFileName (DataFile fk) = dropExtension (toFileName fk) `addExtension` ".data"
instance ToFileName (IndexFile FileKey) where
toFileName (IndexFile fk) = dropExtension (toFileName fk) `addExtension` ".cq"
instance ToFileName (DataFile FilePath) where
toFileName (DataFile fp) = dropExtension fp `addExtension` ".data"
instance ToFileName (IndexFile FilePath) where
toFileName (IndexFile fp) = dropExtension fp `addExtension` ".cq"
newtype FilePrio = FilePrio (Down TimeSpec)
deriving newtype (Eq,Ord)
deriving stock (Generic,Show)
mkFilePrio :: TimeSpec -> FilePrio
mkFilePrio = FilePrio . Down
timeSpecFromFilePrio :: FilePrio -> TimeSpec
timeSpecFromFilePrio (FilePrio what) = getDown what
{-# INLINE timeSpecFromFilePrio #-}
data CachedEntry =
CachedEntry { cachedMmapedIdx :: ByteString
, cachedMmapedData :: ByteString
, cachedNway :: NWayHash
, cachedTs :: TVar TimeSpec
}
instance Show CachedEntry where
show _ = "CachedEntry{...}"
newtype NCQFullRecordLen a =
@ -121,3 +184,10 @@ data NCQFsckIssue =
deriving stock (Eq,Ord,Show,Data,Generic)
posixToTimeSpec :: POSIXTime -> TimeSpec
posixToTimeSpec pt =
let (s, frac) = properFraction pt :: (Integer, POSIXTime)
ns = round (frac * 1e9)
in TimeSpec (fromIntegral s) ns

View File

@ -102,11 +102,9 @@ import System.FileLock as FL
type FOff = Word64
data NCQEntry =
NCQEntryNew Int ByteString
-- | NCQEntryWritten Int FileKey FOff (Maybe ByteString)
newtype NCQEntry = NCQEntry ByteString
type Shard = TVar (HashMap HashRef (TVar NCQEntry))
type Shard = TVar (HashMap HashRef NCQEntry)
data NCQStorage2 =
NCQStorage2
@ -116,12 +114,15 @@ data NCQStorage2 =
, ncqWriteQLen :: Int
, ncqWriteBlock :: Int
, ncqMinLog :: Int
, ncqMaxCached :: Int
, ncqMemTable :: Vector Shard
, ncqWriteSem :: TSem
, ncqWriteQ :: TVar (Seq HashRef)
, ncqStorageStopReq :: TVar Bool
, ncqStorageSyncReq :: TVar Bool
, ncqSyncNo :: TVar Int
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
, ncqCachedEntries :: TVar Int
} deriving (Generic)
@ -136,6 +137,7 @@ ncqStorageOpen2 fp upd = do
let ncqWriteQLen = 1024 * 16
let ncqMinLog = 256 * megabytes
let ncqWriteBlock = 1024
let ncqMaxCached = 128
cap <- getNumCapabilities <&> fromIntegral
ncqWriteQ <- newTVarIO mempty
ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap)
@ -143,12 +145,26 @@ ncqStorageOpen2 fp upd = do
ncqStorageStopReq <- newTVarIO False
ncqStorageSyncReq <- newTVarIO False
ncqSyncNo <- newTVarIO 0
ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqCachedEntries <- newTVarIO 0
let ncq = NCQStorage2{..} & upd
mkdir (ncqGetWorkDir ncq)
ncqRepair ncq
pure ncq
ncqWithStorage :: MonadUnliftIO m => FilePath -> ( NCQStorage2 -> m a ) -> m a
ncqWithStorage fp action = flip runContT pure do
sto <- lift (ncqStorageOpen2 fp id)
w <- ContT $ withAsync (ncqStorageRun2 sto)
link w
r <- lift (action sto)
lift (ncqStorageStop2 sto)
wait w
pure r
ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath
ncqGetFileName ncq fp = ncqGetWorkDir ncq </> takeFileName fp
@ -175,15 +191,8 @@ ncqGetShard :: NCQStorage2 -> HashRef -> Shard
ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h
{-# INLINE ncqGetShard #-}
ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe (NCQEntry, TVar NCQEntry))
ncqLookupEntrySTM ncq h = do
readTVar (ncqGetShard ncq h)
<&> HM.lookup h
>>= \case
Nothing -> pure Nothing
Just tv -> do
v <- readTVar tv
pure $ Just (v, tv)
ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry)
ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h
ncqPutBS :: MonadUnliftIO m
=> NCQStorage2
@ -201,34 +210,72 @@ ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do
when (not stop && filled > ncqWriteQLen) STM.retry
n <- readTVar ncqSyncNo
ncqAlterEntrySTM ncq h $ \case
Just e -> Just e
Nothing -> Just (NCQEntryNew n bs)
Nothing -> Just (NCQEntry bs)
modifyTVar' ncqWriteQ (|> h)
signalTSem ncqWriteSem
pure h
ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry)
ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) <&> fmap fst
ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash)
ncqReadEntry :: ByteString -> Word64 -> Word32 -> ByteString
ncqReadEntry mmaped off size = BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmaped
{-# INLINE ncqReadEntry #-}
ncqSearchBS :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe ByteString)
ncqSearchBS ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
now <- getTimeCoarse
lift (ncqLookupEntry ncq href) >>= maybe none (exit . Just . coerce)
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
for_ tracked $ \(fk, prio, mCached) -> case mCached of
Just CachedEntry{..} -> do
lookupEntry href (cachedMmapedIdx, cachedNway) >>= \case
Nothing -> none
Just (offset,size) -> do
atomically $ writeTVar cachedTs now
exit (Just $ ncqReadEntry cachedMmapedData offset size)
Nothing -> do
let indexFile = ncqGetFileName ncq (toFileName (IndexFile fk))
let dataFile = ncqGetFileName ncq (toFileName (DataFile fk))
(idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile)
>>= orThrow (NCQStorageCantMapFile indexFile)
datBs <- liftIO $ mmapFileByteString dataFile Nothing
ce <- CachedEntry idxBs datBs idxNway <$> newTVarIO now
lookupEntry href (idxBs, idxNway) >>= \case
Nothing -> none
Just (offset, size) -> do
atomically do
modifyTVar ncqTrackedFiles (HPSQ.insert fk prio (Just ce))
modifyTVar ncqCachedEntries (+1)
evictIfNeededSTM ncq (Just 1)
exit $ Just (ncqReadEntry datBs offset size)
pure mzero
where
lookupEntry (hx :: HashRef) (mmaped, nway) = runMaybeT do
entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= toMPlus
pure
( fromIntegral $ N.word64 (BS.take 8 entryBs)
, fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) )
ncqAlterEntrySTM :: NCQStorage2 -> HashRef -> (Maybe NCQEntry -> Maybe NCQEntry) -> STM ()
ncqAlterEntrySTM ncq h alterFn = do
let shard = ncqGetShard ncq h
readTVar shard <&> HM.lookup h >>= \case
Just tve -> do
e <- readTVar tve
case alterFn (Just e) of
Nothing -> modifyTVar' shard (HM.delete h)
Just e' -> writeTVar tve e'
Nothing -> case alterFn Nothing of
Nothing -> modifyTVar' shard (HM.delete h)
Just e -> do
tve <- newTVar e
modifyTVar' shard (HM.insert h tve)
modifyTVar shard (HM.alter alterFn h)
data RunSt =
RunNew
@ -254,9 +301,9 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
maybe1 what none $ \(fk, fh) -> do
closeFd fh
let fname = BS8.unpack (coerce fk)
-- notice $ yellow "indexing" <+> pretty fname
idx <- ncqIndexFile ncq fname
idx <- ncqIndexFile ncq (DataFile fk)
ncqAddTrackedFile ncq (DataFile fk)
nwayHashMMapReadOnly idx >>= \case
Nothing -> err $ "can't open index" <+> pretty idx
Just (bs,nway) -> do
@ -332,7 +379,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
Right chu -> do
ws <- for chu $ \h -> do
atomically (ncqLookupEntrySTM ncq h) >>= \case
Just (r@(NCQEntryNew ns bs),t) -> do
Just (NCQEntry bs) -> do
lift (appendSection fh bs)
_ -> pure 0
@ -392,7 +439,6 @@ ncqFileFastCheck fp = do
throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s))
ncqStorageScanDataFile :: MonadIO m
=> NCQStorage2
-> FilePath
@ -421,20 +467,16 @@ ncqStorageScanDataFile ncq fp' action = do
next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs)
ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> FilePath -> m FilePath
ncqIndexFile n@NCQStorage2{} fp'' = do
ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> DataFile FileKey -> m FilePath
ncqIndexFile n@NCQStorage2{} fk = do
let fp' = addExtension (ncqGetFileName n fp'') ".data"
let fp = toFileName fk & ncqGetFileName n
let dest = toFileName (IndexFile (coerce @_ @FileKey fk)) & ncqGetFileName n
let fp = ncqGetFileName n fp'
& takeBaseName
& (`addExtension` ".cq")
& ncqGetFileName n
trace $ "INDEX" <+> pretty fp' <+> pretty fp
debug $ "INDEX" <+> pretty fp <+> pretty dest
items <- S.toList_ do
ncqStorageScanDataFile n fp' $ \o w k v -> do
ncqStorageScanDataFile n fp $ \o w k _ -> do
let rs = w - 32 & fromIntegral @_ @Word32 & N.bytestring32
let os = fromIntegral @_ @Word64 o & N.bytestring64
let record = os <> rs
@ -442,11 +484,109 @@ ncqIndexFile n@NCQStorage2{} fp'' = do
S.yield (coerce k, record)
let (dir,name) = splitFileName fp
let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$"
result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir name items
result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items
mv result fp
pure fp
mv result dest
pure dest
ncqAddTrackedFilesSTM :: NCQStorage2 -> [(FileKey, TimeSpec)] -> STM ()
ncqAddTrackedFilesSTM NCQStorage2{..} keys = do
old <- readTVar ncqTrackedFiles
let new = flip fix (old, keys) \next -> \case
(s, []) -> s
(s, (k,ts):xs) -> next (HPSQ.insert k (FilePrio (Down ts)) Nothing s, xs)
writeTVar ncqTrackedFiles new
ncqAddTrackedFile :: MonadIO m => NCQStorage2 -> DataFile FileKey -> m ()
ncqAddTrackedFile ncq fkey = do
let fname = ncqGetFileName ncq (toFileName fkey)
stat <- liftIO $ PFS.getFileStatus fname
let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat
let fk = fromString (takeFileName fname)
atomically $ ncqAddTrackedFilesSTM ncq [(fk, ts)]
ncqAddTrackedFilesIO :: MonadIO m => NCQStorage2 -> [FilePath] -> m ()
ncqAddTrackedFilesIO ncq fps = do
tsFiles <- catMaybes <$> forM fps \fp' -> liftIO $ do
catchIOError
(do
let fp = fromString fp'
let dataFile = ncqGetFileName ncq (toFileName (DataFile fp))
stat <- getFileStatus dataFile
let ts = modificationTimeHiRes stat
pure $ Just (fp, posixToTimeSpec ts))
(\e -> do
err $ "ncqAddTrackedFilesIO: failed to stat " <+> viaShow e
pure Nothing)
atomically $ ncqAddTrackedFilesSTM ncq tsFiles
evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM ()
evictIfNeededSTM NCQStorage2{..} howMany = do
cur <- readTVar ncqCachedEntries
let need = fromMaybe (cur `div` 2) howMany
excess = max 0 (cur + need - ncqMaxCached)
when (excess > 0) do
files <- readTVar ncqTrackedFiles <&> HPSQ.toList
oldest <- forM files \case
(k, prio, Just ce) -> do
ts <- readTVar (cachedTs ce)
pure (Just (ts, k, prio))
_ -> pure Nothing
let victims =
oldest
& catMaybes
& List.sortOn (\(ts,_,_) -> ts)
& List.take excess
for_ victims $ \(_,k,prio) -> do
modifyTVar ncqTrackedFiles (HPSQ.insert k prio Nothing)
modifyTVar ncqCachedEntries (subtract 1)
{- HLINT ignore "Functor law" -}
ncqListTrackedFiles :: MonadIO m => NCQStorage2 -> m [FilePath]
ncqListTrackedFiles ncq = do
let wd = ncqGetWorkDir ncq
dirFiles wd
>>= mapM (pure . takeBaseName)
<&> List.filter (List.isPrefixOf "fossil-")
<&> HS.toList . HS.fromList
ncqRepair :: MonadIO m => NCQStorage2 -> m ()
ncqRepair me@NCQStorage2{..} = do
fossils <- ncqListTrackedFiles me
debug "ncqRepair"
debug $ vcat (fmap pretty fossils)
for_ fossils $ \fo -> liftIO $ flip fix 0 \next i -> do
let dataFile = ncqGetFileName me $ toFileName (DataFile fo)
try @_ @IOException (ncqFileFastCheck dataFile) >>= \case
Left e -> do
err (viaShow e)
mv fo (dropExtension fo `addExtension` ".broken")
Right{} | i <= 1 -> do
let dataKey = DataFile (fromString fo)
idx <- doesFileExist (toFileName (IndexFile dataFile))
unless idx do
debug $ "indexing" <+> pretty (toFileName dataKey)
r <- ncqIndexFile me dataKey
debug $ "indexed" <+> pretty r
next (succ i)
ncqAddTrackedFile me dataKey
Right{} -> do
err $ "skip indexing" <+> pretty dataFile

View File

@ -1220,6 +1220,7 @@ executable test-ncq
, mmap
, zstd
, unix
, mwc-random

View File

@ -21,7 +21,7 @@ import HBS2.Storage.Operations.ByteString
import HBS2.System.Logger.Simple.ANSI
import HBS2.Storage.NCQ
import HBS2.Storage.NCQ2
import HBS2.Storage.NCQ2 as N2
import HBS2.Data.Log.Structured.NCQ
import HBS2.CLI.Run.Internal.Merkle
@ -68,6 +68,8 @@ import System.IO.MMap
import System.IO qualified as IO
import System.Exit (exitSuccess, exitFailure)
import System.Random
import System.Random.MWC as MWC
import System.Random.Stateful
import System.Random.Shuffle (shuffleM)
import Safe
import Lens.Micro.Platform
@ -76,6 +78,7 @@ import System.IO.Temp qualified as Temp
import System.Mem
import UnliftIO
import UnliftIO.Async
import Test.Tasty.HUnit
import Text.InterpolatedString.Perl6 (qc)
@ -582,6 +585,37 @@ testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do
rm ncqDir
testNCQ2Simple1 :: MonadUnliftIO m
=> TestEnv
-> m ()
testNCQ2Simple1 TestEnv{..} = do
debug "testNCQ2Simple1"
let tmp = testEnvDir
let ncqDir = tmp
q <- newTQueueIO
g <- liftIO MWC.createSystemRandom
bz <- replicateM 1000 $ liftIO do
n <- (`mod` (256*1024)) <$> uniformM @Int g
uniformByteStringM n g
ncqWithStorage ncqDir $ \sto -> liftIO do
for bz $ \z -> do
h <- ncqPutBS sto (Just B) Nothing z
atomically $ writeTQueue q h
found <- ncqSearchBS sto h <&> maybe (-1) BS.length
assertBool (show $ "found-immediate" <+> pretty h) (found > 0)
ncqWithStorage ncqDir $ \sto -> liftIO do
hashes <- atomically (STM.flushTQueue q)
for_ hashes $ \ha -> do
found <- ncqSearchBS sto ha <&> maybe (-1) BS.length
assertBool (show $ "found-immediate" <+> pretty ha) (found > 0)
-- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found)
testNCQ2Concurrent1 :: MonadUnliftIO m
=> Bool
-> Int
@ -793,6 +827,8 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do
runTest testNCQ2Simple1
entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case
[ StringLike fn ] -> do