From a49ee574de8ac9ce452397eddb26fab2134ba1e2 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 13 Jul 2025 16:20:33 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 135 ++++++++++++++++------ hbs2-tests/test/TestNCQ.hs | 26 ++++- 2 files changed, 124 insertions(+), 37 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index e72f3d42..ba1d9888 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -140,8 +140,11 @@ data NCQStorage2 = , ncqStorageTasks :: TVar Int , ncqStorageStopReq :: TVar Bool , ncqStorageSyncReq :: TVar Bool + , ncqMergeReq :: TVar Bool + , ncqMergeSem :: TSem , ncqSyncNo :: TVar Int , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) + , ncqStaged :: TVar (HashSet FileKey) , ncqStateVersion :: TVar StateVersion , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) , ncqCachedEntries :: TVar Int @@ -170,8 +173,11 @@ ncqStorageOpen2 fp upd = do ncqMemTable <- V.fromList <$> replicateM cap (newTVarIO mempty) ncqStorageStopReq <- newTVarIO False ncqStorageSyncReq <- newTVarIO False + ncqMergeReq <- newTVarIO False + ncqMergeSem <- atomically (newTSem 1) ncqSyncNo <- newTVarIO 0 ncqTrackedFiles <- newTVarIO HPSQ.empty + ncqStaged <- newTVarIO mempty ncqStateVersion <- newTVarIO 0 ncqStateUsage <- newTVarIO mempty ncqCachedEntries <- newTVarIO 0 @@ -402,18 +408,21 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do spawnActivity measureWPS + spawnActivity $ forever $ (>> pause @'Seconds 10) do + notice $ yellow "remove unused files" + spawnActivity $ fix \again -> (>> again) do ema <- readTVarIO ncqWriteEMA + mergeReq <- atomically $ stateTVar ncqMergeReq (,False) - if ema > ncqIdleThrsh then do + if ema > ncqIdleThrsh || mergeReq then do pause @'Seconds 2.5 else do mq <- newEmptyTMVarIO spawnJob $ do - -- merged <- ncqStorageMergeStep ncq - let merged = True + merged <- ncqMergeStep ncq atomically $ putTMVar mq merged -- TODO: detect-dead-merge @@ -461,7 +470,8 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do -- FIXME: slow! liftIO (ncqStateUpdate ncq [F 0 fk]) atomically do - writeTVar ncqStorageSyncReq False + modifyTVar ncqStaged (HS.insert fk) + writeTVar ncqStorageSyncReq False modifyTVar' ncqSyncNo succ pure 0 @@ -539,6 +549,9 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m () ncqFileFastCheck fp = do + + -- debug $ "ncqFileFastCheck" <+> pretty fp + mmaped <- liftIO $ mmapFileByteString fp Nothing let size = BS.length mmaped let s = BS.drop (size - 8) mmaped & N.word64 @@ -656,10 +669,22 @@ ncqListTrackedFiles :: MonadIO m => NCQStorage2 -> m [FilePath] ncqListTrackedFiles ncq = do let wd = ncqGetWorkDir ncq dirFiles wd - >>= mapM (pure . takeBaseName) - <&> List.filter (List.isPrefixOf "fossil-") + >>= mapM (pure . takeFileName) + <&> List.filter (\f -> List.isPrefixOf "fossil-" f && List.isSuffixOf ".data" f) <&> HS.toList . HS.fromList +ncqListStateFiles :: MonadIO m => NCQStorage2 -> m [FilePath] +ncqListStateFiles ncq = do + let wd = ncqGetWorkDir ncq + dirFiles wd + >>= mapM (pure . takeBaseName) + <&> List.filter (List.isPrefixOf "state-") + >>= mapM (\x -> (,x) <$> timespecOf x) + <&> fmap snd . List.sortOn Down + + where + timespecOf x = liftIO do + posixToTimeSpec . modificationTimeHiRes <$> getFileStatus (ncqGetFileName ncq x) ncqLoadSomeIndexes :: MonadIO m => NCQStorage2 -> [FileKey] -> m () ncqLoadSomeIndexes ncq@NCQStorage2{..} keys = do @@ -696,38 +721,75 @@ ncqLoadIndexes ncq@NCQStorage2{..} = do <&> List.take (ncqMaxCached `div` 2) . HPSQ.keys ncqLoadSomeIndexes ncq w + ncqRepair :: MonadIO m => NCQStorage2 -> m () ncqRepair me@NCQStorage2{} = do - fossils <- ncqListTrackedFiles me + states <- ncqListStateFiles me - -- TODO: use-state - warn $ red "TODO: use state for load" + fossils <- flip fix states $ \next -> \case + [] -> do + warn $ yellow "no valid state found; start from scratch" + ncqListTrackedFiles me <&> fmap (DataFile . fromString) - for_ fossils $ \fo -> liftIO $ flip fix 0 \next i -> do - let dataFile = ncqGetFileName me $ toFileName (DataFile fo) - try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case - Left e -> do - err (viaShow e) - -- TODO: try-fix-later - mv dataFile (dropExtension dataFile `addExtension` ".broken") - rm (ncqGetFileName me (toFileName (IndexFile fo))) + (s:ss) -> tryLoadState s >>= \case + Just files -> do + debug $ yellow "used state" <+> pretty s + pure files + Nothing -> do + warn $ red "inconsistent state" <+> pretty s + next ss - Right{} | i <= 1 -> do - let dataKey = DataFile (fromString fo) - idx <- doesFileExist (toFileName (IndexFile dataFile)) + mapM_ (ncqAddTrackedFile me) fossils - unless idx do - debug $ "indexing" <+> pretty (toFileName dataKey) - r <- ncqIndexFile me dataKey - debug $ "indexed" <+> pretty r - next (succ i) + void $ liftIO (ncqStateUpdate me mempty) - void $ ncqAddTrackedFile me dataKey + where - Right{} -> do - err $ "skip indexing" <+> pretty dataFile + readState path = liftIO do + keys <- BS8.readFile (ncqGetFileName me path) + <&> filter (not . BS8.null) . BS8.lines + pure $ fmap (DataFile . coerce @_ @FileKey) keys + + tryLoadState path = liftIO do + + debug $ "tryLoadState" <+> pretty path + + state <- readState path + + let checkFile fo = flip fix 0 $ \next i -> do + let dataFile = ncqGetFileName me (toFileName fo) + let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce @_ @FileKey fo))) + + -- debug $ "checkFile" <+> pretty dataFile + + try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case + + Left e -> do + err (viaShow e) + here <- doesFileExist dataFile + when here do + let broken = dropExtension dataFile `addExtension` ".broken" + mv dataFile broken + rm indexFile + warn $ red "renamed" <+> pretty dataFile <+> pretty broken + + pure False + + Right{} | i > 1 -> pure False + + Right{} -> do + exists <- doesFileExist indexFile + if exists then do + pure True + else do + debug $ "indexing" <+> pretty (toFileName fo) + r <- ncqIndexFile me fo + debug $ "indexed" <+> pretty r + next (succ i) + + results <- forM state checkFile + pure $ if and results then Just state else Nothing - void (liftIO $ ncqStateUpdate me mempty) ncqRefHash :: NCQStorage2 -> HashRef -> HashRef ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt)) @@ -765,7 +827,6 @@ ncqStateUpdate me@NCQStorage2{..} ops' = flip runContT pure $ callCC \exit -> do ops <- for ops' $ \case f@(F _ fk) -> do - -- let idxFile = ncqGetFileName me (toFileName $ IndexFile fk) let datFile = ncqGetFileName me (toFileName $ DataFile fk) e2 <- doesFileExist datFile @@ -812,13 +873,17 @@ ncqStateUpdate me@NCQStorage2{..} ops' = flip runContT pure $ callCC \exit -> do ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m () ncqDumpCurrentState me@NCQStorage2{..} = do - keys <- readTVarIO ncqTrackedFiles <&> List.sort . HPSQ.keys + keys <- atomically do + a1 <- readTVar ncqTrackedFiles <&> HPSQ.keys + a2 <- readTVar ncqStaged + pure (HS.toList (a2 <> HS.fromList a1)) + name <- ncqGetNewStateName me writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | k <- keys]) -- FIXME: sometime-causes-no-such-file-or-directory -ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool -ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT pure do +ncqMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool +ncqMergeStep ncq@NCQStorage2{..} = withSem $ ncqRunTask ncq False $ flip runContT pure do liftIO do @@ -850,6 +915,10 @@ ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT where + withSem m = bracket enter leave (const m) + where enter = atomically (waitTSem ncqMergeSem) + leave = const $ atomically (signalTSem ncqMergeSem) + ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath ncqGetNewMergeName n@NCQStorage2{} = do let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data") diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index eb2b4196..3c0bc1e2 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -697,7 +697,7 @@ testNCQ2Merge1 n TestEnv{..} = do notice $ "merge whatever possible" n <- flip fix 0 \next i -> do - N2.ncqStorageMergeStep sto >>= \case + N2.ncqMergeStep sto >>= \case False -> pure i True -> next (succ i) @@ -714,12 +714,15 @@ testNCQ2Merge1 n TestEnv{..} = do notice $ "after-merge" <+> pretty (sec3 t3) <+> pretty (HS.size w1) <+> pretty (HS.size n2) + pause @'Seconds 300 + testFilterEmulate1 :: MonadUnliftIO m - => Int + => Bool + -> Int -> TestEnv -> m () -testFilterEmulate1 n TestEnv{..} = do +testFilterEmulate1 doMerge n TestEnv{..} = do let tmp = testEnvDir let ncqDir = tmp @@ -734,6 +737,7 @@ testFilterEmulate1 n TestEnv{..} = do noHs' <- newTVarIO (mempty :: HashSet HashRef) ncqWithStorage ncqDir $ \sto -> liftIO do + for bz $ \z -> do h <- ncqPutBS sto (Just B) Nothing z atomically $ modifyTVar' hs' (HS.insert h) @@ -756,6 +760,12 @@ testFilterEmulate1 n TestEnv{..} = do ncqWithStorage ncqDir $ \sto -> liftIO do + when doMerge do + notice "merge data" + fix $ \next -> ncqMergeStep sto >>= \case + True -> next + False -> none + for_ [1..5] $ \i -> do notice $ "-- pass" <+> pretty i <+> "--" @@ -812,6 +822,8 @@ testNCQ2Repair1 TestEnv{..} = do atomically $ writeTQueue q h found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize assertBool (show $ "found-immediate" <+> pretty h) (found > 0) + + ncqWithStorage ncqDir $ \sto -> liftIO do written <- N2.ncqListTrackedFiles sto debug $ "TRACKED" <+> vcat (fmap pretty written) toDestroy <- pure (headMay written) `orDie` "no file written" @@ -826,6 +838,8 @@ testNCQ2Repair1 TestEnv{..} = do rm cq BS.appendFile f shit + notice "after destroying storage" + ncqWithStorage ncqDir $ \sto -> liftIO do hashes <- atomically (STM.flushTQueue q) for_ hashes $ \ha -> do @@ -1120,7 +1134,11 @@ main = do pause @'Seconds 120 entry $ bindMatch "test:filter:emulate-1" $ nil_ $ \case - [ LitIntVal n ] -> runTest $ testFilterEmulate1 (fromIntegral n) + [ LitIntVal n ] -> runTest $ testFilterEmulate1 False (fromIntegral n) + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:filter:emulate:merged" $ nil_ $ \case + [ LitIntVal n ] -> runTest $ testFilterEmulate1 True (fromIntegral n) e -> throwIO $ BadFormException @C (mkList e) hidden do