hbs2/hbs2-tests/test/NCQ3/Endurance.hs

668 lines
23 KiB
Haskell

{-# Language AllowAmbiguousTypes #-}
{-# Language RecordWildCards #-}
{-# Language MultiWayIf #-}
module NCQ3.Endurance where
import HBS2.Prelude.Plated
import HBS2.OrDie
import HBS2.Hash
import HBS2.Data.Types.Refs
import HBS2.Misc.PrettyStuff
import HBS2.Storage
import HBS2.Storage.NCQ3
import HBS2.Storage.NCQ3.Internal.Prelude
import Data.Config.Suckless.Syntax
import Data.Config.Suckless.Script as SC
import Data.Config.Suckless.System
import NCQTestCommon
import Lens.Micro.Platform
import Data.Either
import Data.HashPSQ qualified as HPSQ
import Data.HashMap.Strict qualified as HM
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as LBS
import System.Random.MWC as MWC
import Control.Monad.Trans.Cont
import System.Environment (getExecutablePath)
import System.Process.Typed as PT
import System.IO qualified as IO
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U
import Streaming.Prelude qualified as S
{-HLINT ignore "Functor law"-}
-- | States of the endurance-test driver. Each constructor names the action
-- the driver performs before it samples the next state.
data EnduranceFSM
  = EnduranceIdle        -- ^ pause for the current idle time; stop once the budget is spent
  | EndurancePutBlk      -- ^ send @write-random-block@ to the inner process
  | EnduranceHasBlk      -- ^ send @has-block@ for a random tracked block
  | EnduranceGetBlk      -- ^ send @get-block@ for a random tracked block
  | EnduranceHasSeedBlk  -- ^ send @has-seed-block@ for a random seed block
  | EnduranceDelBlk      -- ^ send @del-block@ for a random tracked block
  | EndurancePutRef      -- ^ send @set-ref@ pointing a fresh ref at a random tracked block
  | EnduranceGetRef      -- ^ send @get-ref@ for a random tracked ref
  | EnduranceDelRef      -- ^ send @del-ref@ for a random tracked ref
  | EnduranceStorm       -- ^ schedule a period with zero idle time
  | EnduranceCalm        -- ^ pause for a random 0.5..10 seconds
  | EnduranceKill        -- ^ @kill -9@ the inner process and start over
  | EnduranceExit        -- ^ send @exit@ to the inner process and start over
  | EnduranceMerge       -- ^ send @merge@
  | EnduranceCompact     -- ^ send @compact@
  | EnduranceSweep       -- ^ send @sweep@
  | EnduranceStop        -- ^ close the inner process and validate the log
-- | Split a list of weighted outcomes into the outcomes themselves and the
-- running sums of their weights (a cumulative distribution, not normalised).
-- The two vectors have the same length and share indices.
buildCDF :: [(s, Double)] -> (V.Vector s, U.Vector Double)
buildCDF weighted = (V.fromList outcomes, U.fromList (scanl1 (+) weights))
  where
    (outcomes, weights) = unzip weighted
-- | Weighted sampling via binary search: draw a number between 0 and the
-- total weight (the last element of the cumulative vector) and return the
-- outcome whose cumulative weight is the first one not below it.
--
-- The cumulative vector must be non-empty, as 'U.last' is partial.
sampleState :: MonadIO m => GenIO -> (V.Vector s, U.Vector Double) -> m s
sampleState gen (outcomes, cumulative) =
  liftIO (uniformRM (0, U.last cumulative) gen)
    <&> \r -> outcomes V.! binarySearch cumulative r
-- | Index of the first element that is not less than @x@ in a
-- non-decreasing vector. When every element is less than @x@ the last index
-- is returned; for an empty vector the result is 0.
binarySearch :: U.Vector Double -> Double -> Int
binarySearch vec x = narrow 0 (U.length vec - 1)
  where
    -- invariant: the answer lies within [lo, hi]
    narrow lo hi
      | lo >= hi         = lo
      | x <= vec U.! mid = narrow lo mid
      | otherwise        = narrow (mid + 1) hi
      where
        mid = (lo + hi) `div` 2
-- | Pick a uniformly random key from the 'HPSQ.HashPSQ' currently stored in
-- the given 'TVar'. Yields 'Nothing' for an empty queue; the generator is
-- only consulted when the queue is non-empty.
getRandomFromPSQ :: forall k p v m . (MonadIO m, Hashable k, Ord k, Ord p)
                 => MWC.GenIO
                 -> TVar (HPSQ.HashPSQ k p v)
                 -> m (Maybe k)
getRandomFromPSQ gen var = readTVarIO var >>= pick
  where
    pick queue
      | total == 0 = pure Nothing
      | otherwise  = do
          -- choose a position, then walk the list form of the queue up to it
          skip <- liftIO (uniformRM (0, total - 1) gen)
          pure (view _1 <$> headMay (drop skip (HPSQ.toList queue)))
      where
        total = HPSQ.size queue
-- | Render a document followed by a line break to the given handle.
toConsole :: MonadIO m => Handle -> Doc AnsiStyle -> m ()
toConsole out = liftIO . hPutDoc out . (<> line)
-- | State of a block while the log is replayed: @Left ()@ once a
-- @block-deleted@ record was seen, @Right size@ with the size reported by
-- the latest @block-written@ record.
type BlockState = Either () Integer
-- | State of a ref while the log is replayed: @Left ()@ once a
-- @ref-deleted@ record was seen, @Right dest@ with the destination reported
-- by the latest @ref-updated@ record.
type RefState = Either () HashRef
-- | Replay an operation log written by the endurance test and cross-check
-- every reported query result against the state implied by the write and
-- delete records that precede it in the same log.
--
-- Every line of @logFile@ is parsed with 'parseTop'; lines that do not parse
-- are skipped. Inconsistencies are reported through 'err', and a summary of
-- the final model is emitted through 'notice'.
validateTestResult :: forall m . MonadUnliftIO m => FilePath -> m ()
validateTestResult logFile = do
  blockModel <- newTVarIO (mempty :: HM.HashMap HashRef BlockState)
  refModel   <- newTVarIO (mempty :: HM.HashMap HashRef RefState)

  let dict = makeDict @C do

        -- block-written: the block is alive, remember its size
        entry $ bindMatch "block-written" $ nil_ \case
          [ HashLike h, LitIntVal size ] ->
            atomically (modifyTVar blockModel (HM.insert h (Right size)))
          _ -> none

        -- block-deleted: mark the block as deleted
        entry $ bindMatch "block-deleted" $ nil_ \case
          [ HashLike h ] ->
            atomically (modifyTVar blockModel (HM.insert h (Left ())))
          _ -> none

        -- has-seed-block-result: a record without a size means the seed block is gone
        entry $ bindMatch "has-seed-block-result" $ nil_ \case
          [ HashLike _, LitIntVal _ ] -> none
          [ HashLike h ] -> err (red "missed seed block (2)" <+> pretty h)
          _ -> none

        -- has-block-result: with a size the block must be alive with that size,
        -- without a size it must not be alive
        entry $ bindMatch "has-block-result" $ nil_ \case
          [ HashLike h, LitIntVal size ] ->
            HM.lookup h <$> readTVarIO blockModel >>= \case
              Just (Right known) | known == size -> none
              Just (Left ()) -> err (red "has-block says present, but deleted" <+> pretty h)
              _ -> err (red "has-block mismatch" <+> pretty h)

          [ HashLike h ] ->
            HM.lookup h <$> readTVarIO blockModel >>= \case
              Just (Right _) -> err (red "has-block says missing, but we have" <+> pretty h)
              _ -> none

          _ -> none

        -- get-block-result: data may only come back for an alive block
        entry $ bindMatch "get-block-result" $ nil_ \case
          [ HashLike h, HashLike _hx ] ->
            HM.lookup h <$> readTVarIO blockModel >>= \case
              Nothing -> err (red "get-block returned data for unknown block" <+> pretty h)
              Just (Left ()) -> err (red "get-block returned data for deleted block" <+> pretty h)
              Just (Right _) -> none

          [ HashLike h ] ->
            HM.lookup h <$> readTVarIO blockModel >>= \case
              Just (Right _) -> err (red "get-block missing, but expected present" <+> pretty h)
              _ -> none

          _ -> none

        -- ref-updated: remember the new destination
        entry $ bindMatch "ref-updated" $ nil_ \case
          [ HashLike h, HashLike hdest ] ->
            atomically (modifyTVar refModel (HM.insert h (Right hdest)))
          _ -> none

        -- get-ref-result: the value must match the last update, a missing
        -- value is only fine for unknown or deleted refs
        entry $ bindMatch "get-ref-result" $ nil_ \case
          [ HashLike h, HashLike hdest ] ->
            HM.lookup h <$> readTVarIO refModel >>= \case
              Just (Right known) | known == hdest -> none
              Just (Left ()) -> err (red "get-ref returned value for deleted ref" <+> pretty h)
              _ -> err (red "get-ref mismatch" <+> pretty h <+> "got" <+> pretty hdest)

          [ HashLike h ] ->
            HM.lookup h <$> readTVarIO refModel >>= \case
              Just (Right _) -> err (red "get-ref says missing, but we have" <+> pretty h)
              _ -> none

          _ -> none

        -- ref-deleted: mark the ref as deleted
        entry $ bindMatch "ref-deleted" $ nil_ \case
          [ HashLike h ] ->
            atomically (modifyTVar refModel (HM.insert h (Left ())))
          _ -> none

        -- maintenance records carry nothing to check
        entry $ bindMatch "compact" $ nil_ $ const none
        entry $ bindMatch "merge" $ nil_ $ const none
        entry $ bindMatch "sweep" $ nil_ $ const none

  -- read the log line by line and feed every parsed line to the dictionary
  logLines <- lines <$> liftIO (IO.readFile logFile)
  for_ logLines \ln -> either (const none) (void . run dict) (parseTop ln)

  -- final statistics
  blockStates <- HM.elems <$> readTVarIO blockModel
  refStates   <- HM.elems <$> readTVarIO refModel

  notice $ green "validate done"
    <+> "blocks:" <+> pretty (length (rights blockStates))
    <+> "deleted-blocks:" <+> pretty (length (lefts blockStates))
    <+> "refs:" <+> pretty (length (rights refStates))
    <+> "deleted-refs:" <+> pretty (length (lefts refStates))
-- | Script bindings for the NCQ3 endurance test.
--
-- Registers two commands:
--
-- * @test:ncq3:endurance:inner PATH@ runs 'testEnduranceInner' on the storage at PATH;
-- * @test:ncq3:endurance [N]@ is the driver: it seeds a storage, spawns this
--   executable again with the inner command, feeds it randomly chosen
--   operations over stdin, records everything the inner process prints in
--   @op.log@ and finally checks that log with 'validateTestResult'.
ncq3EnduranceTest :: forall m . MonadUnliftIO m => MakeDictM C m ()
ncq3EnduranceTest = do
entry $ bindMatch "test:ncq3:endurance:inner" $ nil_ $ \syn -> do
let (opts,args) = splitOpts [] syn
path <- orThrowUser "path not set" $ headMay [ x | StringLike x <- args ]
testEnduranceInner @C path
entry $ bindMatch "test:ncq3:endurance" $ nil_ $ \syn -> do
-- numeric readers for the w:* settings; anything non-numeric counts as zero
let dbl = \case
LitScientificVal x -> realToFrac x
LitIntVal x -> realToFrac x
_ -> 0.00
let int = \case
LitScientificVal x -> floor x
LitIntVal x -> fromIntegral x
_ -> 0
-- limits and state weights, each with a default
wSeed <- int <$> lookupValueDef (mkInt 1000) "w:seed"
wWindow <- int <$> lookupValueDef (mkInt 100000) "w:win"
wIdle <- dbl <$> lookupValueDef (mkDouble 200.00) "w:idle"
wIdleDef <- dbl <$> lookupValueDef (mkDouble 0.25) "w:idle:def"
wMaxBlk <- int <$> lookupValueDef (mkInt 65536) "w:maxblk"
wPutBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:putblk"
wGetBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:getblk"
wHasBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:hasblk"
wDelBlk <- dbl <$> lookupValueDef (mkDouble 3.00) "w:delblk"
wPutRef <- dbl <$> lookupValueDef (mkDouble 5.00) "w:putref"
wGetRef <- dbl <$> lookupValueDef (mkDouble 10.00) "w:getref"
wDelRef <- dbl <$> lookupValueDef (mkDouble 1.00) "w:delref"
wStorm <- dbl <$> lookupValueDef (mkDouble 0.80) "w:storm"
wStormMin <- dbl <$> lookupValueDef (mkDouble 1.00) "w:stormmin"
wStormMax <- dbl <$> lookupValueDef (mkDouble 60.00) "w:stormmax"
wCalm <- dbl <$> lookupValueDef (mkDouble 0.001) "w:calm"
wKill <- dbl <$> lookupValueDef (mkDouble 0.00) "w:kill"
wExit <- dbl <$> lookupValueDef (mkDouble 0.001) "w:exit"
wMerge <- dbl <$> lookupValueDef (mkDouble 0.005) "w:merge"
wCompact <- dbl <$> lookupValueDef (mkDouble 0.005) "w:compact"
wSweep <- dbl <$> lookupValueDef (mkDouble 0.005) "w:sweep"
wNum <- int <$> lookupValueDef (mkInt 10000) "w:num"
runTest \TestEnv{..} -> do
g <- liftIO $ MWC.createSystemRandom
let (opts,args) = splitOpts [] syn
-- operation budget: first integer argument, w:num otherwise
let n = headDef wNum [ fromIntegral x | LitIntVal x <- args ]
-- remaining budget; decremented by EndurancePutBlk and EndurancePutRef
rest <- newTVarIO n
-- windows of known blocks / seed blocks / refs, keyed by hash
blocks <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double () )
seed <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double () )
refs <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double HashRef )
-- counters shown in the status line
killed <- newTVarIO 0
stopped <- newTVarIO 0
merged <- newTVarIO 0
sweeped <- newTVarIO 0
compacted <- newTVarIO 0
let getRandomBlock = liftIO $ getRandomFromPSQ g blocks
let getRandomSeedBlock = liftIO $ getRandomFromPSQ g seed
let getRandomRef = liftIO $ getRandomFromPSQ g refs
-- dictionary applied to every line printed by the inner process: tracks
-- written blocks and updated refs under a random priority and drops the
-- minimum-priority entry once a window grows beyond wWindow
let d = makeDict do
entry $ bindMatch "ref-updated" $ nil_ \case
[HashLike h, HashLike r] -> do
w <- liftIO $ uniformRM (0,1.0) g
atomically do
modifyTVar refs (HPSQ.insert h w r)
size <- readTVar refs <&> HPSQ.size
when (size > wWindow ) do
modifyTVar refs HPSQ.deleteMin
_ -> none
entry $ bindMatch "block-written" $ nil_ \case
[HashLike h, _] -> do
w <- liftIO $ uniformRM (0,1.0) g
atomically do
modifyTVar blocks (HPSQ.insert h w ())
size <- readTVar blocks <&> HPSQ.size
when (size > wWindow ) do
modifyTVar blocks HPSQ.deleteMin
_ -> none
-- pI <- rublookupValue "endurance:idle"
--
debug $ red "pKill" <+> pretty wKill
-- states reachable by sampling, with their weights
-- (EnduranceStop is not listed: it is only entered from EnduranceIdle)
let actions = [ (EnduranceIdle, wIdle)
, (EndurancePutBlk, wPutBlk)
, (EnduranceGetBlk, wGetBlk)
, (EnduranceHasSeedBlk, wHasBlk)
, (EnduranceHasBlk, wHasBlk)
, (EnduranceDelBlk, wDelBlk)
, (EndurancePutRef, wPutRef)
, (EnduranceGetRef, wGetRef)
, (EnduranceDelRef, wDelRef)
, (EnduranceStorm, wStorm)
, (EnduranceCalm, wCalm)
, (EnduranceMerge, wMerge)
, (EnduranceCompact, wCompact)
, (EnduranceSweep, wSweep)
, (EnduranceKill, wKill)
, (EnduranceExit, wExit)
]
let dist = buildCDF actions -- ← prepared once
let inner = "test:ncq3:endurance:inner"
-- the inner process is this very executable, talking over pipes
self <- liftIO getExecutablePath
let conf = proc self [ "debug on"
, "and"
, "test:ncq3:endurance:inner", testEnvDir
] & setStdin createPipe & setStdout createPipe
-- write wSeed random blocks directly and remember their hashes
ncqWithStorage testEnvDir $ \sto -> do
replicateM_ wSeed do
n <- liftIO $ uniformRM (1, wMaxBlk) g
bs <- liftIO $ LBS.fromStrict <$> genRandomBS g n
putBlock (AnyStorage sto) bs >>= \case
Just h -> atomically $ modifyTVar seed (HPSQ.insert (HashRef h) 1.0 ())
Nothing -> err $ red "can't write seed block"
-- open the storage again and check that every seed block is present
ncqWithStorage testEnvDir $ \sto -> do
seeds <- readTVarIO seed <&> HPSQ.toList
for_ seeds $ \(h,_,_) -> do
here <- hasBlock (AnyStorage sto) (coerce h)
unless (isJust here) do
err $ "missed seed block (1)" <+> pretty h
-- any exception is logged and, after a 3 second pause, the run restarts
let handler e = err (viaShow e) >> debug "RECOVERING" >> pause @'Seconds 3
-- (re)start point: spawns a fresh inner process each time it is entered
fix \recover -> handleAny (\e -> handler e >> recover) do
flip runContT pure do
p <- startProcess conf -- ContT $ withProcessWait conf
storms <- newTQueueIO
let inp = getStdin p
let logFile = testEnvDir </> "op.log"
-- reader thread: appends every stdout line of the inner process to the
-- log, feeds it to d and echoes it; stops on the first read error
pread <- ContT $ withAsync $ flip runContT pure $ callCC \stop -> do
let outp = getStdout p
fix \loop -> do
s <- liftIO (try @_ @IOException (IO.hGetLine outp)) >>= \case
Left e -> err (red "pread:" <+> viaShow e) >> stop ()
Right s -> pure s
liftIO do
appendFile logFile (s <> "\n")
void $ try @_ @SomeException (parseTop s & either (err.viaShow) (void . run d))
putStrLn s
loop
-- worker that runs queued storm actions one after another
ContT $ withAsync $ forever do
join $ atomically (readTQueue storms)
-- status line, once per second
ContT $ withAsync $ forever do
rest <- readTVarIO rest
b <- readTVarIO blocks <&> HPSQ.size
r <- readTVarIO refs <&> HPSQ.size
k <- readTVarIO killed
s <- readTVarIO stopped
c <- readTVarIO compacted
m <- readTVarIO merged
sw <- readTVarIO sweeped
notice $ green "status"
<+> "rest:" <+> pretty rest
<+> "b:" <+> pretty b
<+> "r:" <+> pretty r
<+> "m:" <+> pretty m
<+> "sw:" <+> pretty sw
<+> "c:" <+> pretty c
<+> "k:" <+> pretty k
<+> "s:" <+> pretty s
pause @'Seconds 1
liftIO $ hSetBuffering inp LineBuffering
pid <- liftIO (PT.getPid p) `orDie` "oopsie!"
info $ "spawned" <+> pretty inner <+> viaShow pid
let getNextState = sampleState g dist
let defaultIdle = realToFrac wIdleDef :: Timeout 'Seconds
-- pause applied in EnduranceIdle; a storm sets it to 0 for a while
idleTime <- newTVarIO defaultIdle
-- no new storm is started before this point in time
trelaxTill <- newTVarIO 0
-- the state machine itself, starting in EnduranceIdle
flip fix EnduranceIdle \loop -> \case
EnduranceIdle -> do
readTVarIO idleTime >>= pause
r <- readTVarIO rest
if r <= 0 then do
loop EnduranceStop
else do
getNextState >>= loop
EndurancePutBlk -> do
bsize <- liftIO $ uniformRM (1, wMaxBlk) g
toConsole inp ("write-random-block" <+> viaShow bsize)
atomically $ modifyTVar rest pred
getNextState >>= loop
EnduranceDelBlk -> do
blk <- getRandomBlock
for_ blk $ \h -> do
toConsole inp ("del-block" <+> pretty h)
getNextState >>= loop
EnduranceHasBlk -> do
blk <- getRandomBlock
for_ blk $ \h -> do
toConsole inp ("has-block" <+> pretty h)
getNextState >>= loop
EnduranceHasSeedBlk -> do
blk <- getRandomSeedBlock
for_ blk $ \h -> do
toConsole inp ("has-seed-block" <+> pretty h)
getNextState >>= loop
EnduranceGetBlk -> do
blk <- getRandomBlock
for_ blk $ \h -> do
toConsole inp ("get-block" <+> pretty h)
getNextState >>= loop
EndurancePutRef -> do
-- a fresh random 32-byte ref name, pointed at a random tracked block
href <- liftIO (genRandomBS g 32) <&> HashRef . coerce
blk <- getRandomBlock
for_ blk $ \val -> do
toConsole inp ("set-ref" <+> pretty href <+> pretty val)
atomically $ modifyTVar rest pred
getNextState >>= loop
EnduranceGetRef -> do
e <- getRandomRef
for_ e $ \h ->
toConsole inp ("get-ref" <+> pretty h)
getNextState >>= loop
EnduranceDelRef -> do
e <- getRandomRef
for_ e $ \h ->
toConsole inp ("del-ref" <+> pretty h)
getNextState >>= loop
EnduranceMerge -> do
toConsole inp "merge"
atomically $ modifyTVar merged succ
getNextState >>= loop
EnduranceCompact -> do
toConsole inp "compact"
atomically $ modifyTVar compacted succ
getNextState >>= loop
EnduranceSweep -> do
toConsole inp "sweep"
atomically $ modifyTVar sweeped succ
getNextState >>= loop
EnduranceExit -> do
-- ask the inner process to exit; give it one second before stopping it
toConsole inp "exit"
debug $ yellow "inner process stopped?"
liftIO $ race (pause @'Seconds 1) (waitExitCode p) >>= \case
Right{} -> none
Left{} -> do
debug $ red "force inner process to stop"
stopProcess p
atomically $ modifyTVar stopped succ
lift recover
EnduranceKill -> do
debug $ red "KILL" <+> viaShow pid
cancel pread
hFlush inp
-- the marker must end with a newline: the reader thread of the next inner
-- process appends to the same file, and without the line break its first
-- line would be glued onto this marker and lost for the validator
liftIO $ appendFile logFile "; killed\n"
pause @'Seconds 0.25
void $ runProcess (proc "kill" ["-9", show pid])
notice $ red "Killed" <+> viaShow pid
atomically $ modifyTVar killed succ
pause @'Seconds 0.5
lift recover
EnduranceStop -> do
-- closing stdin makes the inner process read fail, which it treats as "exit"
liftIO $ hClose inp
wait pread
stopProcess p
notice $ green "done"
notice $ "validate" <+> pretty logFile
liftIO $ validateTestResult logFile
EnduranceCalm -> do
n <- liftIO $ uniformRM (0.5,10.00) g
debug $ "CALM" <+> pretty n
pause @'Seconds (realToFrac n)
getNextState >>= loop
EnduranceStorm -> do
now <- getTimeCoarse
relaxTill <- readTVarIO trelaxTill
itn <- readTVarIO idleTime
-- skip when a storm is already running (idle time below the default)
-- or when the cooldown has not passed yet
if | itn < defaultIdle -> do
loop EnduranceIdle
| now < relaxTill -> do
debug $ yellow "storm on cooldown"
loop EnduranceIdle
| otherwise -> do
t0 <- liftIO $ uniformRM (wStormMin,wStormMax) g
debug $ red "FIRE IN DA HOLE!" <+> pretty t0
-- queue the storm: idle time drops to 0 for t0 seconds, then returns to the default
atomically $ writeTQueue storms do
atomically $ writeTVar idleTime 0
pause @'Seconds (realToFrac t0)
atomically $ writeTVar idleTime defaultIdle
t1 <- getTimeCoarse
-- add 10 sec cooldown
atomically $ writeTVar trelaxTill (t1 + ceiling 10e9)
getNextState >>= loop
-- | Command loop of the inner (child) process of the endurance test.
--
-- Opens the NCQ3 storage at @path@, then reads one command per line from
-- stdin, runs it against the storage and prints one result record per
-- command to stdout. A failed read is treated as an @exit@ command. Lines
-- that do not parse and commands that throw are reported through 'err'
-- without leaving the loop; the loop ends when a command evaluates to
-- @done@.
testEnduranceInner :: forall c m . (MonadUnliftIO m, IsContext c, Exception (BadFormException c))
                   => FilePath
                   -> m ()
testEnduranceInner path = flip runContT pure $ callCC \exit -> do
  gen <- liftIO MWC.createSystemRandom
  debug (red "storage path" <+> pretty path)
  hSetBuffering stdout LineBuffering
  storage <- ContT (ncqWithStorage path)

  forever $ callCC \again -> do
    input <- liftIO (try @_ @IOException getLine)

    command <- case parseTop (fromRight "exit" input) of
      Left e       -> err (viaShow e) >> again ()
      Right parsed -> pure (fmap (fixContext @C @c) parsed)

    lift (try @_ @SomeException (run @c (commands gen storage) command)) >>= \case
      Left e -> err (viaShow e)
      Right (StringLike "done") -> do
        toConsole stderr "INNER PROCESS TO EXIT"
        exit ()
      Right _ -> none

  where
    -- print one result record
    report doc = liftIO (print doc)

    -- reject a command whose arguments have the wrong shape
    malformed e = throwIO (BadFormException @c (mkList e))

    commands g sto@NCQStorage{..} = makeDict @c @m do

      entry $ bindMatch "exit" $ const (pure (mkSym "done"))

      entry $ bindMatch "write-random-block" $ nil_ \case
        [ LitIntVal n ] -> do
          payload <- liftIO $ genRandomBS g (fromIntegral n)
          h <- putBlock (AnyStorage sto) (LBS.fromStrict payload) >>= orThrowUser "block-not-written"
          report $ "block-written" <+> pretty h <+> pretty (BS.length payload)
        e -> malformed e

      entry $ bindMatch "has-block" $ nil_ \case
        [ HashLike h ] -> do
          found <- hasBlock (AnyStorage sto) (coerce h)
          report $ "has-block-result" <+> pretty h <+> pretty found
        e -> malformed e

      entry $ bindMatch "has-seed-block" $ nil_ \case
        [ HashLike h ] -> do
          found <- hasBlock (AnyStorage sto) (coerce h)
          report $ "has-seed-block-result" <+> pretty h <+> pretty found
        e -> malformed e

      entry $ bindMatch "get-block" $ nil_ \case
        [ HashLike h ] -> do
          -- report the hash of whatever came back rather than the data itself
          digest <- fmap (hashObject @HbSync) <$> getBlock (AnyStorage sto) (coerce h)
          report $ "get-block-result" <+> pretty h <+> pretty digest
        e -> malformed e

      entry $ bindMatch "del-block" $ nil_ \case
        [ HashLike h ] -> do
          delBlock (AnyStorage sto) (coerce h)
          report $ "block-deleted" <+> pretty h
        e -> malformed e

      entry $ bindMatch "set-ref" $ nil_ \case
        [ HashLike h, HashLike hdest ] -> lift do
          updateRef (AnyStorage sto) (RefAlias2 mempty h) (coerce hdest)
          report $ "ref-updated" <+> pretty h <+> pretty hdest
        e -> malformed e

      entry $ bindMatch "get-ref" $ nil_ \case
        [ HashLike h ] -> lift do
          target <- getRef (AnyStorage sto) (RefAlias2 mempty h)
          report $ "get-ref-result" <+> pretty h <+> pretty target
        e -> malformed e

      entry $ bindMatch "del-ref" $ nil_ \case
        [ HashLike h ] -> lift do
          delRef (AnyStorage sto) (RefAlias2 mempty h)
          report $ "ref-deleted" <+> pretty h
        e -> malformed e

      entry $ bindMatch "merge" $ nil_ $ const do
        ncqSetFlag ncqMergeReq
        report "merge"

      entry $ bindMatch "compact" $ nil_ $ const do
        ncqSetFlag ncqCompactReq
        report "compact"

      entry $ bindMatch "sweep" $ nil_ $ const do
        ncqSetFlag ncqSweepReq
        report "sweep"