From e08f68fbaf0d8d04247558caa7f582c13b3e7c44 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 21 Jul 2025 11:02:39 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 57 ++++++++++++--- hbs2-tests/test/TestNCQ.hs | 86 ++++++++++++++++++++++- 2 files changed, 131 insertions(+), 12 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 53a89e3f..19f37592 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -108,8 +108,8 @@ type FOff = Word64 data NCQEntry = NCQEntry - { ncqEntryData :: ByteString - , ncqDumped :: TVar (Maybe FileKey) + { ncqEntryData :: !ByteString + , ncqDumped :: !(TVar (Maybe FileKey)) } type Shard = TVar (HashMap HashRef NCQEntry) @@ -126,10 +126,9 @@ data NCQFlag = NCQMergeNow | NCQCompactNow deriving (Eq,Ord,Generic) - data Location = - InFossil FileKey !ByteString !NCQOffset !NCQSize - | InMemory ByteString + InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize + | InMemory {-# UNPACK #-} !ByteString instance Pretty Location where pretty = \case @@ -163,6 +162,7 @@ data NCQStorage2 = , ncqMemTable :: Vector Shard , ncqWriteQ :: TVar (Seq HashRef) , ncqWriteOps :: Vector (TQueue (IO ())) + , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) , ncqStorageTasks :: TVar Int , ncqStorageStopReq :: TVar Bool , ncqStorageSyncReq :: TVar Bool @@ -184,7 +184,6 @@ data NCQStorage2 = , ncqMergeTasks :: TVar Int , ncqOnRunWriteIdle :: TVar (IO ()) - , ncqReadSem :: TSem } megabytes :: forall a . Integral a => a @@ -235,7 +234,7 @@ ncqStorageOpen2 fp upd = do ncqMergeTasks <- newTVarIO 0 ncqOnRunWriteIdle <- newTVarIO none - ncqReadSem <- atomically $ newTSem 1 + ncqReadReq <- newTQueueIO ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList @@ -473,7 +472,7 @@ ncqSeekInFossils :: forall a f m . (MonadUnliftIO m, Monoid (f a)) -> HashRef -> (Location -> m (Seek (f a))) -> m (f a) -ncqSeekInFossils ncq@NCQStorage2{..} href action = withSem ncqReadSem $ useVersion ncq $ const do +ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do tracked <- readTVarIO ncqTrackedFiles let l = V.length tracked @@ -524,12 +523,18 @@ ncqLookupIndex hx (mmaped, nway) = do ( off, size ) {-# INLINE ncqLookupIndex #-} -ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) -ncqLocate2 ncq href = do +ncqLocateActually :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) +ncqLocateActually ncq href = do inMem <- ncqLookupEntry ncq href <&> fmap (InMemory . ncqEntryData) inFo <- listToMaybe <$> ncqSeekInFossils ncq href \loc -> pure (SeekStop [loc]) pure $ inMem <|> inFo +ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) +ncqLocate2 NCQStorage2{..} href = do + answ <- newEmptyTMVarIO + atomically $ writeTQueue ncqReadReq (href, answ) + atomically $ takeTMVar answ + data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) @@ -574,6 +579,38 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ)) + replicateM_ 2 $ spawnActivity $ fix \next -> do + (h, answ) <- atomically $ readTQueue ncqReadReq + + let answer l = atomically (putTMVar answ l) + + let lookupCached fk = \case + PendingEntry{} -> none + CachedEntry{..} -> do + ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case + Nothing -> none + Just (!offset,!size) -> do + answer (Just (InFossil fk cachedMmapedData offset size)) + next + {-# INLINE lookupCached #-} + + ncqLookupEntry ncq h >>= \case + Nothing -> none + Just e -> answer (Just (InMemory (ncqEntryData e))) >> next + + useVersion ncq $ const do + + tracked <- readTVarIO ncqTrackedFiles + + for_ tracked $ \(TrackedFile{..}) -> do + readTVarIO tfCached >>= \case + Just ce -> lookupCached tfKey ce + Nothing -> ncqLoadTrackedFile ncq TrackedFile{..} >>= \case + Nothing -> err $ "unable to load index" <+> pretty tfKey + Just ce -> lookupCached tfKey ce + + next + let shLast = V.length ncqWriteOps - 1 spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do let q = ncqWriteOps ! i diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 68ea8ca6..d372af7c 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -821,13 +821,85 @@ testNCQ2Simple1 syn TestEnv{..} = do +testNCQ2Lookup2:: forall c m . (MonadUnliftIO m, IsContext c) + => [Syntax c] + -> TestEnv + -> m () + +testNCQ2Lookup2 syn TestEnv{..} = do + debug $ "testNCQ2Lookup2" <+> pretty syn + let tmp = testEnvDir + let ncqDir = tmp + q <- newTQueueIO + + g <- liftIO MWC.createSystemRandom + + let (opts, argz) = splitOpts [("-m",0)] syn + + let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] + let nt = max 2 . headDef 1 $ [ fromIntegral x | LitIntVal x <- drop 1 argz ] + let nl = headDef 3 $ [ fromIntegral x | LitIntVal x <- drop 2 argz ] + let r = (4*1024, 64*1024) + + let merge = headDef False [ True | ListVal [StringLike "-m"] <- opts ] + + notice $ "insert" <+> pretty n <+> "random blocks of size" <+> parens (pretty r) <+> pretty opts + + thashes <- newTQueueIO + + sizes <- liftIO $ replicateM n (uniformRM r g ) + + res <- newTQueueIO + + ncqWithStorage ncqDir $ \sto -> liftIO do + pooledForConcurrentlyN_ 8 sizes $ \size -> do + z <- uniformByteStringM size g + h <- ncqPutBS sto (Just B) Nothing z + atomically $ writeTQueue thashes h + + hs <- atomically $ STM.flushTQueue thashes + + when merge do + notice "merge full" + ncqMergeFull sto + + ffs <- N2.ncqListTrackedFiles sto + notice $ "database prepared" <+> pretty (List.length ffs) <+> pretty (List.length hs) + + replicateM_ nl do + + tfound <- newTVarIO 0 + + t0 <- getTimeCoarse + + liftIO $ pooledForConcurrentlyN_ nt hs $ \h -> do + found <- ncqLocate2 sto h <&> isJust + when found do + atomically $ modifyTVar' tfound succ + + t1 <- getTimeCoarse + + let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3 + atomically $ writeTQueue res dt + + found <- readTVarIO tfound + + notice $ "scan all files" <+> pretty dt <+> pretty found + + m <- atomically (STM.flushTQueue res) + <&> List.sort + <&> \x -> atDef 0 x (List.length x `quot` 2) + + notice $ "median" <+> pretty m + + testNCQ2Lookup1:: forall c m . (MonadUnliftIO m, IsContext c) => [Syntax c] -> TestEnv -> m () testNCQ2Lookup1 syn TestEnv{..} = do - debug $ "testNCQ2Simple1" <+> pretty syn + debug $ "testNCQ2Lookup1" <+> pretty syn let tmp = testEnvDir let ncqDir = tmp q <- newTQueueIO @@ -866,6 +938,8 @@ testNCQ2Lookup1 syn TestEnv{..} = do ffs <- N2.ncqListTrackedFiles sto notice $ "database prepared" <+> pretty (List.length ffs) <+> pretty (List.length hs) + res <- newTQueueIO + replicateM_ nl do tfound <- newTVarIO 0 @@ -931,12 +1005,17 @@ testNCQ2Lookup1 syn TestEnv{..} = do t1 <- getTimeCoarse let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3 + atomically $ writeTQueue res dt found <- readTVarIO tfound notice $ "scan all files" <+> pretty dt <+> pretty found - -- pause @'Seconds 5 + m <- atomically (STM.flushTQueue res) + <&> List.sort + <&> \x -> atDef 0 x (List.length x `quot` 2) + + notice $ "median" <+> pretty m genRandomBS :: forall g m . (Monad m, StatefulGen g m) => g -> Int -> m ByteString @@ -1470,6 +1549,9 @@ main = do entry $ bindMatch "test:ncq2:lookup1" $ nil_ $ \e -> do runTest (testNCQ2Lookup1 e) + entry $ bindMatch "test:ncq2:lookup2" $ nil_ $ \e -> do + runTest (testNCQ2Lookup2 e) + entry $ bindMatch "test:ncq2:sweep1" $ nil_ $ \e -> do runTest (testNCQ2Sweep1 e)