This commit is contained in:
voidlizard 2025-07-21 11:02:39 +03:00
parent ba0a631ee2
commit e08f68fbaf
2 changed files with 131 additions and 12 deletions

View File

@ -108,8 +108,8 @@ type FOff = Word64
-- | Value kept in a mem-table 'Shard' for one hash: the raw payload bytes
-- together with a mutable @Maybe FileKey@ slot.
--
-- NOTE(review): 'ncqDumped' presumably records the file the entry has been
-- written to ('Nothing' while it exists only in memory) -- confirm against
-- the writer code.
data NCQEntry =
NCQEntry
{ ncqEntryData :: ByteString
, ncqDumped :: TVar (Maybe FileKey)
{ ncqEntryData :: !ByteString
, ncqDumped :: !(TVar (Maybe FileKey))
}
-- | One mutable map from hash to 'NCQEntry'. 'NCQStorage2' keeps a vector of
-- these in its 'ncqMemTable' field.
type Shard = TVar (HashMap HashRef NCQEntry)
@ -126,10 +126,9 @@ data NCQFlag =
NCQMergeNow | NCQCompactNow
deriving (Eq,Ord,Generic)
-- | Where the bytes for a hash were found.
--
-- 'InFossil' carries the key of a tracked file, a 'ByteString' (the lookup
-- code passes that file's cached mmapped data here) and the offset and size
-- returned by the index lookup. 'InMemory' carries the payload of a
-- mem-table entry.
data Location =
InFossil FileKey !ByteString !NCQOffset !NCQSize
| InMemory ByteString
InFossil {-# UNPACK #-} !FileKey !ByteString !NCQOffset !NCQSize
| InMemory {-# UNPACK #-} !ByteString
instance Pretty Location where
pretty = \case
@ -163,6 +162,7 @@ data NCQStorage2 =
, ncqMemTable :: Vector Shard
, ncqWriteQ :: TVar (Seq HashRef)
, ncqWriteOps :: Vector (TQueue (IO ()))
, ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location))
, ncqStorageTasks :: TVar Int
, ncqStorageStopReq :: TVar Bool
, ncqStorageSyncReq :: TVar Bool
@ -184,7 +184,6 @@ data NCQStorage2 =
, ncqMergeTasks :: TVar Int
, ncqOnRunWriteIdle :: TVar (IO ())
, ncqReadSem :: TSem
}
megabytes :: forall a . Integral a => a
@ -235,7 +234,7 @@ ncqStorageOpen2 fp upd = do
ncqMergeTasks <- newTVarIO 0
ncqOnRunWriteIdle <- newTVarIO none
ncqReadSem <- atomically $ newTSem 1
ncqReadReq <- newTQueueIO
ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList
@ -473,7 +472,7 @@ ncqSeekInFossils :: forall a f m . (MonadUnliftIO m, Monoid (f a))
-> HashRef
-> (Location -> m (Seek (f a)))
-> m (f a)
ncqSeekInFossils ncq@NCQStorage2{..} href action = withSem ncqReadSem $ useVersion ncq $ const do
ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do
tracked <- readTVarIO ncqTrackedFiles
let l = V.length tracked
@ -524,12 +523,18 @@ ncqLookupIndex hx (mmaped, nway) = do
( off, size )
{-# INLINE ncqLookupIndex #-}
ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
ncqLocate2 ncq href = do
-- | Locate a hash synchronously, in the calling thread.
--
-- The mem-table is consulted first. Only when the hash is not there are the
-- fossil files searched, stopping at the first hit. Returns 'Nothing' when
-- neither place has the hash.
ncqLocateActually :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
ncqLocateActually ncq href =
  ncqLookupEntry ncq href >>= \case
    -- A mem-table hit always wins, so there is no point in scanning fossils.
    Just entry -> pure (Just (InMemory (ncqEntryData entry)))
    Nothing    -> listToMaybe <$> ncqSeekInFossils ncq href \loc -> pure (SeekStop [loc])
-- | Ask the lookup workers where a hash lives and wait for their reply.
--
-- A fresh, empty reply slot is enqueued on 'ncqReadReq' together with the
-- hash; the call then blocks until something fills that slot.
ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location)
ncqLocate2 NCQStorage2{..} href =
  newEmptyTMVarIO >>= \reply -> do
    -- Two separate transactions on purpose: the request has to be committed
    -- before we start waiting, otherwise the retry on the empty slot would
    -- also roll back the enqueue.
    atomically (writeTQueue ncqReadReq (href, reply))
    atomically (takeTMVar reply)
data RunSt =
RunNew
| RunWrite (FileKey, Fd, Int, Int)
@ -574,6 +579,38 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ))
-- Lookup workers: two activities serving 'ncqReadReq'.
--
-- Every request is answered exactly once: with the location when the hash is
-- in the mem-table or in the index of a tracked file, and with 'Nothing'
-- otherwise, so a caller blocked on the reply slot is always released.
-- The result is computed first and the reply is sent at the end of the
-- iteration; the loop never re-enters itself from inside 'useVersion'.
replicateM_ 2 $ spawnActivity $ forever do
  (h, answ) <- atomically $ readTQueue ncqReadReq

  -- Probe the index of one already-loaded file for the requested hash.
  let lookupCached fk = \case
        PendingEntry{} -> pure Nothing
        CachedEntry{..} ->
          ncqLookupIndex h (cachedMmapedIdx, cachedNway)
            <&> fmap (\(offset, size) -> InFossil fk cachedMmapedData offset size)
      {-# INLINE lookupCached #-}

  -- Probe one tracked file, loading its index first when it is not cached.
  let lookupTracked tf@TrackedFile{..} =
        readTVarIO tfCached >>= \case
          Just ce -> lookupCached tfKey ce
          Nothing -> ncqLoadTrackedFile ncq tf >>= \case
            Just ce -> lookupCached tfKey ce
            Nothing -> do
              err $ "unable to load index" <+> pretty tfKey
              pure Nothing

  found <- ncqLookupEntry ncq h >>= \case
    Just e  -> pure (Just (InMemory (ncqEntryData e)))
    Nothing -> useVersion ncq $ const do
      tracked <- readTVarIO ncqTrackedFiles
      -- Right fold: files after the first hit are never touched.
      foldr (\tf rest -> lookupTracked tf >>= maybe rest (pure . Just))
            (pure Nothing)
            tracked

  atomically $ putTMVar answ found
let shLast = V.length ncqWriteOps - 1
spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
let q = ncqWriteOps ! i

View File

@ -821,13 +821,85 @@ testNCQ2Simple1 syn TestEnv{..} = do
-- | Lookup benchmark for 'ncqLocate2'.
--
-- Stores @n@ random blocks whose sizes are drawn from the range
-- @(4*1024, 64*1024)@ bytes, optionally runs a full merge (option @-m@), and
-- then looks up every stored hash @nl@ times using @nt@ concurrent threads.
-- Each pass logs its duration and the number of hashes found; the median
-- pass duration is logged at the end.
--
-- Positional integer arguments, in order: number of blocks (default 100000),
-- number of lookup threads (never less than 2), number of passes (default 3).
testNCQ2Lookup2 :: forall c m . (MonadUnliftIO m, IsContext c)
                => [Syntax c]
                -> TestEnv
                -> m ()
testNCQ2Lookup2 syn TestEnv{..} = do
  debug $ "testNCQ2Lookup2" <+> pretty syn

  let ncqDir = testEnvDir

  g <- liftIO MWC.createSystemRandom

  let (opts, argz) = splitOpts [("-m",0)] syn

  let n  = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ]
  let nt = max 2 . headDef 1 $ [ fromIntegral x | LitIntVal x <- drop 1 argz ]
  let nl = headDef 3 $ [ fromIntegral x | LitIntVal x <- drop 2 argz ]
  let r  = (4*1024, 64*1024)

  let merge = headDef False [ True | ListVal [StringLike "-m"] <- opts ]

  notice $ "insert" <+> pretty n <+> "random blocks of size" <+> parens (pretty r) <+> pretty opts

  thashes <- newTQueueIO
  sizes   <- liftIO $ replicateM n (uniformRM r g)
  res     <- newTQueueIO

  ncqWithStorage ncqDir $ \sto -> liftIO do

    -- Fill the storage, remembering every hash that was written.
    pooledForConcurrentlyN_ 8 sizes $ \size -> do
      z <- uniformByteStringM size g
      h <- ncqPutBS sto (Just B) Nothing z
      atomically $ writeTQueue thashes h

    hs <- atomically $ STM.flushTQueue thashes

    when merge do
      notice "merge full"
      ncqMergeFull sto

    ffs <- N2.ncqListTrackedFiles sto
    notice $ "database prepared" <+> pretty (List.length ffs) <+> pretty (List.length hs)

    -- One timed pass per iteration: look up every hash, count the hits.
    replicateM_ nl do

      tfound <- newTVarIO 0

      t0 <- getTimeCoarse

      liftIO $ pooledForConcurrentlyN_ nt hs $ \h -> do
        found <- ncqLocate2 sto h <&> isJust
        when found do
          atomically $ modifyTVar' tfound succ

      t1 <- getTimeCoarse

      let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3
      atomically $ writeTQueue res dt

      found <- readTVarIO tfound
      notice $ "scan all files" <+> pretty dt <+> pretty found

    -- Middle element of the sorted pass durations (0 when there were none).
    m <- atomically (STM.flushTQueue res)
           <&> List.sort
           <&> \x -> atDef 0 x (List.length x `quot` 2)

    notice $ "median" <+> pretty m
