This commit is contained in:
voidlizard 2025-08-22 17:17:36 +03:00
parent 783481af76
commit 996be0c16c
3 changed files with 18 additions and 7 deletions

View File

@ -259,6 +259,7 @@ ncqDelEntry me href = do
-- всегда пишем tomb и надеемся на лучшее -- всегда пишем tomb и надеемся на лучшее
-- merge/compact разберутся -- merge/compact разберутся
-- однако не пишем, если записи еще нет -- однако не пишем, если записи еще нет
-- void $ ncqPutBS me (Just T) (Just href) ""
ncqLocate me href >>= \case ncqLocate me href >>= \case
Just loc | not (ncqIsTomb loc) -> do Just loc | not (ncqIsTomb loc) -> do
void $ ncqPutBS me (Just T) (Just href) "" void $ ncqPutBS me (Just T) (Just href) ""

View File

@ -242,7 +242,8 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
pure $ a || b pure $ a || b
flip fix RunNew $ \loop -> \case flip fix RunNew $ \loop -> \case
RunFin -> do RunFin mfh -> do
liftIO $ for_ mfh closeFd
debug "exit storage" debug "exit storage"
atomically $ pollSTM closer >>= maybe STM.retry (const none) atomically $ pollSTM closer >>= maybe STM.retry (const none)
@ -250,7 +251,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
alive <- readTVarIO ncqAlive alive <- readTVarIO ncqAlive
empty <- readTVarIO ncqWriteQ <&> Seq.null empty <- readTVarIO ncqWriteQ <&> Seq.null
if not alive && empty if not alive && empty
then loop RunFin then loop (RunFin Nothing)
else do else do
(fk, fhx) <- openNewDataFile (fk, fhx) <- openNewDataFile
loop $ RunWrite (fk, fhx, 0, 0) loop $ RunWrite (fk, fhx, 0, 0)
@ -286,10 +287,11 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
if | needClose && continue -> do if | needClose && continue -> do
liftIO $ closeFd fh liftIO $ closeFd fh
debug $ "closeFd" <+> viaShow fh
atomically $ writeTQueue closeQ fk atomically $ writeTQueue closeQ fk
loop RunNew loop RunNew
| not continue -> loop RunFin | not continue -> loop (RunFin (Just fh))
| otherwise -> loop $ RunWrite (fk, fh, rest, total) | otherwise -> loop $ RunWrite (fk, fh, rest, total)
@ -301,7 +303,13 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
chunk <- liftIO $ timeout timeoutMicro $ atomically do chunk <- liftIO $ timeout timeoutMicro $ atomically do
stop <- readTVar ncqStopReq stop <- readTVar ncqStopReq
sy <- readTVar ncqSyncReq sy <- readTVar ncqSyncReq
chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)
chunk <- if not stop then
stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)
else do
r <- readTVar ncqWriteQ
modifyTVar ncqWriteQ mempty
pure r
if | Seq.null chunk && stop -> pure $ Left () if | Seq.null chunk && stop -> pure $ Left ()
| Seq.null chunk && not (stop || sy) -> STM.retry | Seq.null chunk && not (stop || sy) -> STM.retry
@ -406,6 +414,6 @@ data RunSt =
RunNew RunNew
| RunWrite (FileKey, Fd, Int, Int) | RunWrite (FileKey, Fd, Int, Int)
| RunSync (FileKey, Fd, Int, Int, Bool) | RunSync (FileKey, Fd, Int, Int, Bool)
| RunFin | RunFin (Maybe Fd)

View File

@ -145,6 +145,8 @@ ncq3Tests = do
writeTQueue hq h writeTQueue hq h
modifyTVar w1 succ modifyTVar w1 succ
ncqStorageStop sto
ncqWithStorage testEnvDir $ \sto -> do ncqWithStorage testEnvDir $ \sto -> do
notice $ "reopen/lookup" <+> pretty num notice $ "reopen/lookup" <+> pretty num
hh <- atomically $ STM.flushTQueue hq hh <- atomically $ STM.flushTQueue hq
@ -794,7 +796,7 @@ ncq3Tests = do
delBlock sto ha delBlock sto ha
atomically $ modifyTVar blkz (HM.insert ha Nothing) atomically $ modifyTVar blkz (HM.insert ha Nothing)
found <- hasBlock sto ha found <- hasBlock sto ha
assertBool (show $ "not deleted" <+> pretty ha) (isNothing found) assertBool (show $ "not deleted" <+> pretty ha <+> pretty found) (isNothing found)
pr <- uniformRM (0, 1.0) g pr <- uniformRM (0, 1.0) g