From 0869b5797179e7d3de6d561203e1acf01693372e Mon Sep 17 00:00:00 2001 From: voidlizard Date: Fri, 22 Aug 2025 18:59:00 +0300 Subject: [PATCH] wip, fix writing tails --- .../lib/HBS2/Storage/NCQ3/Internal.hs | 1 + .../lib/HBS2/Storage/NCQ3/Internal/Run.hs | 31 ++++++++++++++----- .../lib/HBS2/Storage/NCQ3/Internal/Types.hs | 1 + hbs2-tests/test/NCQ3.hs | 4 ++- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index fc56e3a7..a3ba4321 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -74,6 +74,7 @@ ncqStorageOpen fp upd = do ncqRunSem <- atomically $ newTSem 1 ncqFileLock <- newTVarIO Nothing ncqCurrentFossils <- newTVarIO mempty + ncqReplQueue <- newTVarIO mempty let ncq = NCQStorage{..} & upd diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 0ce90b2e..2dd6d14f 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -175,11 +175,11 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do liftIO (ncqTryLoadState ncq) - closeQ <- liftIO newTQueueIO + indexQ <- liftIO newTQueueIO - closer <- spawnActivity $ liftIO $ fix \loop -> do + indexer <- spawnActivity $ liftIO $ fix \loop -> do what <- atomically do - tryReadTQueue closeQ >>= \case + tryReadTQueue indexQ >>= \case Just e -> pure $ Just e Nothing -> do stop <- readTVar ncqStopReq @@ -187,6 +187,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do maybe1 what none $ \(fk :: FileKey) -> do ncqIndexFile ncq Nothing (DataFile fk) + dropReplaces fk atomically $ modifyTVar ncqCurrentFossils (HS.delete fk) loop @@ -245,7 +246,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do RunFin mfh -> do liftIO $ for_ mfh closeFd debug "exit storage" - atomically $ pollSTM closer >>= maybe STM.retry (const none) + atomically $ pollSTM indexer >>= maybe STM.retry (const none) RunNew -> do alive <- readTVarIO ncqAlive @@ -272,6 +273,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do ss <- appendTailSection fh liftIO (fileSynchronisePortable fh) + flushReplaces fk -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize @@ -287,8 +289,9 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do if | needClose && continue -> do liftIO $ closeFd fh + flushReplaces fk debug $ "closeFd" <+> viaShow fh - atomically $ writeTQueue closeQ fk + atomically $ writeTQueue indexQ fk loop RunNew | not continue -> loop (RunFin (Just fh)) @@ -332,7 +335,8 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do Just (NCQEntry w, EntryHere bs) -> do off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0) n <- lift (appendSection fh bs) - atomically (writeTVar w (EntryThere (FileLocation fk off (fromIntegral n)))) + let op = writeTVar w (EntryThere (FileLocation fk off (fromIntegral n))) + atomically $ modifyTVar ncqReplQueue (HM.insertWith (<>) fk [op]) pure n _ -> pure 0 @@ -340,12 +344,24 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do let written = sum ws loop $ RunSync (fk, fh, w + written, total' + written, True) - mapM_ wait [closer] + mapM_ wait [indexer] where setAlive = atomically $ writeTVar ncqAlive True unsetAlive = atomically $ writeTVar ncqAlive False + dropReplaces :: forall m . MonadIO m => FileKey -> m () + dropReplaces fk = atomically do + modifyTVar ncqReplQueue (HM.delete fk) + + flushReplaces :: forall m . MonadIO m => FileKey -> m () + flushReplaces fk = do + atomically do + ncqDelCachedDataSTM ncq fk + ops <- readTVar ncqReplQueue <&> fromMaybe mempty . HM.lookup fk + modifyTVar ncqReplQueue (HM.delete fk) + sequence_ ops + openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) openNewDataFile = do fk <- ncqGetNewFileKey ncq DataFile @@ -415,5 +431,6 @@ data RunSt = | RunWrite (FileKey, Fd, Int, Int) | RunSync (FileKey, Fd, Int, Int, Bool) | RunFin (Maybe Fd) + deriving stock Show diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index 98105e89..f259d85f 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -122,6 +122,7 @@ data NCQStorage = , ncqSyncNo :: TVar Int , ncqServiceSem :: TSem , ncqRunSem :: TSem + , ncqReplQueue :: TVar (HashMap FileKey [STM ()]) , ncqFileLock :: TVar (Maybe FileLock) } deriving stock (Generic) diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 56bd3044..7bcab6dc 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -498,7 +498,9 @@ ncq3Tests = do liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc) - ncqIndexCompactFull sto + -- ncqIndexCompactFull sto + -- ncqStorageStop + pause @'Seconds 11 ncqWithStorage dir $ \sto -> do -- notice "check deleted"