This commit is contained in:
Dmitry Zuykov 2025-05-12 09:01:22 +03:00
parent 03ec08509a
commit 24a46e1c02
2 changed files with 49 additions and 28 deletions

View File

@ -21,6 +21,7 @@ import Network.ByteOrder qualified as N
import Data.HashMap.Strict (HashMap)
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Data.Ord (Down(..))
import Control.Concurrent.STM qualified as STM
import Data.HashPSQ qualified as HPSQ
import Data.HashPSQ (HashPSQ)
@ -51,6 +52,7 @@ import System.Posix.IO as PosixBase
import System.Posix.Types as Posix
import System.Posix.IO.ByteString as Posix
import System.Posix.Unistd
import System.Posix.Files (getFileStatus, modificationTimeHiRes)
import System.IO.MMap as MMap
import System.IO.Temp (emptyTempFile)
-- import Foreign.Ptr
@ -82,6 +84,20 @@ instance IsString FileKey where
instance Pretty FileKey where
pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s))
newtype FilePrio = FilePrio (Down TimeSpec)
deriving newtype (Eq,Ord)
deriving stock (Generic,Show)
mkFilePrio :: TimeSpec -> FilePrio
mkFilePrio = FilePrio . Down
data CachedEntry =
CachedEntry { cachedTs :: TimeSpec
, cachedMmapedIdx :: ByteString
, cachedMmapedData :: ByteString
, cachedNway :: NWayHash
}
data NCQStorage =
NCQStorage
{ ncqRoot :: FilePath
@ -95,9 +111,8 @@ data NCQStorage =
, ncqRefsDirty :: TVar Int
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString)
, ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64))
, ncqTrackedFiles :: TVar (HashSet FileKey)
, ncqCachedIndexes :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash))
, ncqCachedData :: TVar (HashPSQ FileKey TimeSpec ByteString)
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
, ncqCachedEntries :: TVar Int
, ncqNotWritten :: TVar Word64
, ncqLastWritten :: TVar TimeSpec
, ncqCurrentHandleW :: TVar Fd
@ -174,15 +189,6 @@ ncqGetDeletedFileName :: NCQStorage -> FilePath
ncqGetDeletedFileName ncq = do
ncqGetFileName ncq "deleted.data"
-- ncqCheckCurrentSize :: MonadIO m => NCQStorage -> m (Either Integer Integer)
-- ncqCheckCurrentSize ncq = liftIO $ readCurrent `catch` (\(_ :: IOError) -> pure $ Left 0)
-- where
-- readCurrent = do
-- let name = ncqGetCurrentName ncq
-- a <- liftIO (BS.readFile (ncqGetCurrentSizeName ncq)) <&> N.word64
-- b <- fileSize name
-- pure $ if a == fromIntegral b then Right (fromIntegral a) else Left (fromIntegral a)
ncqAddCachedSTM :: TimeSpec -- ^ now
-> Int -- ^ limit
@ -204,9 +210,25 @@ ncqAddCachedSTM now limit tv k v = do
writeTVar tv (HPSQ.insert k now v dst)
ncqAddTrackedFilesSTM :: NCQStorage -> [FileKey] -> STM ()
ncqAddTrackedFilesIO :: MonadIO m => NCQStorage -> [FilePath] -> m ()
ncqAddTrackedFilesIO ncq fps = do
tsFiles <- forM fps \fp -> do
stat <- liftIO $ getFileStatus fp
let ts = modificationTimeHiRes stat
pure (FileKey (fromString fp), TimeSpec (floor ts) 0)
atomically $ ncqAddTrackedFilesSTM ncq tsFiles
ncqAddTrackedFilesSTM :: NCQStorage -> [(FileKey, TimeSpec)] -> STM ()
ncqAddTrackedFilesSTM NCQStorage{..} keys = do
modifyTVar ncqTrackedFiles (HS.union (HS.fromList keys))
old <- readTVar ncqTrackedFiles
let new = flip fix (old, keys) \next -> \case
(s, []) -> s
(s, (k,ts):xs) -> next (HPSQ.insert k (mkFilePrio ts) Nothing s, xs)
writeTVar ncqTrackedFiles new
ncqReadTrackedFiles :: MonadIO m => NCQStorage -> m ()
ncqReadTrackedFiles ncq@NCQStorage{} = do
@ -214,9 +236,8 @@ ncqReadTrackedFiles ncq@NCQStorage{} = do
files <- dirFiles (ncqGetCurrentDir ncq)
>>= mapM (pure . takeBaseName)
<&> List.filter (List.isPrefixOf "fossil-")
<&> fmap fromString
atomically $ ncqAddTrackedFilesSTM ncq files
ncqAddTrackedFilesIO ncq files
ncqWriteError :: MonadIO m => NCQStorage -> Text -> m ()
ncqWriteError ncq txt = liftIO do
@ -375,13 +396,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
for_ what $ \(fd,fn) -> do
key <- ncqIndexFile ncq fn <&> fromString @FileKey
key <- ncqIndexFile ncq fn
ncqAddTrackedFilesIO ncq [key]
atomically do
ncqAddTrackedFilesSTM ncq [key]
modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd))
ncqLoadSomeIndexes ncq [key]
ncqLoadSomeIndexes ncq [fromString key]
link indexer
pure indexer
@ -533,7 +554,7 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
(cachedIdx, rest) <- atomically do
cached <- readTVar ncqCachedIndexes
other' <- readTVar ncqTrackedFiles <&> HS.toList
other' <- readTVar ncqTrackedFiles <&> HPSQ.keys
let other = [ x | x <- other', not (HPSQ.member x cached) ]
pure (cached, other)
@ -699,14 +720,15 @@ ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do
ncqLoadIndexes :: MonadIO m => NCQStorage -> m ()
ncqLoadIndexes ncq@NCQStorage{..} = do
debug "WIP: ncqStorageLoadIndexes"
w <- readTVarIO ncqTrackedFiles <&> List.take (ncqMaxCachedIdx `div` 2) . HS.toList
w <- readTVarIO ncqTrackedFiles
<&> List.take (ncqMaxCachedIdx `div` 2) . HPSQ.keys
ncqLoadSomeIndexes ncq w
ncqFixIndexes :: MonadUnliftIO m => NCQStorage -> m ()
ncqFixIndexes ncq@NCQStorage{..} = do
debug "ncqFixIndexes"
keys <- readTVarIO ncqTrackedFiles
keys <- readTVarIO ncqTrackedFiles <&> HPSQ.keys
for_ keys $ \k -> do
let idxName = ncqGetIndexFileName ncq k
@ -715,8 +737,8 @@ ncqFixIndexes ncq@NCQStorage{..} = do
unless here do
warn $ "missed-index" <+> pretty k
let dataName = ncqGetDataFileName ncq k
newKey <- ncqIndexFile ncq dataName <&> fromString @FileKey
atomically $ ncqAddTrackedFilesSTM ncq [newKey]
newKey <- ncqIndexFile ncq dataName
ncqAddTrackedFilesIO ncq [newKey]
ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage
@ -814,9 +836,8 @@ ncqStorageInit_ check path = do
ncqCurrentReadReq <- newTVarIO mempty
ncqCurrentUsage <- newTVarIO mempty
ncqStopped <- newTVarIO False
ncqCachedIndexes <- newTVarIO HPSQ.empty
ncqCachedData <- newTVarIO HPSQ.empty
ncqTrackedFiles <- newTVarIO mempty
ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqCachedEntries <- newTVarIO 0
let currentName = ncqGetCurrentName_ path ncqGen

View File

@ -288,7 +288,7 @@ main = do
writer <- ContT $ withAsync $ ncqStorageRun ncq
link writer
trf <- readTVarIO ncqTrackedFiles <&> HS.toList
trf <- readTVarIO ncqTrackedFiles <&> HPSQ.keys
for_ trf $ \tf -> do
notice $ "tracked" <+> pretty tf