From ba0a631ee24b225e9d4e9254871b3c51b9be0114 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 21 Jul 2025 09:10:21 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 35 +++--- hbs2-tests/test/TestNCQ.hs | 123 ++++++++++++++++++++++ 2 files changed, 144 insertions(+), 14 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 7f26c97b..53a89e3f 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -183,6 +183,8 @@ data NCQStorage2 = , ncqSweepSem :: TSem , ncqMergeTasks :: TVar Int , ncqOnRunWriteIdle :: TVar (IO ()) + + , ncqReadSem :: TSem } megabytes :: forall a . Integral a => a @@ -202,7 +204,7 @@ ncqStorageOpen2 fp upd = do let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 let ncqMaxCached = 128 let ncqIdleThrsh = 50.00 - let ncqPostponeMerge = 30.00 + let ncqPostponeMerge = 600.00 let ncqPostponeSweep = 2 * ncqPostponeMerge let ncqLuckyNum = 2 @@ -233,6 +235,8 @@ ncqStorageOpen2 fp upd = do ncqMergeTasks <- newTVarIO 0 ncqOnRunWriteIdle <- newTVarIO none + ncqReadSem <- atomically $ newTSem 1 + ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" @@ -469,7 +473,7 @@ ncqSeekInFossils :: forall a f m . (MonadUnliftIO m, Monoid (f a)) -> HashRef -> (Location -> m (Seek (f a))) -> m (f a) -ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do +ncqSeekInFossils ncq@NCQStorage2{..} href action = withSem ncqReadSem $ useVersion ncq $ const do tracked <- readTVarIO ncqTrackedFiles let l = V.length tracked @@ -493,7 +497,7 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do go i (a+1) r Just CachedEntry{..} -> do - liftIO (lookupEntry href (cachedMmapedIdx, cachedNway)) >>= \case + liftIO (ncqLookupIndex href (cachedMmapedIdx, cachedNway)) >>= \case Nothing -> go (i+1) 0 r Just (offset, size) -> do now <- getTimeCoarse @@ -504,18 +508,21 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do go 0 0 mempty - where - {-# INLINE lookupEntry #-} - lookupEntry hx (mmaped, nway) = do - fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) - where - {-# INLINE decodeEntry #-} - decodeEntry entryBs = do - let (p,r) = BS.splitAt 8 entryBs - let off = fromIntegral (N.word64 p) - let size = fromIntegral (N.word32 (BS.take 4 r)) - ( off, size ) +ncqLookupIndex :: MonadUnliftIO m + => HashRef + -> (ByteString, NWayHash) + -> m (Maybe ( NCQOffset, NCQSize )) +ncqLookupIndex hx (mmaped, nway) = do + fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) + where + {-# INLINE decodeEntry #-} + decodeEntry entryBs = do + let (p,r) = BS.splitAt 8 entryBs + let off = fromIntegral (N.word64 p) + let size = fromIntegral (N.word32 (BS.take 4 r)) + ( off, size ) +{-# INLINE ncqLookupIndex #-} ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) ncqLocate2 ncq href = do diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index dabe1a17..68ea8ca6 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -752,6 +752,7 @@ testNCQ2Simple1 syn TestEnv{..} = do let l = headDef 5 $ drop 1 [ fromIntegral x | LitIntVal x <- argz ] let s = headDef (256*1024) $ drop 2 [ fromIntegral (1024 * x) | LitIntVal x <- argz ] + notice $ "insert" <+> pretty n <+> "random blocks of size" <+> pretty s thashes <- newTQueueIO @@ -819,6 +820,125 @@ testNCQ2Simple1 syn TestEnv{..} = do notice $ pretty (sec6 t1) <+> "lookup" <+> pretty n <+> "blocks" + +testNCQ2Lookup1:: forall c m . (MonadUnliftIO m, IsContext c) + => [Syntax c] + -> TestEnv + -> m () + +testNCQ2Lookup1 syn TestEnv{..} = do + debug $ "testNCQ2Simple1" <+> pretty syn + let tmp = testEnvDir + let ncqDir = tmp + q <- newTQueueIO + + g <- liftIO MWC.createSystemRandom + + let (opts, argz) = splitOpts [("-r",1),("-m",0)] syn + + let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] + let nt = max 2 . headDef 1 $ [ fromIntegral x | LitIntVal x <- drop 1 argz ] + let nl = headDef 3 $ [ fromIntegral x | LitIntVal x <- drop 2 argz ] + let r = (4*1024, 64*1024) + + let rt = headDef 2 [ fromIntegral x | ListVal [StringLike "-r", LitIntVal x ] <- opts ] + let merge = headDef False [ True | ListVal [StringLike "-m"] <- opts ] + + notice $ "insert" <+> pretty n <+> "random blocks of size" <+> parens (pretty r) <+> pretty opts + + thashes <- newTQueueIO + + sizes <- liftIO $ replicateM n (uniformRM r g ) + + ncqWithStorage ncqDir $ \sto -> liftIO do + pooledForConcurrentlyN_ 8 sizes $ \size -> do + z <- uniformByteStringM size g + h <- ncqPutBS sto (Just B) Nothing z + atomically $ writeTQueue thashes h + + + hs <- atomically $ STM.flushTQueue thashes + + when merge do + notice "merge full" + ncqMergeFull sto + + ffs <- N2.ncqListTrackedFiles sto + notice $ "database prepared" <+> pretty (List.length ffs) <+> pretty (List.length hs) + + replicateM_ nl do + + tfound <- newTVarIO 0 + + t0 <- getTimeCoarse + + void $ flip runContT pure $ callCC \exit -> do + + readQ <- newTQueueIO + + reader <- replicateM rt $ ContT $ withAsync $ fix \next -> do + + (h, answ) <- atomically $ readTQueue readQ + + f1 <- ncqLookupEntry sto h <&> isJust + + when f1 do + atomically (putTMVar answ True) >> next + + ffs <- liftIO $ N2.ncqListTrackedFiles sto + + for_ ffs $ \(f, ce, te) -> do + + -- when (isNotPending ce) do + case ce of + Just (PendingEntry{}) -> none + + Just (CachedEntry{..}) -> do + found <- ncqLookupIndex h (cachedMmapedIdx, cachedNway) <&> isJust + + when found do + atomically (putTMVar answ True) >> next + + Nothing -> do + + tnow <- getTimeCoarse >>= newTVarIO + + let indexFile = N2.ncqGetFileName sto (toFileName (IndexFile f)) + let dataFile = N2.ncqGetFileName sto (toFileName (DataFile f)) + + what@(idxBs, idxNway) <- nwayHashMMapReadOnly indexFile `orDie` "mmap fucked" + datBs <- mmapFileByteString dataFile Nothing + + let ce = CachedEntry idxBs datBs idxNway tnow + + atomically $ writeTVar te (Just ce) + + found <- ncqLookupIndex h what <&> isJust + + when found do + atomically (putTMVar answ True) >> next + + atomically (putTMVar answ False) >> next + + liftIO $ pooledForConcurrentlyN_ nt hs $ \h -> do + answ <- newEmptyTMVarIO + atomically $ writeTQueue readQ (h, answ) + found <- atomically $ takeTMVar answ + + when found do + atomically $ modifyTVar' tfound succ + + t1 <- getTimeCoarse + + let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3 + + found <- readTVarIO tfound + + notice $ "scan all files" <+> pretty dt <+> pretty found + + -- pause @'Seconds 5 + + genRandomBS :: forall g m . (Monad m, StatefulGen g m) => g -> Int -> m ByteString genRandomBS g n = do uniformByteStringM n g @@ -1347,6 +1467,9 @@ main = do entry $ bindMatch "test:ncq2:simple1" $ nil_ $ \e -> do runTest (testNCQ2Simple1 e) + entry $ bindMatch "test:ncq2:lookup1" $ nil_ $ \e -> do + runTest (testNCQ2Lookup1 e) + entry $ bindMatch "test:ncq2:sweep1" $ nil_ $ \e -> do runTest (testNCQ2Sweep1 e)