mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
55e96e79ea
commit
07c20a78eb
|
@ -45,14 +45,15 @@ common shared-properties
|
|||
, ImportQualifiedPost
|
||||
, LambdaCase
|
||||
, MultiParamTypeClasses
|
||||
, NumericUnderscores
|
||||
, OverloadedStrings
|
||||
, QuasiQuotes
|
||||
, ScopedTypeVariables
|
||||
, StandaloneDeriving
|
||||
, TupleSections
|
||||
, TypeApplications
|
||||
, TypeOperators
|
||||
, TypeFamilies
|
||||
, TypeOperators
|
||||
|
||||
|
||||
library
|
||||
|
@ -73,6 +74,7 @@ library
|
|||
, directory
|
||||
, filepath
|
||||
, filepattern
|
||||
, hashable
|
||||
, memory
|
||||
, microlens-platform
|
||||
, mmap
|
||||
|
|
|
@ -54,6 +54,9 @@ newtype DataFile a = DataFile a
|
|||
|
||||
newtype IndexFile a = IndexFile a
|
||||
|
||||
newtype StateFile = StateFile FileKey
|
||||
deriving newtype (IsString,Eq,Ord,Pretty)
|
||||
|
||||
class ToFileName a where
|
||||
toFileName :: a -> FilePath
|
||||
|
||||
|
@ -63,7 +66,6 @@ instance ToFileName FileKey where
|
|||
instance ToFileName (DataFile FileKey) where
|
||||
toFileName (DataFile fk) = dropExtension (toFileName fk) `addExtension` ".data"
|
||||
|
||||
|
||||
instance ToFileName (IndexFile FileKey) where
|
||||
toFileName (IndexFile fk) = dropExtension (toFileName fk) `addExtension` ".cq"
|
||||
|
||||
|
@ -73,6 +75,9 @@ instance ToFileName (DataFile FilePath) where
|
|||
instance ToFileName (IndexFile FilePath) where
|
||||
toFileName (IndexFile fp) = dropExtension fp `addExtension` ".cq"
|
||||
|
||||
instance ToFileName StateFile where
|
||||
toFileName (StateFile fk) = toFileName fk
|
||||
|
||||
newtype FilePrio = FilePrio (Down TimeSpec)
|
||||
deriving newtype (Eq,Ord)
|
||||
deriving stock (Generic,Show)
|
||||
|
@ -151,6 +156,14 @@ ncqTombPrefix = "T;;\x00"
|
|||
ncqMetaPrefix :: ByteString
|
||||
ncqMetaPrefix = "M;;\x00"
|
||||
|
||||
ncqIsMeta :: ByteString -> Maybe NCQSectionType
|
||||
ncqIsMeta bs = headMay [ t | (t,x) <- meta, BS.isPrefixOf x bs ]
|
||||
where meta = [ (R, ncqRefPrefix)
|
||||
, (B, ncqBlockPrefix)
|
||||
, (T, ncqTombPrefix)
|
||||
, (M, ncqMetaPrefix)
|
||||
]
|
||||
|
||||
ncqMakeSectionBS :: Maybe NCQSectionType
|
||||
-> HashRef
|
||||
-> ByteString
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,35 @@
|
|||
[dmz@serenity:~/w/hbs2]$ test-ncq test:root temp and debug off and test:ncq2:concurrent1 32 10000
|
||||
baseline 10000 3.852 1552.57 403.00
|
||||
1 3.74 1566.85 418.61
|
||||
2 2.22 1570.29 705.12
|
||||
3 1.82 1563.56 856.75
|
||||
4 1.69 1571.03 927.41
|
||||
5 1.68 1567.80 927.70
|
||||
6 1.63 1562.91 953.00
|
||||
7 1.64 1559.46 949.74
|
||||
8 1.62 1556.07 958.77
|
||||
9 1.59 1562.96 980.53
|
||||
10 1.62 1565.51 965.18
|
||||
11 1.59 1557.23 973.27
|
||||
12 1.59 1564.51 980.27
|
||||
13 1.62 1563.44 959.17
|
||||
14 1.61 1566.02 967.28
|
||||
15 1.61 1566.78 967.15
|
||||
16 1.65 1572.14 951.66
|
||||
17 1.63 1558.96 951.75
|
||||
18 1.63 1561.53 952.73
|
||||
19 1.63 1557.93 951.70
|
||||
20 1.60 1552.89 969.95
|
||||
21 1.62 1562.02 961.25
|
||||
22 1.61 1567.37 968.11
|
||||
23 1.60 1565.27 972.22
|
||||
24 1.62 1568.21 962.68
|
||||
25 1.60 1556.52 967.39
|
||||
26 1.62 1555.00 958.10
|
||||
27 1.64 1573.31 953.53
|
||||
28 1.63 1557.48 952.59
|
||||
29 1.66 1560.38 938.29
|
||||
30 1.62 1561.35 960.83
|
||||
31 1.63 1563.76 954.68
|
||||
32 1.61 1562.60 966.96
|
||||
|
|
@ -37,6 +37,9 @@ import Data.Config.Suckless.Script.File as SF
|
|||
|
||||
import DBPipe.SQLite hiding (field)
|
||||
|
||||
import System.Random.MWC as MWC
|
||||
|
||||
import System.IO.Temp as Temp
|
||||
import Data.Bits
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.ByteString qualified as BS
|
||||
|
@ -69,12 +72,14 @@ import System.IO.MMap
|
|||
import System.IO qualified as IO
|
||||
import System.Exit (exitSuccess, exitFailure)
|
||||
import System.Random
|
||||
import System.Random.Stateful
|
||||
import Safe
|
||||
import Lens.Micro.Platform
|
||||
import Control.Concurrent.STM qualified as STM
|
||||
import Data.Hashable
|
||||
|
||||
import UnliftIO
|
||||
import UnliftIO.Async
|
||||
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
|
||||
|
@ -450,6 +455,82 @@ main = do
|
|||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "sqlite:nwrite" $ nil_ \case
|
||||
[ LitIntVal tn', LitIntVal n ] -> lift do
|
||||
|
||||
let tn = fromIntegral tn'
|
||||
let num = fromIntegral n
|
||||
|
||||
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
for_ [1..tn] $ \tnn -> flip runContT pure do
|
||||
|
||||
let fnv = num `quot` tnn
|
||||
|
||||
mkdir "temp"
|
||||
|
||||
let tmp = "temp"
|
||||
|
||||
-- tmp <- ContT $ Temp.withTempDirectory "temp" "nwrite"
|
||||
|
||||
dbf <- liftIO $ Temp.emptyTempFile tmp "nwrite-.db"
|
||||
|
||||
db <- newDBPipeEnv dbPipeOptsDef dbf
|
||||
|
||||
pipe <- ContT $ withAsync (runPipe db)
|
||||
|
||||
tw <- newTVarIO 0
|
||||
|
||||
withDB db do
|
||||
ddl "create table if not exists block (hash blob not null primary key, value blob)"
|
||||
commitAll
|
||||
|
||||
withDB db do
|
||||
ddl [qc|
|
||||
pragma journal_mode=WAL;
|
||||
pragma synchronous=normal;
|
||||
|]
|
||||
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
|
||||
ss <- replicateM num $ liftIO $ MWC.uniformRM (64*1024, 256*1024) g
|
||||
|
||||
liftIO $ pooledForConcurrentlyN_ tnn ss $ \size -> do
|
||||
lbs <- uniformByteStringM size g <&> LBS.fromStrict
|
||||
|
||||
let ha = hashObject @HbSync lbs
|
||||
|
||||
let sql = [qc|insert into block (hash, value) values(?,?) on conflict (hash) do nothing |]
|
||||
|
||||
withDB db do
|
||||
insert sql (coerce @_ @ByteString ha, lbs)
|
||||
atomically $ modifyTVar tw (+ (32 + size))
|
||||
|
||||
withDB db do
|
||||
commitAll
|
||||
|
||||
w <- readTVarIO tw
|
||||
t1 <- getTimeCoarse
|
||||
|
||||
let t = realToFrac (toNanoSecs (t1 - t0)) / 1e9
|
||||
let tsec = realToFrac @_ @(Fixed E2) t
|
||||
|
||||
let total = realToFrac w
|
||||
|
||||
let speed = if t > 0 then total / t else 0
|
||||
let totMegs = realToFrac @_ @(Fixed E2) $ total / (1024**2)
|
||||
let speedMbs = realToFrac @_ @(Fixed E2) $ speed / (1024**2)
|
||||
|
||||
notice $ pretty tnn <+> pretty (tsec) <+> pretty totMegs <+> pretty speedMbs
|
||||
|
||||
none
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "sqlite:merkle:write" $ nil_ \case
|
||||
[ StringLike dbf, StringLike fname ] -> lift do
|
||||
db <- newDBPipeEnv dbPipeOptsDef dbf
|
||||
|
|
|
@ -10,6 +10,7 @@ import HBS2.Prelude.Plated
|
|||
import HBS2.OrDie
|
||||
import HBS2.Hash
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Misc.PrettyStuff
|
||||
import HBS2.Clock
|
||||
import HBS2.Merkle
|
||||
import HBS2.Polling
|
||||
|
@ -20,6 +21,7 @@ import HBS2.Storage.Operations.ByteString
|
|||
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
import HBS2.Data.Log.Structured.SD
|
||||
import HBS2.Storage.NCQ
|
||||
import HBS2.Storage.NCQ2 as N2
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
|
@ -32,6 +34,9 @@ import Data.Config.Suckless.System
|
|||
|
||||
import DBPipe.SQLite hiding (field)
|
||||
|
||||
import System.Posix.Files qualified as PFS
|
||||
import Numeric (showHex)
|
||||
import Data.Ord (Down(..))
|
||||
import Data.Char
|
||||
import Data.Bits
|
||||
import Data.ByteString (ByteString)
|
||||
|
@ -590,48 +595,232 @@ testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do
|
|||
rm ncqDir
|
||||
|
||||
|
||||
testNCQ2Simple1 :: MonadUnliftIO m
|
||||
=> TestEnv
|
||||
testNCQ2Sweep1 :: forall c m . (MonadUnliftIO m, IsContext c)
|
||||
=> [Syntax c]
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQ2Simple1 TestEnv{..} = do
|
||||
debug "testNCQ2Simple1"
|
||||
testNCQ2Sweep1 syn TestEnv{..} = do
|
||||
debug $ "testNCQ2Sweep1" <+> pretty syn
|
||||
let tmp = testEnvDir
|
||||
let ncqDir = tmp
|
||||
q <- newTQueueIO
|
||||
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
bz <- replicateM 100000 $ liftIO do
|
||||
let (opts, argz) = splitOpts [] syn
|
||||
|
||||
let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
|
||||
bz <- replicateM n $ liftIO do
|
||||
n <- (`mod` (256*1024)) <$> uniformM @Int g
|
||||
uniformByteStringM n g
|
||||
|
||||
notice $ "generate" <+> pretty n <+> "blocks"
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
for bz $ \z -> do
|
||||
h <- ncqPutBS sto (Just B) Nothing z
|
||||
atomically $ writeTQueue q h
|
||||
found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize
|
||||
assertBool (show $ "found-immediate" <+> pretty h) (found > 0)
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
notice "perform merge"
|
||||
notice $ red "PERFORM MERGE"
|
||||
ncqMergeFull sto
|
||||
|
||||
notice $ "full sweep unused states"
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
ncqSweepStates sto
|
||||
ncqSweepFossils sto
|
||||
|
||||
notice $ "lookup" <+> pretty n <+> "blocks"
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
hashes <- atomically (STM.flushTQueue q)
|
||||
for_ hashes $ \ha -> do
|
||||
found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize
|
||||
assertBool (show $ "found" <+> pretty ha) (found > 0)
|
||||
-- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found)
|
||||
|
||||
|
||||
|
||||
testNCQ2Sweep2 :: forall c m . (MonadUnliftIO m, IsContext c)
|
||||
=> [Syntax c]
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQ2Sweep2 syn TestEnv{..} = do
|
||||
debug $ "testNCQ2Sweep2" <+> pretty syn
|
||||
let tmp = testEnvDir
|
||||
let ncqDir = tmp
|
||||
q <- newTQueueIO
|
||||
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
let (opts, argz) = splitOpts [] syn
|
||||
|
||||
let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
|
||||
notice $ "generate" <+> pretty n <+> "blocks"
|
||||
|
||||
bz <- replicateM n $ liftIO do
|
||||
n <- (`mod` (256*1024)) <$> uniformM @Int g
|
||||
uniformByteStringM n g
|
||||
|
||||
-- race (pause @'Seconds 260) do
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
for_ bz $ \z -> do
|
||||
h <- ncqPutBS sto (Just B) Nothing z
|
||||
atomically $ writeTQueue q h
|
||||
|
||||
notice "wait some time to see merge+sweep"
|
||||
pause @'Seconds 240
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
hashes <- atomically (STM.flushTQueue q)
|
||||
for_ hashes $ \ha -> do
|
||||
found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize
|
||||
assertBool (show $ "found" <+> pretty ha) (found > 0)
|
||||
|
||||
|
||||
|
||||
testNCQ2Kill1 :: forall c m . (MonadUnliftIO m, IsContext c)
|
||||
=> [Syntax c]
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQ2Kill1 syn TestEnv{..} = flip runContT pure do
|
||||
debug $ "testNCQ2Kill1" <+> pretty syn
|
||||
let tmp = testEnvDir
|
||||
let ncqDir = tmp
|
||||
q <- newTQueueIO
|
||||
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
let (opts, argz) = splitOpts [] syn
|
||||
|
||||
let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
|
||||
notice $ "generate" <+> pretty n <+> "blocks"
|
||||
|
||||
bz <- replicateM n $ liftIO do
|
||||
n <- (`mod` (256*1024)) <$> uniformM @Int g
|
||||
uniformByteStringM n g
|
||||
|
||||
-- race (pause @'Seconds 260) do
|
||||
|
||||
wIdle <- newEmptyTMVarIO
|
||||
|
||||
ncq1 <- ContT $ withAsync $ ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
ncqSetOnRunWriteIdle sto (atomically (putTMVar wIdle ()))
|
||||
for_ bz $ \z -> do
|
||||
h <- ncqPutBS sto (Just B) Nothing z
|
||||
atomically $ writeTQueue q h
|
||||
pause @'Seconds 300
|
||||
|
||||
notice $ red "WAIT FUCKING IDLE!"
|
||||
|
||||
atomically $ takeTMVar wIdle
|
||||
|
||||
notice $ red "GOT FUCKING IDLE!" <+> "lets see what happen now"
|
||||
|
||||
cancel ncq1
|
||||
|
||||
liftIO $ ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
hashes <- atomically (STM.flushTQueue q)
|
||||
for_ hashes $ \ha -> do
|
||||
found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize
|
||||
assertBool (show $ "found" <+> pretty ha) (found > 0)
|
||||
|
||||
|
||||
testNCQ2Simple1 :: forall c m . (MonadUnliftIO m, IsContext c)
|
||||
=> [Syntax c]
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQ2Simple1 syn TestEnv{..} = do
|
||||
debug $ "testNCQ2Simple1" <+> pretty syn
|
||||
let tmp = testEnvDir
|
||||
let ncqDir = tmp
|
||||
q <- newTQueueIO
|
||||
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
let (opts, argz) = splitOpts [] syn
|
||||
|
||||
let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
let l = headDef 5 $ drop 1 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
let s = headDef (256*1024) $ drop 2 [ fromIntegral (1024 * x) | LitIntVal x <- argz ]
|
||||
|
||||
notice $ "insert" <+> pretty n <+> "random blocks of size" <+> pretty s
|
||||
|
||||
thashes <- newTQueueIO
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
replicateM_ n do
|
||||
n <- (`mod` s) <$> uniformM @Int g
|
||||
z <- uniformByteStringM n g
|
||||
h <- ncqPutBS sto (Just B) Nothing z
|
||||
found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize
|
||||
atomically $ writeTQueue q h
|
||||
assertBool (show $ "found-immediate" <+> pretty h) (found > 0)
|
||||
atomically $ writeTQueue thashes h
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
|
||||
hs <- atomically $ STM.flushTQueue thashes
|
||||
|
||||
flip fix (t0, List.length hs, hs) $ \loop (tp, num, xs) -> case xs of
|
||||
[] -> none
|
||||
(ha:rest) -> do
|
||||
t1 <- getTimeCoarse
|
||||
|
||||
t2 <- if realToFrac (toNanoSecs (t1 - t0)) / 1e9 < 1.00 then do
|
||||
pure tp
|
||||
else do
|
||||
notice $ green "lookup" <+> pretty num
|
||||
pure t1
|
||||
|
||||
found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize
|
||||
assertBool (show $ "found" <+> pretty ha) (found > 0)
|
||||
unless (List.null hs) $ loop (t1, pred num, rest)
|
||||
|
||||
hashes <- atomically (STM.flushTQueue q)
|
||||
|
||||
notice $ "merge data"
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
notice "perform merge"
|
||||
ncqMergeFull sto
|
||||
ncqSweepStates sto
|
||||
ncqSweepFossils sto
|
||||
|
||||
notice $ "full sweep unused states"
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
ncqSweepStates sto
|
||||
ncqSweepFossils sto
|
||||
|
||||
notice $ "lookup" <+> pretty n <+> "blocks"
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
|
||||
replicateM_ l do
|
||||
|
||||
-- performMajorGC
|
||||
|
||||
(t1,_) <- timeItT do
|
||||
|
||||
for_ hashes $ \ha -> do
|
||||
found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize
|
||||
assertBool (show $ "found" <+> pretty ha) (found > 0)
|
||||
-- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found)
|
||||
|
||||
notice $ pretty (sec6 t1) <+> "lookup" <+> pretty n <+> "blocks"
|
||||
|
||||
|
||||
genRandomBS :: forall g m . (Monad m, StatefulGen g m) => g -> Int -> m ByteString
|
||||
genRandomBS g n = do
|
||||
n <- (`mod` (64*1024)) <$> uniformM @Int g
|
||||
uniformByteStringM n g
|
||||
|
||||
sec6 :: RealFrac a => a -> Fixed E6
|
||||
|
@ -832,7 +1021,7 @@ testNCQ2Repair1 TestEnv{..} = do
|
|||
assertBool (show $ "found-immediate" <+> pretty h) (found > 0)
|
||||
|
||||
ncqWithStorage ncqDir $ \sto -> liftIO do
|
||||
written <- N2.ncqListTrackedFiles sto
|
||||
written <- N2.ncqListDirFossils sto
|
||||
debug $ "TRACKED" <+> vcat (fmap pretty written)
|
||||
toDestroy <- pure (headMay written) `orDie` "no file written"
|
||||
|
||||
|
@ -856,6 +1045,44 @@ testNCQ2Repair1 TestEnv{..} = do
|
|||
-- assertBool (show $ "found-immediate" <+> pretty ha) (found > 0)
|
||||
-- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found)
|
||||
|
||||
|
||||
testWriteNThreads :: forall g m . (MonadUnliftIO m)
|
||||
=> FilePath
|
||||
-> Int
|
||||
-> Int
|
||||
-> m ()
|
||||
testWriteNThreads ncqDir tnn n = do
|
||||
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
wtf <- liftIO getPOSIXTime <&> show . round
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
|
||||
w <- ncqWithStorage (ncqDir </> wtf <> show tnn) $ \sto -> do
|
||||
ss <- liftIO $ replicateM n $ MWC.uniformRM (64*1024, 256*1024) g
|
||||
|
||||
pooledForConcurrentlyN_ tnn ss $ \len -> do
|
||||
tbs <- liftIO $ genRandomBS g len
|
||||
ncqPutBS sto (Just B) Nothing tbs
|
||||
-- atomically $ modifyTVar' tss (+ len)
|
||||
|
||||
-- 32 bytes per key, 4 per len
|
||||
pure $ (List.length ss * 36) + sum ss
|
||||
|
||||
t1 <- getTimeCoarse
|
||||
|
||||
let t = realToFrac (toNanoSecs (t1 - t0)) / 1e9
|
||||
|
||||
let total = realToFrac w
|
||||
|
||||
let speed = if t > 0 then total / t else 0
|
||||
let totMegs = realToFrac @_ @(Fixed E2) $ total / (1024**2)
|
||||
let speedMbs = realToFrac @_ @(Fixed E2) $ speed / (1024**2)
|
||||
|
||||
notice $ pretty tnn <+> pretty (sec2 t) <+> pretty totMegs <+> pretty speedMbs
|
||||
|
||||
|
||||
testNCQ2Concurrent1 :: MonadUnliftIO m
|
||||
=> Bool
|
||||
-> Int
|
||||
|
@ -867,7 +1094,7 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do
|
|||
|
||||
let tmp = testEnvDir
|
||||
let inputDir = tmp </> "input"
|
||||
let ncqDir = tmp </> "ncq-test-data"
|
||||
let ncqDir = tmp </> "ncq"
|
||||
|
||||
debug "preparing"
|
||||
|
||||
|
@ -875,44 +1102,45 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do
|
|||
|
||||
debug $ pretty inputDir
|
||||
|
||||
filez <- liftIO $ pooledReplicateConcurrentlyN 8 n $ do
|
||||
size <- randomRIO (64*1024, 256*1024)
|
||||
w <- liftIO (randomIO :: IO Word8)
|
||||
let tbs = BS.replicate size w -- replicateM size w <&> BS.pack
|
||||
let ha = hashObject @HbSync tbs -- & show . pretty
|
||||
let fn = inputDir </> show (pretty ha)
|
||||
liftIO $ BS.writeFile fn tbs
|
||||
pure (fn, ha, BS.length tbs)
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
|
||||
debug "done"
|
||||
log <- liftIO $ Temp.emptyTempFile inputDir "log-.bin"
|
||||
|
||||
let fnv = V.fromList filez
|
||||
let ssz = sum [ s | (_,_,s) <- filez ] & realToFrac
|
||||
(t0,size) <- timeItT do
|
||||
liftIO $ withFile log IO.AppendMode $ \hlog -> do
|
||||
replicateM_ n do
|
||||
size <- MWC.uniformRM (64*1024, 256*1024) g
|
||||
tbs <- genRandomBS g size
|
||||
let ha = hashObject @HbSync tbs
|
||||
let ss = coerce ha <> tbs
|
||||
let bssize = N.bytestring32 (fromIntegral $ BS.length ss)
|
||||
BS.hPut hlog (bssize <> ss)
|
||||
getFileSize log
|
||||
|
||||
notice "NO SHIT"
|
||||
|
||||
-- setLoggingOff @DEBUG
|
||||
let mbps = realToFrac size / (1024**2)
|
||||
let v0 = mbps / t0
|
||||
notice $ "baseline" <+> pretty n
|
||||
<+> pretty (sec3 t0)
|
||||
<+> pretty (realToFrac @_ @(Fixed E2) mbps)
|
||||
<+> pretty (sec2 v0)
|
||||
|
||||
for_ [1..tn] $ \tnn -> do
|
||||
|
||||
ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } )
|
||||
w <- ContT $ withAsync (ncqStorageRun2 ncq1)
|
||||
for_ [1..tn] $ \tnn -> liftIO do
|
||||
testWriteNThreads ncqDir tnn n
|
||||
|
||||
(t,_) <- timeItT $ liftIO do
|
||||
|
||||
pooledForConcurrentlyN_ tnn fnv $ \(n,ha,_) -> do
|
||||
co <- BS.readFile n
|
||||
ncqPutBS ncq1 (Just B) Nothing co
|
||||
|
||||
ncqStorageStop2 ncq1
|
||||
performMajorGC
|
||||
wait w
|
||||
rm ncqDir
|
||||
|
||||
let tt = realToFrac @_ @(Fixed E2) t
|
||||
let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2)
|
||||
notice $ pretty tnn <+> pretty tt <+> pretty speed
|
||||
testNCQ2Concurrent2 :: MonadUnliftIO m
|
||||
=> Int -- ^ threads
|
||||
-> Int -- ^ times
|
||||
-> Int -- ^ blocks
|
||||
-> TestEnv
|
||||
-> m ()
|
||||
|
||||
testNCQ2Concurrent2 tn times n TestEnv{..} = flip runContT pure do
|
||||
replicateM_ times do
|
||||
lift $ testWriteNThreads testEnvDir tn n
|
||||
|
||||
testNCQ2ConcurrentWriteSimple1 :: MonadUnliftIO m
|
||||
=> Int
|
||||
|
@ -1061,6 +1289,13 @@ main = do
|
|||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq:concurrent2" $ nil_ $ \case
|
||||
[ LitIntVal tn, LitIntVal times, LitIntVal n ] -> do
|
||||
debug $ "ncq:concurrent2" <+> pretty tn <+> pretty n
|
||||
runTest $ testNCQ2Concurrent2 (fromIntegral tn) (fromIntegral times) (fromIntegral n)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:concurrent1:wo" $ nil_ $ \case
|
||||
[ LitIntVal tn, LitIntVal n ] -> do
|
||||
debug $ "ncq:concurrent1" <+> pretty tn <+> pretty n
|
||||
|
@ -1109,8 +1344,17 @@ main = do
|
|||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do
|
||||
runTest testNCQ2Simple1
|
||||
entry $ bindMatch "test:ncq2:simple1" $ nil_ $ \e -> do
|
||||
runTest (testNCQ2Simple1 e)
|
||||
|
||||
entry $ bindMatch "test:ncq2:sweep1" $ nil_ $ \e -> do
|
||||
runTest (testNCQ2Sweep1 e)
|
||||
|
||||
entry $ bindMatch "test:ncq2:kill1" $ nil_ $ \e -> do
|
||||
runTest (testNCQ2Kill1 e)
|
||||
|
||||
entry $ bindMatch "test:ncq2:sweep2" $ nil_ $ \e -> do
|
||||
runTest (testNCQ2Sweep2 e)
|
||||
|
||||
entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do
|
||||
runTest testNCQ2Repair1
|
||||
|
@ -1121,6 +1365,147 @@ main = do
|
|||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq2:wtf1" $ nil_ $ const do
|
||||
runTest $ \TestEnv{..} -> do
|
||||
let dir = testEnvDir
|
||||
r1 <- ncqWithStorage dir $ \sto -> do
|
||||
h <- ncqPutBS sto (Just B) Nothing "JOPAKITAPECHENTRESKI"
|
||||
loc <- ncqLocate2 sto h `orDie` "not found shit"
|
||||
let re@(k,r) = ncqEntryUnwrap sto $ ncqGetEntryBS sto loc
|
||||
notice $ pretty "MEM" <+> pretty (ncqEntrySize loc) <+> pretty (coerce @_ @HashRef k) <+> viaShow r
|
||||
pure re
|
||||
|
||||
ncqWithStorage dir $ \sto -> do
|
||||
let (k,v) = r1
|
||||
loc <- ncqLocate2 sto (coerce k) `orDie` "not found shit"
|
||||
let s0 = ncqGetEntryBS sto loc
|
||||
let (k1,r1) = ncqEntryUnwrap sto s0
|
||||
notice $ "FOSSIL:" <+> pretty (ncqEntrySize loc) <+> pretty (coerce @_ @HashRef k1) <+> viaShow r1
|
||||
assertBool "written-same" (r1 == v && k == k1)
|
||||
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq2:scan-index" $ nil_ \case
|
||||
[ StringLike dir, HashLike item ] -> do
|
||||
notice $ "SCAN DIR" <+> pretty dir <+> pretty item
|
||||
|
||||
ncqWithStorage dir $ \sto@NCQStorage2{..} -> do
|
||||
|
||||
-- let d = N2.ncqGetFileName sto ""
|
||||
|
||||
-- files <- dirFiles d <&> List.filter (List.isSuffixOf ".cq")
|
||||
|
||||
-- files <- N2.ncqListTrackedFiles sto
|
||||
|
||||
tracked <- N2.ncqListTrackedFiles sto
|
||||
|
||||
for_ tracked $ \(k,_,_) -> do
|
||||
|
||||
let indexFile = N2.ncqGetFileName sto (toFileName (IndexFile k))
|
||||
|
||||
(idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile)
|
||||
>>= orThrow (NCQStorageCantMapFile indexFile)
|
||||
|
||||
|
||||
notice $ "scan file" <+> pretty indexFile
|
||||
|
||||
stat <- liftIO $ PFS.getFileStatus indexFile
|
||||
-- -- FIXME: maybe-creation-time-actually
|
||||
let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat
|
||||
|
||||
nwayHashScanAll idxNway idxBs $ \_ k v -> do
|
||||
when (coerce k == item ) do
|
||||
|
||||
let off = fromIntegral $ N.word64 (BS.take 8 v)
|
||||
let size = fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 v))
|
||||
|
||||
notice $ yellow "found"
|
||||
<+> pretty (fromString @FileKey indexFile)
|
||||
<+> pretty (fromIntegral @_ @Word64 ts)
|
||||
<+> pretty (off,size,item)
|
||||
<+> pretty (foldMap (`showHex` "") (BS.unpack v) )
|
||||
|
||||
-- datBs <- liftIO $ mmapFileByteString dataFile Nothing
|
||||
|
||||
|
||||
none
|
||||
|
||||
e -> throwIO (BadFormException (mkList e))
|
||||
|
||||
entry $ bindMatch "test:ncq2:del1" $ nil_ $ \syn -> do
|
||||
|
||||
runTest $ \TestEnv{..} -> do
|
||||
g <- liftIO MWC.createSystemRandom
|
||||
let dir = testEnvDir
|
||||
|
||||
let (_, argz) = splitOpts [] syn
|
||||
let n = headDef 10000 [ fromIntegral x | LitIntVal x <- argz ]
|
||||
|
||||
thashes <- newTVarIO mempty
|
||||
|
||||
ncqWithStorage dir $ \sto@NCQStorage2{..} -> do
|
||||
|
||||
notice $ "write+immediate delete" <+> pretty n <+> "records"
|
||||
|
||||
hashes <- replicateM n do
|
||||
|
||||
h <- ncqPutBS sto (Just B) Nothing =<< genRandomBS g (64*1024)
|
||||
ncqDelEntry sto h
|
||||
|
||||
t <- (ncqLocate2 sto h <&> fmap (N2.ncqIsTomb sto))
|
||||
>>= orThrowUser ("missed" <+> pretty h)
|
||||
|
||||
assertBool "tomb/1" t
|
||||
|
||||
pure h
|
||||
|
||||
|
||||
pause @'Seconds 5
|
||||
|
||||
atomically $ writeTVar thashes (HS.fromList hashes)
|
||||
|
||||
flip runContT pure $ callCC \exit -> do
|
||||
|
||||
for_ hashes $ \h -> do
|
||||
l <- lift (ncqLocate2 sto h)
|
||||
>>= orThrowUser ("missed" <+> pretty h)
|
||||
|
||||
unless (N2.ncqIsTomb sto l) do
|
||||
let (k,e') = ncqEntryUnwrap sto (ncqGetEntryBS sto l)
|
||||
|
||||
e <- orThrowUser "bad entry" e'
|
||||
err $ pretty l
|
||||
err $ "WTF?" <+> pretty (coerce @_ @HashRef k) <+> pretty h <+> viaShow (fst e)
|
||||
lfs <- readTVarIO ncqTrackedFiles
|
||||
|
||||
for_ lfs $ \TrackedFile{..} -> do
|
||||
npe <- readTVarIO tfCached <&> isNotPending
|
||||
err $ "FILE" <+> pretty npe <+> pretty tfKey
|
||||
|
||||
exit ()
|
||||
|
||||
ncqWithStorage dir $ \sto -> do
|
||||
-- notice "check deleted"
|
||||
hashes <- readTVarIO thashes
|
||||
|
||||
for_ hashes $ \h -> do
|
||||
|
||||
ncqLocate2 sto h >>= \case
|
||||
Nothing -> notice $ "not-found" <+> pretty h
|
||||
Just loc -> do
|
||||
|
||||
what <- (ncqLocate2 sto h <&> fmap (ncqGetEntryBS sto))
|
||||
>>= orThrowUser "NOT FOUND"
|
||||
|
||||
let (k,wtf) = ncqEntryUnwrap sto what
|
||||
let tomb = N2.ncqIsTomb sto loc
|
||||
|
||||
-- debug $ pretty (coerce @_ @HashRef k) <+> viaShow wtf <+> pretty tomb
|
||||
|
||||
assertBool (show $ "tomb/3" <+> pretty h) tomb
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case
|
||||
[ LitIntVal tn, LitIntVal n ] -> do
|
||||
runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n)
|
||||
|
@ -1149,6 +1534,7 @@ main = do
|
|||
[ LitIntVal n ] -> runTest $ testFilterEmulate1 True (fromIntegral n)
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
hidden do
|
||||
internalEntries
|
||||
entry $ bindMatch "#!" $ nil_ $ const none
|
||||
|
@ -1166,4 +1552,3 @@ main = do
|
|||
`finally` flushLoggers
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue