This commit is contained in:
voidlizard 2025-03-14 08:35:59 +03:00 committed by Dmitry Zuykov
parent 0159c15e58
commit 622b9ef531
1 changed files with 43 additions and 43 deletions

View File

@ -104,7 +104,7 @@ data NCQStorage =
, ncqCurrentHandleR :: TVar Fd , ncqCurrentHandleR :: TVar Fd
, ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentUsage :: TVar (IntMap Int)
, ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString))
, ncqFlushNow :: TVar Int , ncqFlushNow :: TVar [TQueue ()]
, ncqOpenDone :: TMVar Bool , ncqOpenDone :: TMVar Bool
, ncqStopped :: TVar Bool , ncqStopped :: TVar Bool
} }
@ -272,26 +272,30 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
-- cap <- (10*) <$> getNumCapabilities -- cap <- (10*) <$> getNumCapabilities
cap <- getNumCapabilities cap <- getNumCapabilities
refsWriter <- ContT $ withAsync $ untilStopped do refsWriter <- ContT $ withAsync do
myFlushQ <- newTQueueIO
atomically $ modifyTVar ncqFlushNow (myFlushQ:)
untilStopped do
-- FIXME: timeout-hardcode -- FIXME: timeout-hardcode
void $ race (pause @'Seconds 2) $ atomically do void $ race (pause @'Seconds 1) $ atomically do
readTVar ncqStopped `orElse` STM.retry void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ
dirty <- readTVarIO ncqRefsDirty dirty <- readTVarIO ncqRefsDirty
when (dirty > 0) do when (dirty > 0) do
refs <- readTVarIO ncqRefsMem <&> HM.toList refs <- readTVarIO ncqRefsMem <&> HM.toList
withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do
for_ refs $ \(k,v) -> do for_ refs $ \(k,v) -> do
let ks = coerce @_ @ByteString k let ks = coerce @_ @ByteString k
let vs = coerce @_ @ByteString v let vs = coerce @_ @ByteString v
let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay
liftIO do liftIO do
BS.hPutStr fh (N.bytestring32 (fromIntegral w)) BS.hPutStr fh (N.bytestring32 (fromIntegral w))
BS.hPutStr fh ks BS.hPutStr fh ks
BS.hPutStr fh vs BS.hPutStr fh vs
atomically $ writeTVar ncqRefsDirty 0 atomically $ writeTVar ncqRefsDirty 0
link refsWriter link refsWriter
@ -314,14 +318,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
indexer <- ContT $ withAsync $ untilStopped do indexer <- ContT $ withAsync $ untilStopped do
what <- atomically do what' <- race (pause @'Seconds 1) $ atomically do
e <- tryReadTQueue indexQ peekTQueue indexQ >> STM.flushTQueue indexQ
stop <- readTVar ncqStopped
case e of let what = fromRight mempty what'
Just x -> pure (Just x)
Nothing | stop -> pure Nothing
| otherwise -> STM.retry
for_ what $ \(fd,fn) -> do for_ what $ \(fd,fn) -> do
@ -335,30 +335,30 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
link indexer link indexer
writer <- ContT $ withAsync $ untilStopped do writer <- ContT $ withAsync do
flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do myFlushQ <- newTQueueIO
flush <- readTVar ncqFlushNow atomically $ modifyTVar ncqFlushNow (myFlushQ:)
stop <- readTVar ncqStopped
if flush > 0 || stop then do untilStopped do
writeTVar ncqFlushNow 0
flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do
void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ
pure True pure True
else do
STM.retry
let flushNow = fromRight False flush let flushNow = fromRight False flush
now <- getTimeCoarse now <- getTimeCoarse
lastW <- readTVarIO ncqLastWritten lastW <- readTVarIO ncqLastWritten
bytes <- readTVarIO ncqNotWritten bytes <- readTVarIO ncqNotWritten
let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0
stopped <- readTVarIO ncqStopped stopped <- readTVarIO ncqStopped
when (dumpByTime || bytes >= dumpData || flushNow || stopped) do when (dumpByTime || bytes >= dumpData || flushNow || stopped) do
-- debug "NCQStorage: dump data!" -- debug "NCQStorage: dump data!"
liftIO $ writeJournal indexQ syncData liftIO $ writeJournal indexQ syncData
mapM_ waitCatch [writer,indexer,refsWriter] mapM_ waitCatch [writer,indexer,refsWriter]
mapM_ cancel [reader] mapM_ cancel [reader]
@ -648,7 +648,7 @@ ncqStorageDel sto h = do
ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageSync NCQStorage{..} = do ncqStorageSync NCQStorage{..} = do
atomically $ modifyTVar ncqFlushNow succ atomically $ readTVar ncqFlushNow >>= mapM_ (`writeTQueue` ())
ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m () ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m ()
ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do
@ -775,7 +775,7 @@ ncqStorageInit_ check path = do
ncqLastWritten <- getTimeCoarse >>= newTVarIO ncqLastWritten <- getTimeCoarse >>= newTVarIO
ncqWaitIndex <- newTVarIO HPSQ.empty ncqWaitIndex <- newTVarIO HPSQ.empty
ncqFlushNow <- newTVarIO 0 ncqFlushNow <- newTVarIO mempty
ncqOpenDone <- newEmptyTMVarIO ncqOpenDone <- newEmptyTMVarIO
ncqCurrentReadReq <- newTVarIO mempty ncqCurrentReadReq <- newTVarIO mempty
ncqCurrentUsage <- newTVarIO mempty ncqCurrentUsage <- newTVarIO mempty