From 91a0af9ee32f8e2460cb47eb502583e52d8a53d2 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 3 Jul 2025 09:08:16 +0300 Subject: [PATCH] ncq2 new writer --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 26 +- .../lib/HBS2/Storage/NCQ/Types.hs | 33 +++ hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 255 +++++++++++++++--- hbs2-tests/test/TCQ.hs | 1 + hbs2-tests/test/TestNCQ.hs | 57 ++++ 5 files changed, 303 insertions(+), 69 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index dd6a30e7..ec6a7ff1 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -112,15 +112,6 @@ data NCQStorageException = instance Exception NCQStorageException -newtype FileKey = FileKey ByteString - deriving newtype (Eq,Ord,Hashable,Show) - -instance IsString FileKey where - fromString = FileKey . BS8.pack . dropExtension . takeFileName - -instance Pretty FileKey where - pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s)) - newtype FilePrio = FilePrio (Down TimeSpec) deriving newtype (Eq,Ord) deriving stock (Generic,Show) @@ -1290,21 +1281,6 @@ ncqStorageInit_ check path = do pure ncq -data NCQFsckException = - NCQFsckException - deriving stock (Show,Typeable) - -instance Exception NCQFsckException - -data NCQFsckIssueType = - FsckInvalidPrefix - | FsckInvalidContent - | FsckInvalidFileSize - deriving stock (Eq,Ord,Show,Data,Generic) - -data NCQFsckIssue = - NCQFsckIssue FilePath Word64 NCQFsckIssueType - deriving stock (Eq,Ord,Show,Data,Generic) ncqFsck :: MonadUnliftIO m => FilePath -> m [NCQFsckIssue] ncqFsck fp = do @@ -1368,7 +1344,7 @@ ncqFsckOne fp = do lastOff <- readTVarIO toff unless (fromIntegral (BS.length mmaped) == lastOff) do - emit (NCQFsckIssue fp lastOff FsckInvalidFileSize) + emit (NCQFsckIssue fp lastOff (FsckInvalidFileSize (fromIntegral lastOff))) tombs <- readTVarIO ttombs <&> realToFrac total <- readTVarIO ttotal <&> realToFrac diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index 739a3d12..23b941de 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -6,8 +6,13 @@ import HBS2.Hash import Data.ByteString (ByteString) import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 import Network.ByteOrder qualified as N import Data.Coerce +import System.FilePath +import Data.Word +import Data.Data +import Control.Exception -- Log structure: -- (SD)* @@ -17,6 +22,17 @@ import Data.Coerce -- PREFIX ::= BYTESTRING(4) -- DATA ::= BYTESTRING(n) | n == S - LEN(WORD32) - LEN(HASH) - LEN(PREFIX) +newtype FileKey = FileKey ByteString + deriving newtype (Eq,Ord,Hashable,Show) + +instance IsString FileKey where + fromString = FileKey . BS8.pack . dropExtension . takeFileName + +instance Pretty FileKey where + pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s)) + + + newtype NCQFullRecordLen a = NCQFullRecordLen a deriving newtype (Num,Enum,Integral,Real,Ord,Eq) @@ -88,3 +104,20 @@ ncqMakeSectionBS t h bs = do {-# INLINE ncqMakeSectionBS #-} +data NCQFsckException = + NCQFsckException | NCQFsckIssueExt NCQFsckIssueType + deriving stock (Show,Typeable) + +instance Exception NCQFsckException + +data NCQFsckIssueType = + FsckInvalidPrefix + | FsckInvalidContent + | FsckInvalidFileSize Integer + deriving stock (Eq,Ord,Show,Data,Generic) + +data NCQFsckIssue = + NCQFsckIssue FilePath Word64 NCQFsckIssueType + deriving stock (Eq,Ord,Show,Data,Generic) + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index b2ffef21..1611ca21 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -100,18 +100,22 @@ import UnliftIO.IO.File import System.FileLock as FL +type FOff = Word64 + data NCQEntry = - NCQEntryNew Int ByteString - | NCQEntryJustWritten Int Fd ByteString - | NCQEntrySynced Fd Word64 + NCQEntryNew Int ByteString + -- | NCQEntryWritten Int FileKey FOff (Maybe ByteString) type Shard = TVar (HashMap HashRef (TVar NCQEntry)) data NCQStorage2 = NCQStorage2 - { ncqFsync :: Int + { ncqRoot :: FilePath + , ncqGen :: Int + , ncqFsync :: Int , ncqWriteQLen :: Int , ncqWriteBlock :: Int + , ncqMinLog :: Int , ncqMemTable :: Vector Shard , ncqWriteSem :: TSem , ncqWriteQ :: TVar (Seq HashRef) @@ -120,19 +124,39 @@ data NCQStorage2 = , ncqSyncNo :: TVar Int } deriving (Generic) + +megabytes :: forall a . Integral a => a +megabytes = 1024 ^ 2 + ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 ncqStorageOpen2 fp upd = do - let ncqFsync = 16 * 1024^2 + let ncqRoot = fp + let ncqGen = 0 + let ncqFsync = 16 * megabytes let ncqWriteQLen = 1024 * 16 - let ncqWriteBlock = 4096 * 4 + let ncqMinLog = 256 * megabytes + let ncqWriteBlock = 1024 cap <- getNumCapabilities <&> fromIntegral ncqWriteQ <- newTVarIO mempty ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap) - ncqMemTable <- V.fromList <$> replicateM (max 2 (cap `div` 2)) (newTVarIO mempty) + ncqMemTable <- V.fromList <$> replicateM cap (newTVarIO mempty) ncqStorageStopReq <- newTVarIO False ncqStorageSyncReq <- newTVarIO False ncqSyncNo <- newTVarIO 0 - pure $ NCQStorage2{..} & upd + let ncq = NCQStorage2{..} & upd + + mkdir (ncqGetWorkDir ncq) + + pure ncq + +ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath +ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp + +ncqGetWorkDir :: NCQStorage2 -> FilePath +ncqGetWorkDir NCQStorage2{..} = ncqRoot show ncqGen + +ncqGetLockFileName :: NCQStorage2 -> FilePath +ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageStop2 NCQStorage2{..} = do @@ -145,9 +169,11 @@ ncqStorageSync2 NCQStorage2{..} = do ncqShardIdx :: NCQStorage2 -> HashRef -> Int ncqShardIdx NCQStorage2{..} h = fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable +{-# INLINE ncqShardIdx #-} ncqGetShard :: NCQStorage2 -> HashRef -> Shard ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h +{-# INLINE ncqGetShard #-} ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe (NCQEntry, TVar NCQEntry)) ncqLookupEntrySTM ncq h = do @@ -203,70 +229,211 @@ ncqAlterEntrySTM ncq h alterFn = do tve <- newTVar e modifyTVar' shard (HM.insert h tve) + +data RunSt = + RunNew + | RunWrite (FileKey, Fd, Int, Int) + | RunSync (FileKey, Fd, Int, Int, Bool) + ncqStorageRun2 :: forall m . MonadUnliftIO m => NCQStorage2 -> m () ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do jobQ <- newTQueueIO + closeQ <- newTQueueIO - fname <- liftIO $ emptyTempFile "." "datafile-.data" + closer <- ContT $ withAsync $ liftIO $ forever do + atomically (readTQueue closeQ) >>= \(fk, fh) -> do + closeFd fh + let fname = BS8.unpack (coerce fk) + -- notice $ yellow "indexing" <+> pretty fname + idx <- ncqIndexFile ncq fname + nwayHashMMapReadOnly idx >>= \case + Nothing -> err $ "can't open index" <+> pretty idx + Just (bs,nway) -> do + nwayHashScanAll nway bs $ \_ k _ -> do + unless (k == emptyKey) do + none + atomically do + ncqAlterEntrySTM ncq (coerce k) (const Nothing) - let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } - fh0 <- liftIO (PosixBase.openFd fname Posix.ReadWrite flags) - - ContT $ bracket none $ const do - liftIO $ closeFd fh0 + link closer jobz <- ContT $ withAsync $ forever (atomically (readTQueue jobQ) >>= join) link jobz - flip fix (fh0,0) $ \loop (fh,w) -> do + ContT $ bracket none $ const $ liftIO do + fhh <- atomically (STM.flushTQueue closeQ) + for_ fhh ( closeFd . snd ) - sync <- readTVarIO ncqStorageSyncReq + flip fix RunNew $ \loop -> \case - when (w > ncqFsync || sync) do - -- liftIO (appendEntry fh undefined (NCQEntryNew 0 "")) - liftIO (fileSynchronise fh) - atomically do - writeTVar ncqStorageSyncReq False - modifyTVar' ncqSyncNo succ - loop (fh,0) + RunNew -> do + stop <- readTVarIO ncqStorageStopReq + mt <- readTVarIO ncqWriteQ <&> Seq.null - chunk <- atomically do - stop <- readTVar ncqStorageStopReq - sy <- readTVar ncqStorageSyncReq - chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) + when (stop && mt) do + exit () - if | Seq.null chunk && stop -> pure $ Left () - | Seq.null chunk && not (stop || sy) -> STM.retry - | otherwise -> pure $ Right chunk + (fk,fhx) <- openNewDataFile + loop $ RunWrite (fk,fhx,0,0) - case chunk of - Left{} -> exit () - Right chu -> do - ws <- for chu $ \h -> do - atomically (ncqLookupEntrySTM ncq h) >>= \case - Just (r@(NCQEntryNew n bs),t) -> do - n <- lift (appendSection fh h bs) - atomically (writeTVar t (NCQEntryJustWritten n fh bs)) - pure n + RunSync (fk, fh, w, total, continue) -> do - _ -> pure 0 + sync <- readTVarIO ncqStorageSyncReq + let needClose = total >= ncqMinLog - loop (fh, w + sum ws) + rest <- if not (sync || needClose || w > ncqFsync) then + pure w + else liftIO do + s <- Posix.fileSize <$> Posix.getFdStatus fh + void (appendSection fh (fileTailRecord s)) + fileSynchronise fh + atomically do + writeTVar ncqStorageSyncReq False + modifyTVar' ncqSyncNo succ + pure 0 + + if | needClose && continue -> do + atomically $ writeTQueue closeQ (fk, fh) + loop RunNew + + | not continue -> exit () + + | otherwise -> loop $ RunWrite (fk, fh, rest, total) + + RunWrite (fk, fh, w, total') -> do + + chunk <- atomically do + stop <- readTVar ncqStorageStopReq + sy <- readTVar ncqStorageSyncReq + chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) + + if | Seq.null chunk && stop -> pure $ Left () + | Seq.null chunk && not (stop || sy) -> STM.retry + | otherwise -> pure $ Right chunk + + case chunk of + Left{} -> loop $ RunSync (fk, fh, w, total', False) -- exit () + Right chu -> do + ws <- for chu $ \h -> do + atomically (ncqLookupEntrySTM ncq h) >>= \case + Just (r@(NCQEntryNew ns bs),t) -> do + lift (appendSection fh bs) + + _ -> pure 0 + + let written = sum ws + loop $ RunSync (fk, fh, w + written, total' + written, True) where + emptyKey = BS.replicate ncqKeyLen 0 + + zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload + where zeroPayload = N.bytestring64 0 + zeroHash = HashRef (hashObject zeroPayload) + {-# INLINE zeroSyncEntry #-} + + zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) + {-# INLINE zeroSyncEntrySize #-} + + -- 1. It's B-record + -- 2. It's last w64be == fileSize + -- 3. It's hash == hash (bytestring64be fileSize) + -- 4. recovery-strategy: start-to-end, end-to-start + fileTailRecord w = do + -- on open: last w64be == fileSize + let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) + let h = hashObject @HbSync paylo & coerce + ncqMakeSectionBS (Just B) h paylo + {-# INLINE fileTailRecord #-} + appendSection :: forall m . MonadUnliftIO m => Fd - -> HashRef -> ByteString - -> m Int + -> m Int -- (FOff, Int) - appendSection fh h section = do + appendSection fh section = do + -- off <- liftIO $ fdSeek fh SeekFromEnd 0 + -- pure (fromIntegral off, fromIntegral len) liftIO (Posix.fdWrite fh section) <&> fromIntegral {-# INLINE appendSection #-} + openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) + openNewDataFile = do + fname <- liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data" + touch fname + let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } + (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) + +ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m () +ncqFileFastCheck fp = do + mmaped <- liftIO $ mmapFileByteString fp Nothing + let size = BS.length mmaped + let s = BS.drop (size - 8) mmaped & N.word64 + + unless ( BS.length mmaped == fromIntegral s ) do + throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s)) + + + +ncqStorageScanDataFile :: MonadIO m + => NCQStorage2 + -> FilePath + -> ( Integer -> Integer -> HashRef -> ByteString -> m () ) + -> m () +ncqStorageScanDataFile ncq fp' action = do + let fp = ncqGetFileName ncq fp' + mmaped <- liftIO (mmapFileByteString fp Nothing) + + flip runContT pure $ callCC \exit -> do + flip fix (0,mmaped) $ \next (o,bs) -> do + + when (BS.length bs < ncqSLen) $ exit () + + let w = BS.take ncqSLen bs & N.word32 & fromIntegral + + when (BS.length bs < ncqSLen + w) $ exit () + + let kv = BS.drop ncqSLen bs + + let k = BS.take ncqKeyLen kv & coerce @_ @HashRef + let v = BS.take (ncqFullDataLen (NCQFullRecordLen w)) $ BS.drop ncqKeyLen kv + + lift (action o (fromIntegral w) k v) + + next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs) + + +ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> FilePath -> m FilePath +ncqIndexFile n@NCQStorage2{} fp'' = do + + let fp' = addExtension (ncqGetFileName n fp'') ".data" + + let fp = ncqGetFileName n fp' + & takeBaseName + & (`addExtension` ".cq") + & ncqGetFileName n + + trace $ "INDEX" <+> pretty fp' <+> pretty fp + + items <- S.toList_ do + ncqStorageScanDataFile n fp' $ \o w k v -> do + let rs = w - 32 & fromIntegral @_ @Word32 & N.bytestring32 + let os = fromIntegral @_ @Word64 o & N.bytestring64 + let record = os <> rs + -- debug $ "write record" <+> pretty (BS.length record) + S.yield (coerce k, record) + + let (dir,name) = splitFileName fp + + result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir name items + + mv result fp + + pure fp + diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs index 6ac4697f..4c11dcd2 100644 --- a/hbs2-tests/test/TCQ.hs +++ b/hbs2-tests/test/TCQ.hs @@ -203,6 +203,7 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "ncq:poke" $ \case [ isOpaqueOf @TCQ -> Just tcq ] -> lift do ncq <- getNCQ tcq diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 17918fb1..fec8cab2 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -637,6 +637,50 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2) notice $ pretty tnn <+> pretty tt <+> pretty speed + +testNCQ2ConcurrentWriteSimple1 :: MonadUnliftIO m + => Int + -> Int + -> TestEnv + -> m () + +testNCQ2ConcurrentWriteSimple1 tn n TestEnv{..} = flip runContT pure do + + let tmp = testEnvDir + let inputDir = tmp "input" + let ncqDir = tmp "ncq-test-data" + + debug "preparing" + + mkdir inputDir + + debug $ pretty inputDir + + filez <- liftIO $ pooledReplicateConcurrentlyN 8 n $ do + size <- randomRIO (64*1024, 256*1024) + w <- liftIO (randomIO :: IO Word8) + let tbs = BS.replicate size w -- replicateM size w <&> BS.pack + let ha = hashObject @HbSync tbs -- & show . pretty + let fn = inputDir show (pretty ha) + liftIO $ BS.writeFile fn tbs + pure (fn, ha, BS.length tbs) + + debug "done" + + let fnv = V.fromList filez + let ssz = sum [ s | (_,_,s) <- filez ] & realToFrac + + -- setLoggingOff @DEBUG + + ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } ) + w <- ContT $ withAsync (ncqStorageRun2 ncq1) + + liftIO $ pooledForConcurrentlyN_ tn fnv $ \(n,ha,_) -> do + co <- BS.readFile n + ncqPutBS ncq1 (Just B) Nothing co + + liftIO $ ncqStorageStop2 ncq1 + main :: IO () main = do @@ -746,6 +790,19 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case + [ StringLike fn ] -> do + ncqFileFastCheck fn + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case + [ LitIntVal tn, LitIntVal n ] -> do + runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n) + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:concurrent1:wo" $ nil_ $ \case [ LitIntVal tn, LitIntVal n ] -> do debug $ "ncq:concurrent1" <+> pretty tn <+> pretty n