From db41293fa2eab2c4072be23bee392c1d5837700c Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Mon, 12 May 2025 10:56:08 +0300 Subject: [PATCH] wip, compiles --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 196 ++++++++++++++--------- hbs2-tests/test/TestCQ.hs | 5 - 2 files changed, 124 insertions(+), 77 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index ec61c17d..5c44ee83 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -21,7 +21,7 @@ import Network.ByteOrder qualified as N import Data.HashMap.Strict (HashMap) import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe -import Data.Ord (Down(..)) +import Data.Ord (Down(..),comparing) import Control.Concurrent.STM qualified as STM import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) @@ -53,6 +53,7 @@ import System.Posix.Types as Posix import System.Posix.IO.ByteString as Posix import System.Posix.Unistd import System.Posix.Files (getFileStatus, modificationTimeHiRes) +import System.IO.Error (catchIOError) import System.IO.MMap as MMap import System.IO.Temp (emptyTempFile) -- import Foreign.Ptr @@ -92,10 +93,10 @@ mkFilePrio :: TimeSpec -> FilePrio mkFilePrio = FilePrio . Down data CachedEntry = - CachedEntry { cachedTs :: TimeSpec - , cachedMmapedIdx :: ByteString + CachedEntry { cachedMmapedIdx :: ByteString , cachedMmapedData :: ByteString , cachedNway :: NWayHash + , cachedTs :: TVar TimeSpec } data NCQStorage = @@ -105,8 +106,7 @@ data NCQStorage = , ncqSyncSize :: Int , ncqMinLog :: Int , ncqMaxLog :: Int - , ncqMaxCachedIdx :: Int - , ncqMaxCachedData :: Int + , ncqMaxCached :: Int , ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqRefsDirty :: TVar Int , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) @@ -210,13 +210,17 @@ ncqAddCachedSTM now limit tv k v = do writeTVar tv (HPSQ.insert k now v dst) - ncqAddTrackedFilesIO :: MonadIO m => NCQStorage -> [FilePath] -> m () ncqAddTrackedFilesIO ncq fps = do - tsFiles <- forM fps \fp -> do - stat <- liftIO $ getFileStatus fp - let ts = modificationTimeHiRes stat - pure (FileKey (fromString fp), TimeSpec (floor ts) 0) + tsFiles <- catMaybes <$> forM fps \fp -> liftIO $ do + catchIOError + (do + stat <- getFileStatus fp + let ts = modificationTimeHiRes stat + pure $ Just (FileKey (fromString fp), TimeSpec (floor ts) 0)) + (\e -> do + err $ "ncqAddTrackedFilesIO: failed to stat " <+> pretty fp <+> pretty (displayException e) + pure Nothing) atomically $ ncqAddTrackedFilesSTM ncq tsFiles @@ -540,9 +544,40 @@ ncqLocatedSize = \case InCurrent (_,s) -> fromIntegral s InFossil _ (_,s) -> fromIntegral s + + +evictIfNeededSTM :: NCQStorage -> Maybe Int -> STM () +evictIfNeededSTM NCQStorage{..} howMany = do + cur <- readTVar ncqCachedEntries + + let need = fromMaybe (cur `div` 2) howMany + excess = max 0 (cur + need - ncqMaxCached) + + when (excess > 0) do + files <- readTVar ncqTrackedFiles <&> HPSQ.toList + + -- собрать [(ts, k, prio)] с чтением TVar + oldest <- forM files \case + (k, prio, Just ce) -> do + ts <- readTVar (cachedTs ce) + pure (Just (ts, k, prio)) + _ -> pure Nothing + + let victims = + oldest + & catMaybes + & List.sortOn (\(ts,_,_) -> ts) + & List.take excess + + for_ victims $ \(_,k,prio) -> do + modifyTVar ncqTrackedFiles (HPSQ.insert k prio Nothing) + modifyTVar ncqCachedEntries (subtract 1) + + + ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location) ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do - + -- сначала проверяем очередь и current l1 <- atomically do inQ <- readTVar ncqWriteQueue <&> (fmap snd . HPSQ.lookup h) <&> fmap InWriteQueue inC <- readTVar ncqWaitIndex <&> (fmap snd . HPSQ.lookup h) <&> fmap InCurrent @@ -551,57 +586,52 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do for_ l1 $ exit . Just now <- getTimeCoarse + tracked <- readTVarIO ncqTrackedFiles - (cachedIdx, rest) <- atomically do - cached <- readTVar ncqCachedIndexes - other' <- readTVar ncqTrackedFiles <&> HPSQ.keys - let other = [ x | x <- other', not (HPSQ.member x cached) ] - pure (cached, other) + for_ (HPSQ.toList tracked) $ \(fk, prio, mCached) -> do + case mCached of + Just (CachedEntry{..}) -> do + lookupEntry h (cachedMmapedIdx, cachedNway) <&> fmap (InFossil fk) >>= \case + Just loc -> do + atomically $ writeTVar cachedTs now - for_ (HPSQ.toList cachedIdx) $ \(fk,_,nway) -> do - lookupEntry h nway <&> fmap (InFossil fk) >>= \case - Nothing -> pure Nothing -- none - other -> do - atomically $ modifyTVar ncqCachedIndexes (HPSQ.insert fk now nway) - exit other + exit (Just loc) - -- TODO: use-filter-for-faster-scan - -- 1. Какой фильтр? - -- 2. Как и когда его перестраивать? - -- 2.1 На открытии? Будет расти время открытия (но можно параллельно) - -- + Nothing -> pure () - for_ rest $ \r -> runMaybeT do - let fn = ncqGetIndexFileName ncq r + Nothing -> void $ runMaybeT do + let indexFile = ncqGetIndexFileName ncq fk + let dataFile = ncqGetDataFileName ncq fk - nway' <- liftIO (nwayHashMMapReadOnly fn) + (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) >>= toMPlus + datBs <- liftIO $ mmapFileByteString dataFile Nothing - when (isNothing nway') do - err ("NCQStorage: can't mmap file" <+> pretty fn) + e <- lookupEntry h (idxBs, idxNway) <&> fmap (InFossil fk) >>= toMPlus - nway <- toMPlus nway' + liftIO $ atomically do + files <- readTVar ncqTrackedFiles + case HPSQ.lookup fk files of + Just (p, _) -> do + ce <- CachedEntry idxBs datBs idxNway <$> newTVar now + modifyTVar ncqTrackedFiles (HPSQ.insert fk p (Just ce)) + modifyTVar ncqCachedEntries (+1) + evictIfNeededSTM ncq (Just 1) + Nothing -> pure () - e <- lookupEntry h nway <&> fmap (InFossil r) >>= toMPlus - - liftIO (mmapFileByteString (ncqGetDataFileName ncq r) Nothing) >>= \mmaped -> - atomically do - ncqAddCachedSTM now ncqMaxCachedIdx ncqCachedIndexes r nway - ncqAddCachedSTM now ncqMaxCachedData ncqCachedData r mmaped - - lift (exit (Just e)) + lift (exit (Just e)) pure Nothing where - lookupEntry (hx :: HashRef) (mmaped, nway) = runMaybeT do + entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= toMPlus + pure + ( fromIntegral $ N.word64 (BS.take 8 entryBs) + , fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) ) + - entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx)) - >>= toMPlus - pure $ ( fromIntegral $ N.word64 (BS.take 8 entryBs), - fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs))) ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do @@ -640,44 +670,44 @@ ncqStorageIsDeleted :: MonadIO m => NCQStorage -> HashRef -> m Bool ncqStorageIsDeleted NCQStorage{..} what = do pure False + ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do deleted <- ncqStorageIsDeleted ncq h - when deleted $ exit Nothing ncqLocate ncq h >>= \case Nothing -> pure Nothing - Just (InWriteQueue lbs) -> pure $ Just lbs + + Just (InWriteQueue lbs) -> + pure $ Just lbs Just (InCurrent (o,l)) -> do - -- FIXME: timeout! answ <- atomically do a <- newEmptyTMVar fd <- readTVar ncqCurrentHandleR modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) - modifyTVar ncqCurrentReadReq ( |> (fd, o, l, a) ) + modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) pure a - atomically $ takeTMVar answ <&> Just . LBS.fromStrict + atomically $ takeTMVar answ <&> Just . LBS.fromStrict Just (InFossil key (o,l)) -> do + mCE <- atomically do + files <- readTVar ncqTrackedFiles + pure $ HPSQ.lookup key files >>= snd - mmaped <- readTVarIO ncqCachedData <&> HPSQ.lookup key >>= \case - Just (_,mmaped) -> do + case mCE of + Just CachedEntry{..} -> do now <- getTimeCoarse - atomically $ modifyTVar ncqCachedData (HPSQ.insert key now mmaped) - pure mmaped + atomically $ writeTVar cachedTs now + let chunk = BS.take (fromIntegral l) (BS.drop (fromIntegral o + 4 + 32) cachedMmapedData) + pure $ Just $ LBS.fromStrict chunk Nothing -> do - now <- getTimeCoarse - let fn = ncqGetDataFileName ncq key - -- TODO: possible-exception! - newMmaped <- liftIO (mmapFileByteString fn Nothing) - atomically $ ncqAddCachedSTM now ncqMaxCachedData ncqCachedData key newMmaped - pure newMmaped + err $ "ncqStorageGet: missing CachedEntry for " <+> pretty key + pure Nothing - pure $ Just $ LBS.fromStrict $ BS.take (fromIntegral l) (BS.drop (fromIntegral o+4+32) mmaped) ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef) ncqStorageGetRef NCQStorage{..} ref = readTVarIO ncqRefsMem <&> HM.lookup ref @@ -707,21 +737,44 @@ ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do atomically $ readTVar ncqFlushNow >>= mapM_ (`writeTQueue` ()) + ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m () ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do - now <- getTimeCoarse - for_ keys $ \key -> do - let fn = ncqGetIndexFileName ncq key - liftIO (nwayHashMMapReadOnly fn) >>= \case - Nothing -> err $ "NCQStorage: can't mmap index file" <+> pretty fn - Just nway -> atomically do - ncqAddCachedSTM now ncqMaxCachedIdx ncqCachedIndexes key nway + now <- getTimeCoarse + + ncqAddTrackedFilesIO ncq (fmap (BS8.unpack . coerce) keys) + + loaded <- catMaybes <$> forM keys \key -> runMaybeT do + mEntry <- liftIO $ readTVarIO ncqTrackedFiles <&> HPSQ.lookup key + guard (maybe True (\(_, m) -> isNothing m) mEntry) + + let idxFile = ncqGetIndexFileName ncq key + let datFile = ncqGetDataFileName ncq key + + (mmIdx, nway) <- MaybeT $ liftIO $ nwayHashMMapReadOnly idxFile + mmData <- liftIO $ mmapFileByteString datFile Nothing + tnow <- newTVarIO now + pure (key, CachedEntry mmIdx mmData nway tnow) + + atomically do + evictIfNeededSTM ncq (Just (List.length loaded)) + + for_ loaded \(k, ce) -> do + files <- readTVar ncqTrackedFiles + case HPSQ.lookup k files of + Just (p, Nothing) -> do + modifyTVar ncqTrackedFiles (HPSQ.insert k p (Just ce)) + modifyTVar ncqCachedEntries (+1) + _ -> pure () + + + ncqLoadIndexes :: MonadIO m => NCQStorage -> m () ncqLoadIndexes ncq@NCQStorage{..} = do debug "WIP: ncqStorageLoadIndexes" w <- readTVarIO ncqTrackedFiles - <&> List.take (ncqMaxCachedIdx `div` 2) . HPSQ.keys + <&> List.take (ncqMaxCached `div` 2) . HPSQ.keys ncqLoadSomeIndexes ncq w ncqFixIndexes :: MonadUnliftIO m => NCQStorage -> m () @@ -822,8 +875,7 @@ ncqStorageInit_ check path = do let ncqMinLog = 2 * (1024 ^ 3) let ncqMaxLog = 10 * (1024 ^ 3) - let ncqMaxCachedIdx = 64 - let ncqMaxCachedData = ncqMaxCachedIdx `div` 2 + let ncqMaxCached = 64 ncqWriteQueue <- newTVarIO HPSQ.empty diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 64905f1c..022f0a6e 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -293,11 +293,6 @@ main = do for_ trf $ \tf -> do notice $ "tracked" <+> pretty tf - tri <- readTVarIO ncqCachedIndexes <&> HPSQ.toList - - for_ tri $ \(k,_,_) -> do - notice $ "cached-index" <+> pretty k - e -> throwIO $ BadFormException @C (mkList e) entry $ bindMatch "test:ncq:raw" $ \case