testNCQ2Lookup1:: forall c m . (MonadUnliftIO m, IsContext c)
=> [Syntax c]
-> TestEnv
-> m ()
testNCQ2Lookup1 syn TestEnv{..} = do
debug $ "testNCQ2Simple1" <+> pretty syn
debug $ "testNCQ2Lookup1" <+> pretty syn
let tmp = testEnvDir
let ncqDir = tmp
q <- newTQueueIO
@ -866,6 +938,8 @@ testNCQ2Lookup1 syn TestEnv{..} = do
ffs <- N2.ncqListTrackedFiles sto
notice $ "database prepared" <+> pretty (List.length ffs) <+> pretty (List.length hs)
res <- newTQueueIO
replicateM_ nl do
tfound <- newTVarIO 0
@ -931,12 +1005,17 @@ testNCQ2Lookup1 syn TestEnv{..} = do
t1 <- getTimeCoarse
let dt = realToFrac (toNanoSecs (t1 - t0)) / 1e9 :: Fixed E3
atomically $ writeTQueue res dt
found <- readTVarIO tfound
notice $ "scan all files" <+> pretty dt <+> pretty found
-- pause @'Seconds 5
m <- atomically (STM.flushTQueue res)
<&> List.sort
<&> \x -> atDef 0 x (List.length x `quot` 2)
notice $ "median" <+> pretty m
genRandomBS :: forall g m . (Monad m, StatefulGen g m) => g -> Int -> m ByteString
@ -1470,6 +1549,9 @@ main = do
entry $ bindMatch "test:ncq2:lookup1" $ nil_ $ \e -> do
runTest (testNCQ2Lookup1 e)
entry $ bindMatch "test:ncq2:lookup2" $ nil_ $ \e -> do
runTest (testNCQ2Lookup2 e)
entry $ bindMatch "test:ncq2:sweep1" $ nil_ $ \e -> do
runTest (testNCQ2Sweep1 e)