diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index e54a0a02..bfb5a2ba 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -29,7 +29,7 @@ ncqStorageOpen fp upd = do let ncqRoot = fp let ncqGen = 0 -- let ncqFsync = 16 * megabytes - let ncqFsync = 32 * megabytes + let ncqFsync = 16 * megabytes let ncqWriteQLen = 1024 * 4 let ncqMinLog = 512 * megabytes let ncqMaxLog = 32 * gigabytes @@ -123,6 +123,7 @@ ncqPutBlock0 sto lbs wait = do Nothing -> Just <$> work Just l | ncqIsTomb l -> Just <$> work _ -> pure (Just ohash) + -- _ -> Just <$> work where bs = LBS.toStrict lbs ohash = HashRef $ hashObject @HbSync bs @@ -288,9 +289,9 @@ instance IsTomb Location where ncqGetEntryBS :: MonadUnliftIO m => NCQStorage -> Location -> m (Maybe ByteString) ncqGetEntryBS me = \case InMemory bs -> pure $ Just bs - InFossil fk off size -> do + InFossil fk off size -> ncqWithState me $ const do try @_ @SomeException (ncqGetCachedData me fk) >>= \case - Left{} -> pure Nothing + Left e -> err (viaShow e) >> pure Nothing Right (CachedData mmap) -> do pure $ Just $ BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmap diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs index 977983f6..f82c5191 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs @@ -8,6 +8,16 @@ import System.Posix.Files qualified as PFS import Data.List qualified as List +removeFile :: MonadIO m => FilePath -> m () +removeFile fp = do + debug $ "removeFile" <+> pretty fp + rm fp + +moveFile :: MonadIO m => FilePath -> FilePath -> m () +moveFile a b = do + debug $ "moveFile" <+> pretty a <+> pretty b + mv a b + ncqGetFileName :: forall f . ToFileName f => NCQStorage -> f -> FilePath ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName (toFileName fp) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs index a3bf67bf..f81612f9 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs @@ -39,7 +39,23 @@ ncqEntryUnwrapValue v = case ncqIsMeta v of Nothing -> Left v {-# INLINE ncqEntryUnwrapValue #-} - +-- FIXME: wrong-algoritm +-- +-- контр-пример: +-- индексируем два файла с глобальным индексом, одновременно +-- (но после пробега) значение меняется в памяти и пишется индекс +-- а потом мы пишем свой индекс -- и таким образом, менее актуальное +-- значение всплывает наверх. гонка. +-- При один файл = один индекс порядок был всегда однозначен. +-- теперь же в один индекс попадают значения из разных файлов. +-- а мы какой возьмем? +-- возможно, кстати, timestamp(index) == max(timestamp(idx(a)), timestamp(idx(b))) +-- так как мы: пишем в merged файл значения, отсутствующие в индексе (и памяти -- как нам +-- кажется /т.к гонка/) +-- единственное, что нам нужно -- что бы этот индекс +-- получил таймстемп меньше, чем возможно актуальное значение. вопрос, +-- как этого добиться +-- ncqFossilMergeStep :: forall m . MonadUnliftIO m => NCQStorage -> m Bool @@ -66,7 +82,7 @@ ncqFossilMergeStep me@NCQStorage{..} = withSem ncqServiceSem $ flip runContT pu outFile <- liftIO $ emptyTempFile p tpl ContT $ bracket none $ const do - rm outFile + removeFile outFile liftIO $ withBinaryFileAtomic outFile WriteMode $ \fwh -> do fd <- handleToFd fwh @@ -102,7 +118,8 @@ ncqFossilMergeStep me@NCQStorage{..} = withSem ncqServiceSem $ flip runContT pu let newFile = ncqGetFileName me f3 - mv outFile newFile + debug $ "MOVED" <+> pretty outFile <+> pretty newFile + moveFile outFile newFile ss <- liftIO (PFS.getFileStatus newFile) <&> fromIntegral . PFS.fileSize @@ -124,7 +141,7 @@ ncqFileFastCheck fp = do -- debug $ "ncqFileFastCheck" <+> pretty fp - mmaped <- liftIO $ mmapFileByteString fp Nothing + mmaped <- liftIO $ logErr "ncqFileFastCheck" ( mmapFileByteString fp Nothing) let size = BS.length mmaped let s = BS.drop (size - 8) mmaped & N.word64 @@ -136,7 +153,7 @@ ncqFileTryRecover fp = do debug $ yellow "ncqFileTryRecover" <+> pretty fp - mmaped <- liftIO $ mmapFileByteString fp Nothing + mmaped <- liftIO $ logErr "ncqFileTryRecover" (mmapFileByteString fp Nothing) r <- flip runContT pure $ callCC \exit -> do diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs index fb6e01d1..043c620e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -107,7 +107,7 @@ ncqIndexFile n fk = runMaybeT do result <- lift $ nwayWriteBatch ncqIndexAlloc dir idxTemp items - mv result dest + moveFile result dest stat <- liftIO $ PFS.getFileStatus dest let ts = PFS.modificationTimeHiRes stat @@ -190,7 +190,7 @@ ncqIndexCompactStep me@NCQStorage{..} = withSem ncqServiceSem $ flip runContT pu liftIO $ PFS.setFileTimesHiRes result ts ts fki <- ncqGetNewFileKey me IndexFile - mv result (ncqGetFileName me (IndexFile fki)) + moveFile result (ncqGetFileName me (IndexFile fki)) debug $ "state update" <+> pretty a <+> pretty b <+> "=>" <+> pretty fki ncqStateUpdate me do @@ -207,7 +207,7 @@ ncqStorageScanDataFile :: MonadIO m -> m () ncqStorageScanDataFile ncq fp' action = do let fp = ncqGetFileName ncq fp' - mmaped <- liftIO (mmapFileByteString fp Nothing) + mmaped <- liftIO $ logErr "ncqStorageScanDataFile" (mmapFileByteString fp Nothing) flip runContT pure $ callCC \exit -> do flip fix (0,mmaped) $ \next (o,bs) -> do diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs index e2a3bd8e..fdfa55b7 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs @@ -38,7 +38,7 @@ ncqGetCachedData ncq@NCQStorage{..} = where load fk = do let path = ncqGetFileName ncq (DataFile fk) - bs <- liftIO (mmapFileByteString path Nothing) + bs <- liftIO $ logErr "ncqGetCachedData" (mmapFileByteString path Nothing) pure (CachedData bs) ncqGetCachedIndex :: MonadUnliftIO m => NCQStorage -> FileKey -> m CachedIndex diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 54e7b7c5..19aee37e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -107,13 +107,12 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do ncqSweepFiles ncq next lsB - spawnActivity $ postponed 10 $ compactLoop 10 300 do + spawnActivity $ postponed 10 $ compactLoop 10 60 do ncqIndexCompactStep ncq - spawnActivity $ postponed 20 $ compactLoop 10 600 do + spawnActivity $ postponed 20 $ compactLoop 10 120 do ncqFossilMergeStep ncq - flip fix RunNew $ \loop -> \case RunFin -> do debug "exit storage" @@ -169,7 +168,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do RunWrite (fk, fh, w, total') -> do - let timeoutMicro = 30_000_000 + let timeoutMicro = 10_000_000 chunk <- liftIO $ timeout timeoutMicro $ atomically do stop <- readTVar ncqStopReq @@ -212,8 +211,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) openNewDataFile = do fk <- ncqGetNewFileKey ncq DataFile + + ncqStateUpdate ncq (ncqStateAddDataFile fk) + let fname = ncqGetFileName ncq (DataFile fk) - touch fname + -- touch fname let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } (fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs index f096cf76..d22b5cf0 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs @@ -39,12 +39,12 @@ ncqSweepFiles me@NCQStorage{..} = withSem ncqServiceSem do for_ indexes $ \(_, k) -> unless (HS.member k live) do let fn = ncqGetFileName me (IndexFile k) debug $ yellow "REMOVING" <+> pretty (takeFileName fn) - rm fn + removeFile fn for_ fossils $ \(_, k) -> unless (HS.member k live) do let fn = ncqGetFileName me (DataFile k) debug $ yellow "REMOVING" <+> pretty (takeFileName fn) - rm fn + removeFile fn ncqSweepObsoleteStates :: forall m . MonadUnliftIO m => NCQStorage -> m () @@ -61,7 +61,7 @@ ncqSweepObsoleteStates me@NCQStorage{..} = withSem ncqServiceSem do when (f /= k && t < ts) do debug $ yellow "TO REMOVE" <+> pretty (toFileName (StateFile f)) - rm (ncqGetFileName me (StateFile f)) + removeFile (ncqGetFileName me (StateFile f)) case r of Left e -> err ("SweepStates failed" <+> viaShow e) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index b25d8720..587ff72f 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -206,3 +206,9 @@ ncqDeferredWriteOpSTM NCQStorage{..} work = do nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) writeTQueue (ncqWriteOps ! nw) work +logErr :: forall x a m . (Pretty x, MonadUnliftIO m) => x -> m a -> m a +logErr loc m = handle (\(e::SomeException) -> err (pretty loc <> ":" <> viaShow e) >> throwIO e) m + + + + diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 990a57e9..bbb2673f 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1211,7 +1211,8 @@ executable test-ncq ghc-options: hs-source-dirs: test main-is: TestNCQ.hs - other-modules: NCQTestCommon NCQ3 NCQ3.Endurance + other-modules: NCQTestCommon NCQ3 NCQ3.Endurance NCQ3.EnduranceInProc + build-depends: base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq , network diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 7bc92a56..7b8f68a8 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -37,6 +37,7 @@ import Data.Config.Suckless.System import NCQTestCommon import NCQ3.Endurance +import NCQ3.EnduranceInProc import Data.Generics.Labels import Lens.Micro.Platform @@ -720,6 +721,31 @@ ncq3Tests = do when (raw /= coerce ref) $ failure "refs:shape: last 32B != RAW_REF_KEY" + + entry $ bindMatch "test:ncq3:storage:tails" $ nil_ $ \e -> runTest $ \TestEnv{..} -> do + g <- liftIO MWC.createSystemRandom + what <- newTVarIO (mempty :: HashSet HashRef) + + ncqWithStorage testEnvDir $ \sto -> do + + replicateM_ 100 do + n <- liftIO $ uniformRM (1,1024) g + bs <- liftIO $ genRandomBS g n + ha <- putBlock (AnyStorage sto) (LBS.fromStrict bs) `orDie` "not written" + debug $ "written" <+> pretty ha <+> pretty n + atomically $ modifyTVar what (HS.insert (coerce ha)) + + notice "pause 30 sec" + pause @'Seconds 30 + + ncqWithStorage testEnvDir $ \sto -> do + hss <- readTVarIO what + for_ hss $ \h -> do + found <- hasBlock (AnyStorage sto) (coerce h) + liftIO $ assertBool (show $ "found" <+> pretty h) (isJust found) + + notice $ "all" <+> pretty (HS.size hss) <+> "found" + brief "basic full storage test" $ args [ arg "number (def: 100000)" "n" , arg "del. probability (def: 0.10)" "pD" @@ -837,6 +863,7 @@ ncq3Tests = do ncq3EnduranceTest + ncq3EnduranceTestInProc testNCQ3Concurrent1 :: MonadUnliftIO m => Bool diff --git a/hbs2-tests/test/NCQ3/Endurance.hs b/hbs2-tests/test/NCQ3/Endurance.hs index cfb938d9..ca93d714 100644 --- a/hbs2-tests/test/NCQ3/Endurance.hs +++ b/hbs2-tests/test/NCQ3/Endurance.hs @@ -254,16 +254,16 @@ ncq3EnduranceTest = do LitIntVal x -> fromIntegral x _ -> 0 - wIdle <- dbl <$> lookupValueDef (mkDouble 100.00) "w:idle" - wIdleDef <- dbl <$> lookupValueDef (mkDouble 0.25) "w:idle:def" - wPutBlk <- dbl <$> lookupValueDef (mkDouble 20.00) "w:putblk" + wIdle <- dbl <$> lookupValueDef (mkDouble 200.00) "w:idle" + wIdleDef <- dbl <$> lookupValueDef (mkDouble 0.25) "w:idle:def" + wPutBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:putblk" wGetBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:getblk" - wHasBlk <- dbl <$> lookupValueDef (mkDouble 40.00) "w:hasblk" + wHasBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:hasblk" wDelBlk <- dbl <$> lookupValueDef (mkDouble 3.00) "w:delblk" wPutRef <- dbl <$> lookupValueDef (mkDouble 5.00) "w:putref" wGetRef <- dbl <$> lookupValueDef (mkDouble 10.00) "w:getref" wDelRef <- dbl <$> lookupValueDef (mkDouble 1.00) "w:delref" - wStorm <- dbl <$> lookupValueDef (mkDouble 0.50) "w:storm" + wStorm <- dbl <$> lookupValueDef (mkDouble 0.80) "w:storm" wKill <- dbl <$> lookupValueDef (mkDouble 0.0004) "w:kill" wNum <- int <$> lookupValueDef (mkInt 10000) "w:num" @@ -385,7 +385,7 @@ ncq3EnduranceTest = do let getNextState = sampleState g dist - let defaultIdle = 0.25 :: Timeout 'Seconds + let defaultIdle = realToFrac wIdleDef :: Timeout 'Seconds idleTime <- newTVarIO defaultIdle trelaxTill <- newTVarIO 0 @@ -402,7 +402,7 @@ ncq3EnduranceTest = do getNextState >>= loop EndurancePutBlk -> do - bsize <- liftIO $ uniformRM (1, 65536) g + bsize <- liftIO $ uniformRM (1, 256*1024) g liftIO $ IO.hPrint inp ("write-random-block" <+> viaShow bsize) atomically $ modifyTVar rest pred getNextState >>= loop @@ -503,6 +503,8 @@ testEnduranceInner path = flip runContT pure $ callCC \exit -> do debug $ red "storage path" <+> pretty path + hSetBuffering stdout LineBuffering + sto <- ContT $ ncqWithStorage path forever $ callCC \again -> do @@ -528,14 +530,14 @@ testEnduranceInner path = flip runContT pure $ callCC \exit -> do [ LitIntVal n ] -> do s <- liftIO $ genRandomBS g (fromIntegral n) h <- putBlock (AnyStorage sto) (LBS.fromStrict s) >>= orThrowUser "block-not-written" - notice $ "block-written" <+> pretty h <+> pretty (BS.length s) + liftIO $ print $ "block-written" <+> pretty h <+> pretty (BS.length s) e -> throwIO (BadFormException @c (mkList e)) entry $ bindMatch "has-block" $ nil_ \case [ HashLike h ] -> do s <- hasBlock (AnyStorage sto) (coerce h) - notice $ "has-block-result" <+> pretty h <+> pretty s + liftIO $ print $ "has-block-result" <+> pretty h <+> pretty s e -> throwIO (BadFormException @c (mkList e)) @@ -543,35 +545,35 @@ testEnduranceInner path = flip runContT pure $ callCC \exit -> do [ HashLike h ] -> do s <- getBlock (AnyStorage sto) (coerce h) let hx = fmap (hashObject @HbSync) s - notice $ "get-block-result" <+> pretty h <+> pretty hx + liftIO $ print $ "get-block-result" <+> pretty h <+> pretty hx e -> throwIO (BadFormException @c (mkList e)) entry $ bindMatch "del-block" $ nil_ \case [ HashLike h ] -> do delBlock (AnyStorage sto) (coerce h) - notice $ "block-deleted" <+> pretty h + liftIO $ print $ "block-deleted" <+> pretty h e -> throwIO (BadFormException @c (mkList e)) entry $ bindMatch "set-ref" $ nil_ \case [ HashLike h, HashLike hdest ] -> lift do updateRef (AnyStorage sto) (RefAlias2 mempty h) (coerce hdest) - notice $ "ref-updated" <+> pretty h <+> pretty hdest + liftIO $ print $ "ref-updated" <+> pretty h <+> pretty hdest e -> throwIO (BadFormException @c (mkList e)) entry $ bindMatch "get-ref" $ nil_ \case [ HashLike h ] -> lift do what <- getRef (AnyStorage sto) (RefAlias2 mempty h) - notice $ "get-ref-result" <+> pretty h <+> pretty what + liftIO $ print $ "get-ref-result" <+> pretty h <+> pretty what e -> throwIO (BadFormException @c (mkList e)) entry $ bindMatch "del-ref" $ nil_ \case [ HashLike h ] -> lift do delRef (AnyStorage sto) (RefAlias2 mempty h) - notice $ "ref-deleted" <+> pretty h + liftIO $ print $ "ref-deleted" <+> pretty h e -> throwIO (BadFormException @c (mkList e)) diff --git a/hbs2-tests/test/NCQ3/EnduranceInProc.hs b/hbs2-tests/test/NCQ3/EnduranceInProc.hs new file mode 100644 index 00000000..435475e5 --- /dev/null +++ b/hbs2-tests/test/NCQ3/EnduranceInProc.hs @@ -0,0 +1,456 @@ +{-# Language AllowAmbiguousTypes #-} +{-# Language RecordWildCards #-} +{-# Language MultiWayIf #-} +module NCQ3.EnduranceInProc where + + +import HBS2.Prelude.Plated +import HBS2.OrDie +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Misc.PrettyStuff +import HBS2.Clock +import HBS2.Merkle +import HBS2.Polling +import HBS2.Peer.Proto.AnyRef + +import HBS2.Storage +import HBS2.Storage.Simple +import HBS2.Storage.Operations.ByteString +import HBS2.Storage.NCQ3 +import HBS2.Storage.NCQ3.Internal.Files +import HBS2.Storage.NCQ3.Internal.Index +import HBS2.Storage.NCQ3.Internal.Fossil +import HBS2.Storage.NCQ3.Internal.State +import HBS2.Storage.NCQ3.Internal.Sweep +import HBS2.Storage.NCQ3.Internal + +import HBS2.System.Logger.Simple.ANSI + +import HBS2.Data.Log.Structured.SD +import HBS2.Data.Log.Structured.NCQ + +import HBS2.CLI.Run.Internal.Merkle + +import Data.Config.Suckless.Syntax +import Data.Config.Suckless.Script as SC +import Data.Config.Suckless.System + +import NCQTestCommon + +import Data.Generics.Labels +import Lens.Micro.Platform +import Network.ByteOrder qualified as N +import System.TimeIt +import Data.Fixed +import Data.HashSet qualified as HS +import Data.Either +import Data.HashPSQ qualified as HPSQ +import Data.HashMap.Strict qualified as HM +import Test.Tasty.HUnit +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.Ord +import Data.Set qualified as Set +import System.Random.MWC as MWC +import Control.Concurrent.STM qualified as STM +import Data.List qualified as List +import Control.Monad.Trans.Cont +import Control.Monad.Except +import System.IO.Temp qualified as Temp +import System.Environment (getExecutablePath) +import System.Process.Typed as PT +import System.IO qualified as IO +import System.IO.Error +import System.Posix.IO qualified as Posix +import GHC.IO.Handle qualified as GHC +import System.Random.Stateful +import qualified Data.Vector as V +import qualified Data.Vector.Unboxed as U +import UnliftIO +import UnliftIO.IO.File +import UnliftIO.IO as IO +import UnliftIO.Directory + +import Streaming.Prelude qualified as S + +{-HLINT ignore "Functor law"-} + + +data EnduranceFSM = + EnduranceIdle + | EndurancePutBlk + | EnduranceHasBlk + | EnduranceGetBlk + | EnduranceDelBlk + | EndurancePutRef + | EnduranceGetRef + | EnduranceDelRef + | EnduranceStorm + | EnduranceCalm + | EnduranceStop + +buildCDF :: [(s, Double)] -> (V.Vector s, U.Vector Double) +buildCDF xs = + let states = V.fromList (map fst xs) + cdf = U.fromList (scanl1 (+) (map snd xs)) + in (states, cdf) + +-- выборка по бинарному поиску +sampleState :: MonadIO m => GenIO -> (V.Vector s, U.Vector Double) -> m s +sampleState g (states,cdf) = do + let total = U.last cdf + r <- liftIO $ uniformRM (0,total) g + pure $ states V.! binarySearch cdf r + +binarySearch :: U.Vector Double -> Double -> Int +binarySearch vec x = go 0 (U.length vec - 1) + where + go l r + | l >= r = l + | otherwise = + let mid = (l+r) `div` 2 + in if x <= vec U.! mid + then go l mid + else go (mid+1) r + +-- | Pick a random key from a HashPSQ +getRandomFromPSQ :: forall k p v m . (MonadIO m, Hashable k, Ord k, Ord p) + => MWC.GenIO + -> TVar (HPSQ.HashPSQ k p v) + -> m (Maybe k) +getRandomFromPSQ g tvar = do + psq <- readTVarIO tvar + let n = HPSQ.size psq + if n == 0 + then pure Nothing + else do + dropn <- liftIO $ uniformRM (0, n-1) g + let e = fmap (view _1) . headMay $ drop dropn $ HPSQ.toList psq + pure e + + +-- | Deleted = Left (), Alive = Right size +type BlockState = Either () Integer + +-- | Deleted = Left (), Alive = Right destination +type RefState = Either () HashRef + +addHashRef :: forall m v . (MonadIO m) => GenIO -> TVar (HashPSQ HashRef Double v) -> HashRef -> v -> m () +addHashRef g what h v = do + w <- liftIO $ uniformRM (0,1.0) g + atomically do + modifyTVar what (HPSQ.insert h w v) + size <- readTVar what <&> HPSQ.size + when (size > 100000 ) do + modifyTVar what HPSQ.deleteMin + + +validateTestResult :: forall m . MonadUnliftIO m => FilePath -> m () +validateTestResult logFile = do + + blocks <- newTVarIO (mempty :: HM.HashMap HashRef BlockState) + refs <- newTVarIO (mempty :: HM.HashMap HashRef RefState) + + let dict = makeDict @C do + + -- block-written: remember size + entry $ bindMatch "block-written" $ nil_ \case + [ HashLike h, LitIntVal n ] -> + atomically $ modifyTVar blocks (HM.insert h (Right n)) + _ -> none + + -- block-deleted: mark deleted + entry $ bindMatch "block-deleted" $ nil_ \case + [ HashLike h ] -> + atomically $ modifyTVar blocks (HM.insert h (Left ())) + _ -> none + + -- has-block-result + entry $ bindMatch "has-block-result" $ nil_ \case + [ HashLike h, LitIntVal n ] -> do + really <- readTVarIO blocks <&> HM.lookup h + case really of + Just (Right n0) | n0 == n -> none + Just (Left ()) -> err $ red "has-block says present, but deleted" <+> pretty h + _ -> err $ red "has-block mismatch" <+> pretty h + + [ HashLike h ] -> do + really <- readTVarIO blocks <&> HM.lookup h + case really of + Just (Left ()) -> none + Nothing -> none + Just (Right _) -> err $ red "has-block says missing, but we have" <+> pretty h + _ -> none + + -- get-block-result + entry $ bindMatch "get-block-result" $ nil_ \case + [ HashLike h, HashLike _hx ] -> do + really <- readTVarIO blocks <&> HM.lookup h + case really of + Just (Right _) -> none + Just (Left ()) -> err $ red "get-block returned data for deleted block" <+> pretty h + Nothing -> err $ red "get-block returned data for unknown block" <+> pretty h + + [ HashLike h ] -> do + really <- readTVarIO blocks <&> HM.lookup h + case really of + Just (Right _) -> err $ red "get-block missing, but expected present" <+> pretty h + _ -> none + _ -> none + + -- ref-updated + entry $ bindMatch "ref-updated" $ nil_ \case + [ HashLike h, HashLike hdest ] -> + atomically $ modifyTVar refs (HM.insert h (Right hdest)) + _ -> none + + -- get-ref-result + entry $ bindMatch "get-ref-result" $ nil_ \case + [ HashLike h, HashLike hdest ] -> do + really <- readTVarIO refs <&> HM.lookup h + case really of + Just (Right h0) | h0 == hdest -> none + Just (Left ()) -> err $ red "get-ref returned value for deleted ref" <+> pretty h + _ -> err $ red "get-ref mismatch" <+> pretty h <+> "got" <+> pretty hdest + + [ HashLike h ] -> do + really <- readTVarIO refs <&> HM.lookup h + case really of + Just (Left ()) -> none + Nothing -> none + Just (Right _) -> err $ red "get-ref says missing, but we have" <+> pretty h + _ -> none + + -- ref-deleted + entry $ bindMatch "ref-deleted" $ nil_ \case + [ HashLike h ] -> + atomically $ modifyTVar refs (HM.insert h (Left ())) + _ -> none + + -- читаем лог построчно и скармливаем dict + rs <- lines <$> liftIO (IO.readFile logFile) + for_ rs $ \s -> case parseTop s of + Left{} -> none + Right syn -> void $ run dict syn + + -- финальная статистика + bs <- readTVarIO blocks + rs' <- readTVarIO refs + notice $ green "validate done" + <+> "blocks:" <+> pretty (length [() | Right _ <- HM.elems bs]) + <+> "deleted-blocks:" <+> pretty (length [() | Left () <- HM.elems bs]) + <+> "refs:" <+> pretty (length [() | Right _ <- HM.elems rs']) + <+> "deleted-refs:" <+> pretty (length [() | Left () <- HM.elems rs']) + +ncq3EnduranceTestInProc :: forall m . MonadUnliftIO m => MakeDictM C m () +ncq3EnduranceTestInProc = do + + entry $ bindMatch "test:ncq3:endurance:inproc" $ nil_ $ \syn -> do + + let dbl = \case + LitScientificVal x -> realToFrac x + LitIntVal x -> realToFrac x + _ -> 0.00 + + let int = \case + LitScientificVal x -> floor x + LitIntVal x -> fromIntegral x + _ -> 0 + + wIdle <- dbl <$> lookupValueDef (mkDouble 200.00) "w:idle" + wIdleDef <- dbl <$> lookupValueDef (mkDouble 0.25) "w:idle:def" + wPutBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:putblk" + wGetBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:getblk" + wHasBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:hasblk" + wDelBlk <- dbl <$> lookupValueDef (mkDouble 3.00) "w:delblk" + wPutRef <- dbl <$> lookupValueDef (mkDouble 5.00) "w:putref" + wGetRef <- dbl <$> lookupValueDef (mkDouble 10.00) "w:getref" + wDelRef <- dbl <$> lookupValueDef (mkDouble 1.00) "w:delref" + wStorm <- dbl <$> lookupValueDef (mkDouble 0.05) "w:storm" + wCalm <- dbl <$> lookupValueDef (mkDouble 0.001) "w:calm" + wNum <- int <$> lookupValueDef (mkInt 10000) "w:num" + wMaxBlk <- int <$> lookupValueDef (mkInt 262144) "w:blk" + wStormMin <- dbl <$> lookupValueDef (mkDouble 1.00) "w:stormmin" + wStormMax <- dbl <$> lookupValueDef (mkDouble 60.00) "w:stormmax" + + runTest \TestEnv{..} -> do + g <- liftIO $ MWC.createSystemRandom + + let (opts,args) = splitOpts [] syn + + let n = headDef wNum [ fromIntegral x | LitIntVal x <- args ] + + storms <- newTQueueIO + + rest <- newTVarIO n + blocks <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double () ) + refs <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double HashRef ) + killed <- newTVarIO 0 + + let getRandomBlock = liftIO $ getRandomFromPSQ g blocks + let getRandomRef = liftIO $ getRandomFromPSQ g refs + + let actions = [ (EnduranceIdle, wIdle) + , (EndurancePutBlk, wPutBlk) + , (EnduranceGetBlk, wGetBlk) + , (EnduranceHasBlk, wHasBlk) + , (EnduranceDelBlk, wDelBlk) + , (EndurancePutRef, wPutRef) + , (EnduranceGetRef, wGetRef) + , (EnduranceDelRef, wDelRef) + , (EnduranceStorm, wStorm) + , (EnduranceCalm, wCalm) + ] + + let dist = buildCDF actions -- ← подготовили один раз + + fix \recover -> handle (\(e :: IOException) -> err (viaShow e) >> pause @'Seconds 1 >> recover) do + + flip runContT pure do + + let logFile = testEnvDir "op.log" + + let + writeLog :: forall m1 . MonadIO m1 => Doc AnsiStyle -> m1 () + writeLog mess = liftIO (appendFile logFile (show $ mess <> line)) + + ContT $ withAsync $ forever do + join $ atomically (readTQueue storms) + + ContT $ withAsync $ forever do + rest <- readTVarIO rest + b <- readTVarIO blocks <&> HPSQ.size + r <- readTVarIO refs <&> HPSQ.size + k <- readTVarIO killed + + notice $ green "status" + <+> "rest:" <+> pretty rest + <+> "b:" <+> pretty b + <+> "r:" <+> pretty r + <+> "k:" <+> pretty k + + pause @'Seconds 1 + + let getNextState = sampleState g dist + + let defaultIdle = realToFrac wIdleDef :: Timeout 'Seconds + + idleTime <- newTVarIO defaultIdle + trelaxTill <- newTVarIO 0 + + sto <- ContT $ ncqWithStorage testEnvDir + + flip fix EnduranceIdle \loop -> \case + EnduranceIdle -> do + readTVarIO idleTime >>= pause + r <- readTVarIO rest + if r <= 0 then loop EnduranceStop else getNextState >>= loop + + EndurancePutBlk -> do + bsize <- liftIO $ uniformRM (1, wMaxBlk) g + bs <- LBS.fromStrict <$> liftIO (genRandomBS g bsize) + h <- liftIO $ putBlock sto bs `orDie` "can't write block" + let mess = "block-written" <+> pretty h <+> pretty (LBS.length bs) + addHashRef g blocks (coerce h) () + debug mess + writeLog mess + atomically $ modifyTVar rest pred + getNextState >>= loop + + EnduranceDelBlk -> do + blk <- getRandomBlock + for_ blk $ \h -> do + liftIO $ delBlock sto (coerce h) + let mess = "block-deleted" <+> pretty h + debug mess + writeLog mess + + getNextState >>= loop + + EnduranceHasBlk -> do + blk <- getRandomBlock + for_ blk $ \h -> do + f <- lift $ hasBlock sto (coerce h) + let mess = "has-block-result" <+> pretty h <+> pretty f + debug mess + writeLog mess + + getNextState >>= loop + + EnduranceGetBlk -> do + blk <- getRandomBlock + for_ blk $ \h -> do + mbs <- lift $ getBlock sto (coerce h) + + let mess = case mbs of + Just bs -> "get-block-result" <+> pretty h <+> pretty (hashObject @HbSync bs) + Nothing -> "get-block-result" <+> pretty h + + debug mess + writeLog mess + + getNextState >>= loop + + EndurancePutRef -> do + href <- liftIO (genRandomBS g 32) <&> HashRef . coerce + blk <- getRandomBlock + for_ blk $ \val -> do + lift $ updateRef sto (RefAlias2 mempty href) (coerce val) + addHashRef g refs href (HashRef $ hashObject @HbSync val) + let mess = "ref-updated" <+> pretty href <+> pretty val + debug mess + writeLog mess + + atomically $ modifyTVar rest pred + getNextState >>= loop + + EnduranceGetRef -> do + e <- getRandomRef + for_ e $ \h -> do + what <- lift $ getRef sto (RefAlias2 mempty h) + let mess = "get-ref-result" <+> pretty h <+> pretty what + debug mess + writeLog mess + + getNextState >>= loop + + EnduranceDelRef -> do + e <- getRandomRef + for_ e $ \h -> do + lift $ delRef sto (RefAlias2 mempty h) + let mess = "ref-deleted" <+> pretty h + debug mess + writeLog mess + + getNextState >>= loop + + EnduranceStop -> do + notice $ green "done" + notice $ "validate" <+> pretty logFile + liftIO $ validateTestResult logFile + + EnduranceCalm -> do + n <- liftIO $ uniformRM (0.5,10.00) g + debug $ "CALM" <+> pretty n + pause @'Seconds (realToFrac n) + getNextState >>= loop + + EnduranceStorm -> do + now <- getTimeCoarse + relaxTill <- readTVarIO trelaxTill + itn <- readTVarIO idleTime + if | itn < defaultIdle -> loop EnduranceIdle + | now < relaxTill -> loop EnduranceIdle + | otherwise -> do + t0 <- liftIO $ uniformRM (wStormMin,wStormMax) g + debug $ red "FIRE IN DA HOLE!" <+> pretty t0 + atomically $ writeTQueue storms do + atomically $ writeTVar idleTime 0 + pause @'Seconds (realToFrac t0) + atomically $ writeTVar idleTime defaultIdle + t1 <- getTimeCoarse + atomically $ writeTVar trelaxTill (t1 + ceiling 10e9) + getNextState >>= loop + + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 38e1dada..f4aa7256 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -696,9 +696,9 @@ main = do ncq3Tests - hidden do - internalEntries - entry $ bindMatch "#!" $ nil_ $ const none + -- hidden do + internalEntries + entry $ bindMatch "#!" $ nil_ $ const none setupLogger