From 24a46e1c0250ff5b53df37910e82cefced75b8ee Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Mon, 12 May 2025 09:01:22 +0300 Subject: [PATCH] wip, wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 75 +++++++++++++++--------- hbs2-tests/test/TestCQ.hs | 2 +- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 6c1ffe48..ec61c17d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -21,6 +21,7 @@ import Network.ByteOrder qualified as N import Data.HashMap.Strict (HashMap) import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe +import Data.Ord (Down(..)) import Control.Concurrent.STM qualified as STM import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) @@ -51,6 +52,7 @@ import System.Posix.IO as PosixBase import System.Posix.Types as Posix import System.Posix.IO.ByteString as Posix import System.Posix.Unistd +import System.Posix.Files (getFileStatus, modificationTimeHiRes) import System.IO.MMap as MMap import System.IO.Temp (emptyTempFile) -- import Foreign.Ptr @@ -82,6 +84,20 @@ instance IsString FileKey where instance Pretty FileKey where pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s)) +newtype FilePrio = FilePrio (Down TimeSpec) + deriving newtype (Eq,Ord) + deriving stock (Generic,Show) + +mkFilePrio :: TimeSpec -> FilePrio +mkFilePrio = FilePrio . Down + +data CachedEntry = + CachedEntry { cachedTs :: TimeSpec + , cachedMmapedIdx :: ByteString + , cachedMmapedData :: ByteString + , cachedNway :: NWayHash + } + data NCQStorage = NCQStorage { ncqRoot :: FilePath @@ -95,9 +111,8 @@ data NCQStorage = , ncqRefsDirty :: TVar Int , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) - , ncqTrackedFiles :: TVar (HashSet FileKey) - , ncqCachedIndexes :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash)) - , ncqCachedData :: TVar (HashPSQ FileKey TimeSpec ByteString) + , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) + , ncqCachedEntries :: TVar Int , ncqNotWritten :: TVar Word64 , ncqLastWritten :: TVar TimeSpec , ncqCurrentHandleW :: TVar Fd @@ -174,15 +189,6 @@ ncqGetDeletedFileName :: NCQStorage -> FilePath ncqGetDeletedFileName ncq = do ncqGetFileName ncq "deleted.data" --- ncqCheckCurrentSize :: MonadIO m => NCQStorage -> m (Either Integer Integer) --- ncqCheckCurrentSize ncq = liftIO $ readCurrent `catch` (\(_ :: IOError) -> pure $ Left 0) --- where --- readCurrent = do --- let name = ncqGetCurrentName ncq --- a <- liftIO (BS.readFile (ncqGetCurrentSizeName ncq)) <&> N.word64 --- b <- fileSize name --- pure $ if a == fromIntegral b then Right (fromIntegral a) else Left (fromIntegral a) - ncqAddCachedSTM :: TimeSpec -- ^ now -> Int -- ^ limit @@ -204,9 +210,25 @@ ncqAddCachedSTM now limit tv k v = do writeTVar tv (HPSQ.insert k now v dst) -ncqAddTrackedFilesSTM :: NCQStorage -> [FileKey] -> STM () + +ncqAddTrackedFilesIO :: MonadIO m => NCQStorage -> [FilePath] -> m () +ncqAddTrackedFilesIO ncq fps = do + tsFiles <- forM fps \fp -> do + stat <- liftIO $ getFileStatus fp + let ts = modificationTimeHiRes stat + pure (FileKey (fromString fp), TimeSpec (floor ts) 0) + + atomically $ ncqAddTrackedFilesSTM ncq tsFiles + + +ncqAddTrackedFilesSTM :: NCQStorage -> [(FileKey, TimeSpec)] -> STM () ncqAddTrackedFilesSTM NCQStorage{..} keys = do - modifyTVar ncqTrackedFiles (HS.union (HS.fromList keys)) + old <- readTVar ncqTrackedFiles + let new = flip fix (old, keys) \next -> \case + (s, []) -> s + (s, (k,ts):xs) -> next (HPSQ.insert k (mkFilePrio ts) Nothing s, xs) + + writeTVar ncqTrackedFiles new ncqReadTrackedFiles :: MonadIO m => NCQStorage -> m () ncqReadTrackedFiles ncq@NCQStorage{} = do @@ -214,9 +236,8 @@ ncqReadTrackedFiles ncq@NCQStorage{} = do files <- dirFiles (ncqGetCurrentDir ncq) >>= mapM (pure . takeBaseName) <&> List.filter (List.isPrefixOf "fossil-") - <&> fmap fromString - atomically $ ncqAddTrackedFilesSTM ncq files + ncqAddTrackedFilesIO ncq files ncqWriteError :: MonadIO m => NCQStorage -> Text -> m () ncqWriteError ncq txt = liftIO do @@ -375,13 +396,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do for_ what $ \(fd,fn) -> do - key <- ncqIndexFile ncq fn <&> fromString @FileKey + key <- ncqIndexFile ncq fn + ncqAddTrackedFilesIO ncq [key] atomically do - ncqAddTrackedFilesSTM ncq [key] modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) - ncqLoadSomeIndexes ncq [key] + ncqLoadSomeIndexes ncq [fromString key] link indexer pure indexer @@ -533,7 +554,7 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do (cachedIdx, rest) <- atomically do cached <- readTVar ncqCachedIndexes - other' <- readTVar ncqTrackedFiles <&> HS.toList + other' <- readTVar ncqTrackedFiles <&> HPSQ.keys let other = [ x | x <- other', not (HPSQ.member x cached) ] pure (cached, other) @@ -699,14 +720,15 @@ ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do ncqLoadIndexes :: MonadIO m => NCQStorage -> m () ncqLoadIndexes ncq@NCQStorage{..} = do debug "WIP: ncqStorageLoadIndexes" - w <- readTVarIO ncqTrackedFiles <&> List.take (ncqMaxCachedIdx `div` 2) . HS.toList + w <- readTVarIO ncqTrackedFiles + <&> List.take (ncqMaxCachedIdx `div` 2) . HPSQ.keys ncqLoadSomeIndexes ncq w ncqFixIndexes :: MonadUnliftIO m => NCQStorage -> m () ncqFixIndexes ncq@NCQStorage{..} = do debug "ncqFixIndexes" - keys <- readTVarIO ncqTrackedFiles + keys <- readTVarIO ncqTrackedFiles <&> HPSQ.keys for_ keys $ \k -> do let idxName = ncqGetIndexFileName ncq k @@ -715,8 +737,8 @@ ncqFixIndexes ncq@NCQStorage{..} = do unless here do warn $ "missed-index" <+> pretty k let dataName = ncqGetDataFileName ncq k - newKey <- ncqIndexFile ncq dataName <&> fromString @FileKey - atomically $ ncqAddTrackedFilesSTM ncq [newKey] + newKey <- ncqIndexFile ncq dataName + ncqAddTrackedFilesIO ncq [newKey] ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage @@ -814,9 +836,8 @@ ncqStorageInit_ check path = do ncqCurrentReadReq <- newTVarIO mempty ncqCurrentUsage <- newTVarIO mempty ncqStopped <- newTVarIO False - ncqCachedIndexes <- newTVarIO HPSQ.empty - ncqCachedData <- newTVarIO HPSQ.empty - ncqTrackedFiles <- newTVarIO mempty + ncqTrackedFiles <- newTVarIO HPSQ.empty + ncqCachedEntries <- newTVarIO 0 let currentName = ncqGetCurrentName_ path ncqGen diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index e66dee65..64905f1c 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -288,7 +288,7 @@ main = do writer <- ContT $ withAsync $ ncqStorageRun ncq link writer - trf <- readTVarIO ncqTrackedFiles <&> HS.toList + trf <- readTVarIO ncqTrackedFiles <&> HPSQ.keys for_ trf $ \tf -> do notice $ "tracked" <+> pretty tf