From ac629634c0a3f9a37baabd482c5aeafae5a0c786 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 21 Aug 2025 16:44:10 +0300 Subject: [PATCH] wip --- hbs2-peer/app/PeerMain.hs | 35 ++--- .../lib/HBS2/Storage/NCQ3/Internal.hs | 2 +- .../lib/HBS2/Storage/NCQ3/Internal/Run.hs | 5 +- hbs2-tests/test/NCQ3.hs | 130 +++++++++--------- hbs2-tests/test/NCQ3/EnduranceInProc.hs | 14 +- 5 files changed, 103 insertions(+), 83 deletions(-) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 65afd981..1b8409d2 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -31,8 +31,8 @@ import HBS2.Net.Proto.Notify import HBS2.Peer.Proto.Mailbox import HBS2.OrDie import HBS2.Storage.Simple --- import HBS2.Storage.NCQ3 -import HBS2.Storage.NCQ +import HBS2.Storage.NCQ3 +-- import HBS2.Storage.NCQ import HBS2.Storage.Operations.Missed import HBS2.Data.Detect @@ -822,13 +822,13 @@ runPeer opts = respawnOnError opts $ flip runContT pure do -- error "STOP" - -- let ncqPath = coerce pref "ncq3" - let ncqPath = coerce pref "ncq" + let ncqPath = coerce pref "ncq3" + -- let ncqPath = coerce pref "ncq" debug $ "storage prefix:" <+> pretty ncqPath -- s <- ContT $ ncqWithStorage ncqPath - s <- lift $ ncqStorageOpen ncqPath + s <- lift $ ncqStorageOpen ncqPath id -- simpleStorageInit @HbSync (Just pref) let blk = liftIO . hasBlock s @@ -1380,7 +1380,7 @@ runPeer opts = respawnOnError opts $ flip runContT pure do , monkeys ] - -- liftIO $ ncqStorageStop s + liftIO $ ncqStorageStop s pause @'Seconds 1 -- we want to clean up all resources @@ -1399,7 +1399,7 @@ checkMigration prefix = flip runContT pure $ callCC \exit -> do already <- Sy.doesDirectoryExist migration when (L.null blocks && not already) do - -- checkNCQ1 + checkNCQ1 exit () let reason = if already then @@ -1417,14 +1417,15 @@ checkMigration prefix = flip runContT pure $ callCC \exit -> do where - -- checkNCQ1 = do - -- let ncq1Dir = prefix "ncq" - -- ncq1Here <- Sy.doesDirectoryExist ncq1Dir - -- when ncq1Here do - -- notice $ yellow "found NCQv1 storage" - -- notice $ red "Run" <+> "hbs2-peer migrate" <+> pretty prefix - -- <> line - -- <> "to migrate the storage to a new version" - -- notice $ "You may also: backup" <+> pretty ncq1Dir <+> "or move it or remove permanently" - -- liftIO exitFailure + checkNCQ1 :: ContT () m () + checkNCQ1 = do + let ncq1Dir = prefix "ncq" + ncq1Here <- Sy.doesDirectoryExist ncq1Dir + when ncq1Here do + notice $ yellow "found NCQv1 storage" + notice $ red "Run" <+> "hbs2-peer migrate" <+> pretty prefix + <> line + <> "to migrate the storage to a new version" + notice $ "You may also: backup" <+> pretty ncq1Dir <+> "or move it or remove permanently" + liftIO exitFailure diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index b49212fc..240b347e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -204,7 +204,7 @@ ncqTryLoadState :: forall m. MonadUnliftIO m => NCQStorage -> m () -ncqTryLoadState me@NCQStorage{..} = do +ncqTryLoadState me@NCQStorage{..} = withSem ncqServiceSem do stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" ) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 96affe6d..dd8c7ee0 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -86,7 +86,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do -- debug $ "NOT FOUND SHIT" <+> pretty h answer Nothing >> exit () - spawnActivity measureWPS + -- spawnActivity measureWPS spawnActivity (ncqStateUpdateLoop ncq) @@ -113,7 +113,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do ncqSweepFiles ncq next lsB - spawnActivity $ postponed 10 $ compactLoop 10 30 do + spawnActivity $ postponed 20 $ compactLoop 10 30 do ncqIndexCompactStep ncq spawnActivity $ postponed 20 $ compactLoop 10 60 do @@ -234,6 +234,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do link a pure a + measureWPS :: m () measureWPS = void $ flip fix Nothing \loop -> \case Nothing -> do w <- readTVarIO ncqWrites diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 3418b337..04dcff10 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -331,6 +331,12 @@ ncq3Tests = do race (pause @'Seconds (realToFrac seconds) >> ncqStorageStop sto) $ forever do n <- liftIO $ uniformRM (1, 256*1024) g + + p <- liftIO $ uniformRM (0.00, 1.00) g + + when (p < 0.11 ) do + none + s <- liftIO $ genRandomBS g n h <- ncqPutBS sto (Just B) Nothing s liftIO $ appendFile writtenLog (show (pretty h <+> pretty n <> line)) @@ -347,97 +353,97 @@ ncq3Tests = do self <- liftIO getExecutablePath - flip runContT pure do + replicateM_ 5 do - p <- liftIO $ uniformM @Word32 g + flip runContT pure do - let path = path0 show p + let path = path0 - p <- ContT $ withProcessWait (proc self ["debug off" - , "and" - , "test:ncq3:long-write", show (pretty seconds), path - ]) + p <- ContT $ withProcessWait (proc self ["debug off" + , "and" + , "test:ncq3:long-write", show (pretty seconds), path + ]) - pid <- liftIO (PT.getPid p) `orDie` "oopsie!" + pid <- liftIO (PT.getPid p) `orDie` "oopsie!" - delta <- liftIO $ uniformRM (0.25, s + 0.10) g + delta <- liftIO $ uniformRM (0.25, s + 0.10) g - notice $ "Run" <+> "test:ncq3:long-write" - <+> green "pid" <+> viaShow pid - <+> pretty testEnvDir - <+> pretty (sec2 s) + notice $ "Run" <+> "test:ncq3:long-write" + <+> green "pid" <+> viaShow pid + <+> pretty testEnvDir + <+> pretty (sec2 s) - pause @'Seconds (realToFrac delta) + pause @'Seconds (realToFrac delta) - void $ runProcess (proc "kill" ["-9", show pid]) + void $ runProcess (proc "kill" ["-9", show pid]) - notice $ "Killed" <+> viaShow pid <+> pretty testEnvDir <+> "at" <+> pretty (sec2 delta) + notice $ "Killed" <+> viaShow pid <+> pretty testEnvDir <+> "at" <+> pretty (sec2 delta) - pause @'Seconds 2 + pause @'Seconds 2 - lift $ ncqWithStorage path $ \sto@NCQStorage{..} -> do - let log = ncqGetFileName sto "written.log" - hashes <- liftIO (readFile log) <&> fmap words . lines + lift $ ncqWithStorage path $ \sto@NCQStorage{..} -> do + let log = ncqGetFileName sto "written.log" + hashes <- liftIO (readFile log) <&> fmap words . lines - found <- newTVarIO 0 - foundBytes <- newTVarIO 0 - missedN <- newTVarIO 0 - missedBytes <- newTVarIO 0 + found <- newTVarIO 0 + foundBytes <- newTVarIO 0 + missedN <- newTVarIO 0 + missedBytes <- newTVarIO 0 - for_ hashes $ \case - [hs, slen] -> do + for_ hashes $ \case + [hs, slen] -> do - let h = fromString hs - let s = read slen :: Int + let h = fromString hs + let s = read slen :: Int - what <- ncqLocate sto h >>= mapM (ncqGetEntryBS sto) <&> join + what <- ncqLocate sto h >>= mapM (ncqGetEntryBS sto) <&> join - case what of - Just bs -> do + case what of + Just bs -> do - ok <- case ncqEntryUnwrap bs of - (_, Left{}) -> pure False + ok <- case ncqEntryUnwrap bs of + (_, Left{}) -> pure False - (k, Right (B, bss)) -> do - let good = HashRef (hashObject @HbSync bss) == h - -- debug $ "WTF?" <+> pretty (coerce @_ @HashRef k) - -- <+> pretty good - -- <+> pretty s - -- <+> pretty (BS.length bss) - pure good + (k, Right (B, bss)) -> do + let good = HashRef (hashObject @HbSync bss) == h + -- debug $ "WTF?" <+> pretty (coerce @_ @HashRef k) + -- <+> pretty good + -- <+> pretty s + -- <+> pretty (BS.length bss) + pure good - (_,Right (_, s)) -> pure True + (_,Right (_, s)) -> pure True - if ok then do + if ok then do - atomically do - modifyTVar found succ - modifyTVar foundBytes (+s) - else do + atomically do + modifyTVar found succ + modifyTVar foundBytes (+s) + else do + atomically do + modifyTVar missedN succ + modifyTVar missedBytes (+s) + -- err $ red "Entry corrupted!" + + Nothing -> do atomically do modifyTVar missedN succ modifyTVar missedBytes (+s) - -- err $ red "Entry corrupted!" - - Nothing -> do - atomically do - modifyTVar missedN succ - modifyTVar missedBytes (+s) - _ -> error "invalid record" + _ -> error "invalid record" - f <- readTVarIO found - fb <- readTVarIO foundBytes - mb <- readTVarIO missedBytes - mn <- readTVarIO missedN + f <- readTVarIO found + fb <- readTVarIO foundBytes + mb <- readTVarIO missedBytes + mn <- readTVarIO missedN - let okay = if mb <= ncqFsync then green "OK" else red "FAIL" + let okay = if mb <= ncqFsync then green "OK" else red "FAIL" - notice $ okay <+> "(found/lost)" - <+> pretty f <+> pretty fb <+> - "/" - <+> pretty mn <+> pretty mb + notice $ okay <+> "(found/lost)" + <+> pretty f <+> pretty fb <+> + "/" + <+> pretty mn <+> pretty mb entry $ bindMatch "test:ncq3:concurrent1" $ nil_ $ \case [ LitIntVal tn, LitIntVal n ] -> do diff --git a/hbs2-tests/test/NCQ3/EnduranceInProc.hs b/hbs2-tests/test/NCQ3/EnduranceInProc.hs index a8c11e89..9873c01c 100644 --- a/hbs2-tests/test/NCQ3/EnduranceInProc.hs +++ b/hbs2-tests/test/NCQ3/EnduranceInProc.hs @@ -77,6 +77,10 @@ import Streaming.Prelude qualified as S {-HLINT ignore "Functor law"-} +data AbortException = AbortException + deriving stock (Show, Typeable) + +instance Exception AbortException data EnduranceFSM = EnduranceIdle @@ -89,6 +93,7 @@ data EnduranceFSM = | EnduranceDelRef | EnduranceStorm | EnduranceCalm + | EnduranceAbort | EnduranceStop buildCDF :: [(s, Double)] -> (V.Vector s, U.Vector Double) @@ -274,6 +279,7 @@ ncq3EnduranceTestInProc = do wMaxBlk <- int <$> lookupValueDef (mkInt 262144) "w:blk" wStormMin <- dbl <$> lookupValueDef (mkDouble 1.00) "w:stormmin" wStormMax <- dbl <$> lookupValueDef (mkDouble 60.00) "w:stormmax" + wAbort <- dbl <$> lookupValueDef (mkDouble 0.001) "w:abort" runTest \TestEnv{..} -> do g <- liftIO $ MWC.createSystemRandom @@ -302,11 +308,12 @@ ncq3EnduranceTestInProc = do , (EnduranceDelRef, wDelRef) , (EnduranceStorm, wStorm) , (EnduranceCalm, wCalm) + , (EnduranceAbort, wAbort) ] let dist = buildCDF actions -- ← подготовили один раз - fix \recover -> handle (\(e :: IOException) -> err (viaShow e) >> pause @'Seconds 1 >> recover) do + fix \recover -> handleAny (\e -> err (viaShow e) >> pause @'Seconds 1 >> recover) do flip runContT pure do @@ -437,6 +444,11 @@ ncq3EnduranceTestInProc = do pause @'Seconds (realToFrac n) getNextState >>= loop + EnduranceAbort -> do + debug $ red "EnduranceAbort" + pause @'Seconds 0.01 + throwIO AbortException + EnduranceStorm -> do now <- getTimeCoarse relaxTill <- readTVarIO trelaxTill