This commit is contained in:
voidlizard 2025-08-14 11:20:43 +03:00
parent 548828e47f
commit 3d7e2794d7
2 changed files with 39 additions and 17 deletions

View File

@ -48,8 +48,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
stop <- readTVar ncqStopReq stop <- readTVar ncqStopReq
if not stop then STM.retry else pure Nothing if not stop then STM.retry else pure Nothing
maybe1 what none $ \(fk :: FileKey, fh) -> do maybe1 what none $ \(fk :: FileKey) -> do
closeFd fh >> ncqIndexFile ncq (DataFile fk) >> loop ncqIndexFile ncq (DataFile fk) >> loop
let shLast = V.length ncqWriteOps - 1 let shLast = V.length ncqWriteOps - 1
spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
@ -82,7 +82,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
spawnActivity measureWPS spawnActivity measureWPS
-- spawnActivity (ncqStateUpdateLoop ncq) spawnActivity (ncqStateUpdateLoop ncq)
spawnActivity $ forever do spawnActivity $ forever do
pause @'Seconds 30 pause @'Seconds 30
@ -157,7 +157,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
pure 0 pure 0
if | needClose && continue -> do if | needClose && continue -> do
atomically $ writeTQueue closeQ (fk, fh) liftIO $ closeFd fh
atomically $ writeTQueue closeQ fk
loop RunNew loop RunNew
| not continue -> loop RunFin | not continue -> loop RunFin

View File

@ -19,30 +19,51 @@ import UnliftIO.IO.File
import System.IO qualified as IO import System.IO qualified as IO
import Lens.Micro.Platform import Lens.Micro.Platform
import Control.Concurrent.STM qualified as STM
newtype StateOP a = newtype StateOP a =
StateOP { fromStateOp :: ReaderT NCQStorage STM a } StateOP { fromStateOp :: ReaderT NCQStorage STM a }
deriving newtype (Functor,Applicative,Monad,MonadReader NCQStorage) deriving newtype (Functor,Applicative,Monad,MonadReader NCQStorage)
{- HLINT ignore "Eta reduce"-} {- HLINT ignore "Eta reduce"-}
ncqStateUpdateLoop :: MonadIO m
=> NCQStorage
-> m ()
ncqStateUpdateLoop ncq@NCQStorage{..} = do
debug $ red "ncqStateUpdateLoop"
sInit <- readTVarIO ncqState
flip fix sInit $ \next s0 -> do
state <- atomically do
s1 <- readTVar ncqState
stop <- readTVar ncqStopReq
if s1 == s0 && not stop then STM.retry else pure s1
key <- ncqGetNewFileKey ncq StateFile
let snkFile = ncqGetFileName ncq (StateFile key)
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
IO.hPrint fh (pretty state)
done <- atomically do
writeTVar ncqStateKey key
modifyTVar ncqWrites succ
readTVar ncqStopReq
unless done do
next =<< readTVarIO ncqState
ncqStateUpdate :: MonadIO m ncqStateUpdate :: MonadIO m
=> NCQStorage => NCQStorage
-> StateOP a -> StateOP a
-> m () -> m ()
ncqStateUpdate ncq@NCQStorage{..} action = do ncqStateUpdate ncq@NCQStorage{..} action = do
s0 <- readTVarIO ncqState atomically do
s1 <- atomically do
void $ runReaderT (fromStateOp action) ncq void $ runReaderT (fromStateOp action) ncq
modifyTVar ncqWrites succ
readTVar ncqState
unless (s1 == s0) do
key <- ncqGetNewFileKey ncq StateFile
let snkFile = ncqGetFileName ncq (StateFile key)
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
IO.hPrint fh (pretty s1)
atomically $ writeTVar ncqStateKey key
ncqStateAddDataFile :: FileKey -> StateOP () ncqStateAddDataFile :: FileKey -> StateOP ()
ncqStateAddDataFile fk = do ncqStateAddDataFile fk = do