diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index dde9bce7..c477a538 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -48,8 +48,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do stop <- readTVar ncqStopReq if not stop then STM.retry else pure Nothing - maybe1 what none $ \(fk :: FileKey, fh) -> do - closeFd fh >> ncqIndexFile ncq (DataFile fk) >> loop + maybe1 what none $ \(fk :: FileKey) -> do + ncqIndexFile ncq (DataFile fk) >> loop let shLast = V.length ncqWriteOps - 1 spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do @@ -82,7 +82,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do spawnActivity measureWPS - -- spawnActivity (ncqStateUpdateLoop ncq) + spawnActivity (ncqStateUpdateLoop ncq) spawnActivity $ forever do pause @'Seconds 30 @@ -157,7 +157,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do pure 0 if | needClose && continue -> do - atomically $ writeTQueue closeQ (fk, fh) + liftIO $ closeFd fh + atomically $ writeTQueue closeQ fk loop RunNew | not continue -> loop RunFin diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs index db868d05..4bd9ea8a 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs @@ -19,30 +19,51 @@ import UnliftIO.IO.File import System.IO qualified as IO import Lens.Micro.Platform +import Control.Concurrent.STM qualified as STM + newtype StateOP a = StateOP { fromStateOp :: ReaderT NCQStorage STM a } deriving newtype (Functor,Applicative,Monad,MonadReader NCQStorage) {- HLINT ignore "Eta reduce"-} +ncqStateUpdateLoop :: MonadIO m + => NCQStorage + -> m () + +ncqStateUpdateLoop ncq@NCQStorage{..} = do + + debug $ red "ncqStateUpdateLoop" + + sInit <- readTVarIO ncqState + + flip fix sInit $ \next s0 -> do + state <- atomically do + s1 <- readTVar ncqState + stop <- readTVar ncqStopReq + if s1 == s0 && not stop then STM.retry else pure s1 + + key <- ncqGetNewFileKey ncq StateFile + let snkFile = ncqGetFileName ncq (StateFile key) + liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do + IO.hPrint fh (pretty state) + + done <- atomically do + writeTVar ncqStateKey key + modifyTVar ncqWrites succ + readTVar ncqStopReq + + unless done do + next =<< readTVarIO ncqState + ncqStateUpdate :: MonadIO m => NCQStorage -> StateOP a -> m () + ncqStateUpdate ncq@NCQStorage{..} action = do - s0 <- readTVarIO ncqState - - s1 <- atomically do - void $ runReaderT (fromStateOp action) ncq - modifyTVar ncqWrites succ - readTVar ncqState - - unless (s1 == s0) do - key <- ncqGetNewFileKey ncq StateFile - let snkFile = ncqGetFileName ncq (StateFile key) - liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do - IO.hPrint fh (pretty s1) - atomically $ writeTVar ncqStateKey key + atomically do + void $ runReaderT (fromStateOp action) ncq ncqStateAddDataFile :: FileKey -> StateOP () ncqStateAddDataFile fk = do