wip, compiles

This commit is contained in:
Dmitry Zuykov 2025-05-12 10:56:08 +03:00
parent 24a46e1c02
commit db41293fa2
2 changed files with 124 additions and 77 deletions

View File

@ -21,7 +21,7 @@ import Network.ByteOrder qualified as N
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Control.Monad.Trans.Cont import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.Ord (Down(..)) import Data.Ord (Down(..),comparing)
import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM qualified as STM
import Data.HashPSQ qualified as HPSQ import Data.HashPSQ qualified as HPSQ
import Data.HashPSQ (HashPSQ) import Data.HashPSQ (HashPSQ)
@ -53,6 +53,7 @@ import System.Posix.Types as Posix
import System.Posix.IO.ByteString as Posix import System.Posix.IO.ByteString as Posix
import System.Posix.Unistd import System.Posix.Unistd
import System.Posix.Files (getFileStatus, modificationTimeHiRes) import System.Posix.Files (getFileStatus, modificationTimeHiRes)
import System.IO.Error (catchIOError)
import System.IO.MMap as MMap import System.IO.MMap as MMap
import System.IO.Temp (emptyTempFile) import System.IO.Temp (emptyTempFile)
-- import Foreign.Ptr -- import Foreign.Ptr
@ -92,10 +93,10 @@ mkFilePrio :: TimeSpec -> FilePrio
mkFilePrio = FilePrio . Down mkFilePrio = FilePrio . Down
data CachedEntry = data CachedEntry =
CachedEntry { cachedTs :: TimeSpec CachedEntry { cachedMmapedIdx :: ByteString
, cachedMmapedIdx :: ByteString
, cachedMmapedData :: ByteString , cachedMmapedData :: ByteString
, cachedNway :: NWayHash , cachedNway :: NWayHash
, cachedTs :: TVar TimeSpec
} }
data NCQStorage = data NCQStorage =
@ -105,8 +106,7 @@ data NCQStorage =
, ncqSyncSize :: Int , ncqSyncSize :: Int
, ncqMinLog :: Int , ncqMinLog :: Int
, ncqMaxLog :: Int , ncqMaxLog :: Int
, ncqMaxCachedIdx :: Int , ncqMaxCached :: Int
, ncqMaxCachedData :: Int
, ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqRefsMem :: TVar (HashMap HashRef HashRef)
, ncqRefsDirty :: TVar Int , ncqRefsDirty :: TVar Int
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString)
@ -210,13 +210,17 @@ ncqAddCachedSTM now limit tv k v = do
writeTVar tv (HPSQ.insert k now v dst) writeTVar tv (HPSQ.insert k now v dst)
ncqAddTrackedFilesIO :: MonadIO m => NCQStorage -> [FilePath] -> m () ncqAddTrackedFilesIO :: MonadIO m => NCQStorage -> [FilePath] -> m ()
ncqAddTrackedFilesIO ncq fps = do ncqAddTrackedFilesIO ncq fps = do
tsFiles <- forM fps \fp -> do tsFiles <- catMaybes <$> forM fps \fp -> liftIO $ do
stat <- liftIO $ getFileStatus fp catchIOError
let ts = modificationTimeHiRes stat (do
pure (FileKey (fromString fp), TimeSpec (floor ts) 0) stat <- getFileStatus fp
let ts = modificationTimeHiRes stat
pure $ Just (FileKey (fromString fp), TimeSpec (floor ts) 0))
(\e -> do
err $ "ncqAddTrackedFilesIO: failed to stat " <+> pretty fp <+> pretty (displayException e)
pure Nothing)
atomically $ ncqAddTrackedFilesSTM ncq tsFiles atomically $ ncqAddTrackedFilesSTM ncq tsFiles
@ -540,9 +544,40 @@ ncqLocatedSize = \case
InCurrent (_,s) -> fromIntegral s InCurrent (_,s) -> fromIntegral s
InFossil _ (_,s) -> fromIntegral s InFossil _ (_,s) -> fromIntegral s
-- | Un-cache the stalest cached entries so the cache stays within
-- 'ncqMaxCached'.
--
-- The second argument is the amount of headroom to leave below
-- 'ncqMaxCached'; when it is 'Nothing', half of the current cached count
-- is used. Nothing happens unless @cached + headroom@ exceeds
-- 'ncqMaxCached'.
--
-- Victims are the cached entries with the smallest 'cachedTs' value.
-- An evicted file stays in 'ncqTrackedFiles' with the same priority;
-- only its cached payload is replaced by 'Nothing', and
-- 'ncqCachedEntries' is decreased by the number of victims.
evictIfNeededSTM :: NCQStorage -> Maybe Int -> STM ()
evictIfNeededSTM NCQStorage{..} howMany = do
  cur <- readTVar ncqCachedEntries
  let need   = fromMaybe (cur `div` 2) howMany
      excess = max 0 (cur + need - ncqMaxCached)
  when (excess > 0) do
    tracked <- readTVar ncqTrackedFiles
    -- Collect (ts, (key, prio)) for every cached file, reading each
    -- entry's timestamp TVar; files without a cached entry are skipped.
    stamped <- forM [ (k, prio, ce) | (k, prio, Just ce) <- HPSQ.toList tracked ] \(k, prio, ce) -> do
      ts <- readTVar (cachedTs ce)
      pure (ts, (k, prio))
    let victims = List.take excess (List.sortOn fst stamped)
        uncache q (_, (k, prio)) = HPSQ.insert k prio Nothing q
    -- One strict write per TVar instead of a lazy modifyTVar per victim,
    -- so no chain of unevaluated updates is left inside the TVars.
    writeTVar ncqTrackedFiles $! List.foldl' uncache tracked victims
    writeTVar ncqCachedEntries $! cur - List.length victims
ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location) ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location)
ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
-- сначала проверяем очередь и current
l1 <- atomically do l1 <- atomically do
inQ <- readTVar ncqWriteQueue <&> (fmap snd . HPSQ.lookup h) <&> fmap InWriteQueue inQ <- readTVar ncqWriteQueue <&> (fmap snd . HPSQ.lookup h) <&> fmap InWriteQueue
inC <- readTVar ncqWaitIndex <&> (fmap snd . HPSQ.lookup h) <&> fmap InCurrent inC <- readTVar ncqWaitIndex <&> (fmap snd . HPSQ.lookup h) <&> fmap InCurrent
@ -551,57 +586,52 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
for_ l1 $ exit . Just for_ l1 $ exit . Just
now <- getTimeCoarse now <- getTimeCoarse
tracked <- readTVarIO ncqTrackedFiles
(cachedIdx, rest) <- atomically do for_ (HPSQ.toList tracked) $ \(fk, prio, mCached) -> do
cached <- readTVar ncqCachedIndexes case mCached of
other' <- readTVar ncqTrackedFiles <&> HPSQ.keys
let other = [ x | x <- other', not (HPSQ.member x cached) ]
pure (cached, other)
Just (CachedEntry{..}) -> do
lookupEntry h (cachedMmapedIdx, cachedNway) <&> fmap (InFossil fk) >>= \case
Just loc -> do
atomically $ writeTVar cachedTs now
for_ (HPSQ.toList cachedIdx) $ \(fk,_,nway) -> do exit (Just loc)
lookupEntry h nway <&> fmap (InFossil fk) >>= \case
Nothing -> pure Nothing -- none
other -> do
atomically $ modifyTVar ncqCachedIndexes (HPSQ.insert fk now nway)
exit other
-- TODO: use-filter-for-faster-scan Nothing -> pure ()
-- 1. Какой фильтр?
-- 2. Как и когда его перестраивать?
-- 2.1 На открытии? Будет расти время открытия (но можно параллельно)
--
for_ rest $ \r -> runMaybeT do Nothing -> void $ runMaybeT do
let fn = ncqGetIndexFileName ncq r let indexFile = ncqGetIndexFileName ncq fk
let dataFile = ncqGetDataFileName ncq fk
nway' <- liftIO (nwayHashMMapReadOnly fn) (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) >>= toMPlus
datBs <- liftIO $ mmapFileByteString dataFile Nothing
when (isNothing nway') do e <- lookupEntry h (idxBs, idxNway) <&> fmap (InFossil fk) >>= toMPlus
err ("NCQStorage: can't mmap file" <+> pretty fn)
nway <- toMPlus nway' liftIO $ atomically do
files <- readTVar ncqTrackedFiles
case HPSQ.lookup fk files of
Just (p, _) -> do
ce <- CachedEntry idxBs datBs idxNway <$> newTVar now
modifyTVar ncqTrackedFiles (HPSQ.insert fk p (Just ce))
modifyTVar ncqCachedEntries (+1)
evictIfNeededSTM ncq (Just 1)
Nothing -> pure ()
e <- lookupEntry h nway <&> fmap (InFossil r) >>= toMPlus lift (exit (Just e))
liftIO (mmapFileByteString (ncqGetDataFileName ncq r) Nothing) >>= \mmaped ->
atomically do
ncqAddCachedSTM now ncqMaxCachedIdx ncqCachedIndexes r nway
ncqAddCachedSTM now ncqMaxCachedData ncqCachedData r mmaped
lift (exit (Just e))
pure Nothing pure Nothing
where where
lookupEntry (hx :: HashRef) (mmaped, nway) = runMaybeT do lookupEntry (hx :: HashRef) (mmaped, nway) = runMaybeT do
entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= toMPlus
pure
( fromIntegral $ N.word64 (BS.take 8 entryBs)
, fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) )
entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx))
>>= toMPlus
pure $ ( fromIntegral $ N.word64 (BS.take 8 entryBs),
fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)))
ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer)
ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do
@ -640,44 +670,44 @@ ncqStorageIsDeleted :: MonadIO m => NCQStorage -> HashRef -> m Bool
ncqStorageIsDeleted NCQStorage{..} what = do ncqStorageIsDeleted NCQStorage{..} what = do
pure False pure False
ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString)
ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
deleted <- ncqStorageIsDeleted ncq h deleted <- ncqStorageIsDeleted ncq h
when deleted $ exit Nothing when deleted $ exit Nothing
ncqLocate ncq h >>= \case ncqLocate ncq h >>= \case
Nothing -> pure Nothing Nothing -> pure Nothing
Just (InWriteQueue lbs) -> pure $ Just lbs
Just (InWriteQueue lbs) ->
pure $ Just lbs
Just (InCurrent (o,l)) -> do Just (InCurrent (o,l)) -> do
-- FIXME: timeout!
answ <- atomically do answ <- atomically do
a <- newEmptyTMVar a <- newEmptyTMVar
fd <- readTVar ncqCurrentHandleR fd <- readTVar ncqCurrentHandleR
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1)
modifyTVar ncqCurrentReadReq ( |> (fd, o, l, a) ) modifyTVar ncqCurrentReadReq (|> (fd, o, l, a))
pure a pure a
atomically $ takeTMVar answ <&> Just . LBS.fromStrict atomically $ takeTMVar answ <&> Just . LBS.fromStrict
Just (InFossil key (o,l)) -> do Just (InFossil key (o,l)) -> do
mCE <- atomically do
files <- readTVar ncqTrackedFiles
pure $ HPSQ.lookup key files >>= snd
mmaped <- readTVarIO ncqCachedData <&> HPSQ.lookup key >>= \case case mCE of
Just (_,mmaped) -> do Just CachedEntry{..} -> do
now <- getTimeCoarse now <- getTimeCoarse
atomically $ modifyTVar ncqCachedData (HPSQ.insert key now mmaped) atomically $ writeTVar cachedTs now
pure mmaped let chunk = BS.take (fromIntegral l) (BS.drop (fromIntegral o + 4 + 32) cachedMmapedData)
pure $ Just $ LBS.fromStrict chunk
Nothing -> do Nothing -> do
now <- getTimeCoarse err $ "ncqStorageGet: missing CachedEntry for " <+> pretty key
let fn = ncqGetDataFileName ncq key pure Nothing
-- TODO: possible-exception!
newMmaped <- liftIO (mmapFileByteString fn Nothing)
atomically $ ncqAddCachedSTM now ncqMaxCachedData ncqCachedData key newMmaped
pure newMmaped
pure $ Just $ LBS.fromStrict $ BS.take (fromIntegral l) (BS.drop (fromIntegral o+4+32) mmaped)
ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef) ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef)
ncqStorageGetRef NCQStorage{..} ref = readTVarIO ncqRefsMem <&> HM.lookup ref ncqStorageGetRef NCQStorage{..} ref = readTVarIO ncqRefsMem <&> HM.lookup ref
@ -707,21 +737,44 @@ ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageSync NCQStorage{..} = do ncqStorageSync NCQStorage{..} = do
atomically $ readTVar ncqFlushNow >>= mapM_ (`writeTQueue` ()) atomically $ readTVar ncqFlushNow >>= mapM_ (`writeTQueue` ())
ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m () ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m ()
ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do
now <- getTimeCoarse now <- getTimeCoarse
for_ keys $ \key -> do
let fn = ncqGetIndexFileName ncq key ncqAddTrackedFilesIO ncq (fmap (BS8.unpack . coerce) keys)
liftIO (nwayHashMMapReadOnly fn) >>= \case
Nothing -> err $ "NCQStorage: can't mmap index file" <+> pretty fn loaded <- catMaybes <$> forM keys \key -> runMaybeT do
Just nway -> atomically do mEntry <- liftIO $ readTVarIO ncqTrackedFiles <&> HPSQ.lookup key
ncqAddCachedSTM now ncqMaxCachedIdx ncqCachedIndexes key nway guard (maybe True (\(_, m) -> isNothing m) mEntry)
let idxFile = ncqGetIndexFileName ncq key
let datFile = ncqGetDataFileName ncq key
(mmIdx, nway) <- MaybeT $ liftIO $ nwayHashMMapReadOnly idxFile
mmData <- liftIO $ mmapFileByteString datFile Nothing
tnow <- newTVarIO now
pure (key, CachedEntry mmIdx mmData nway tnow)
atomically do
evictIfNeededSTM ncq (Just (List.length loaded))
for_ loaded \(k, ce) -> do
files <- readTVar ncqTrackedFiles
case HPSQ.lookup k files of
Just (p, Nothing) -> do
modifyTVar ncqTrackedFiles (HPSQ.insert k p (Just ce))
modifyTVar ncqCachedEntries (+1)
_ -> pure ()
ncqLoadIndexes :: MonadIO m => NCQStorage -> m () ncqLoadIndexes :: MonadIO m => NCQStorage -> m ()
ncqLoadIndexes ncq@NCQStorage{..} = do ncqLoadIndexes ncq@NCQStorage{..} = do
debug "WIP: ncqStorageLoadIndexes" debug "WIP: ncqStorageLoadIndexes"
w <- readTVarIO ncqTrackedFiles w <- readTVarIO ncqTrackedFiles
<&> List.take (ncqMaxCachedIdx `div` 2) . HPSQ.keys <&> List.take (ncqMaxCached `div` 2) . HPSQ.keys
ncqLoadSomeIndexes ncq w ncqLoadSomeIndexes ncq w
ncqFixIndexes :: MonadUnliftIO m => NCQStorage -> m () ncqFixIndexes :: MonadUnliftIO m => NCQStorage -> m ()
@ -822,8 +875,7 @@ ncqStorageInit_ check path = do
let ncqMinLog = 2 * (1024 ^ 3) let ncqMinLog = 2 * (1024 ^ 3)
let ncqMaxLog = 10 * (1024 ^ 3) let ncqMaxLog = 10 * (1024 ^ 3)
let ncqMaxCachedIdx = 64 let ncqMaxCached = 64
let ncqMaxCachedData = ncqMaxCachedIdx `div` 2
ncqWriteQueue <- newTVarIO HPSQ.empty ncqWriteQueue <- newTVarIO HPSQ.empty

View File

@ -293,11 +293,6 @@ main = do
for_ trf $ \tf -> do for_ trf $ \tf -> do
notice $ "tracked" <+> pretty tf notice $ "tracked" <+> pretty tf
tri <- readTVarIO ncqCachedIndexes <&> HPSQ.toList
for_ tri $ \(k,_,_) -> do
notice $ "cached-index" <+> pretty k
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:raw" $ \case entry $ bindMatch "test:ncq:raw" $ \case