diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 22e1cd17..fc56e3a7 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -189,7 +189,7 @@ ncqPutBS0 wait ncq@NCQStorage{..} mtp mhref bs' = ncqOperation ncq (pure $ fromM pure True Just (NCQEntry e) -> readTVar e >>= \case - EntryHere bs'' | bs == bs''-> pure False + EntryHere bs'' | bs == bs''-> pure False | otherwise -> writeTVar e (EntryHere bs) >> pure True EntryThere{} -> writeTVar e (EntryHere bs) >> pure True @@ -259,6 +259,7 @@ ncqDelEntry me href = do -- всегда пишем tomb и надеемся на лучшее -- merge/compact разберутся -- однако не пишем, если записи еще нет + -- void $ ncqPutBS me (Just T) (Just href) "" ncqLocate me href >>= \case Just loc | not (ncqIsTomb loc) -> do void $ ncqPutBS me (Just T) (Just href) "" diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index cecae900..0ce90b2e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -242,7 +242,8 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do pure $ a || b flip fix RunNew $ \loop -> \case - RunFin -> do + RunFin mfh -> do + liftIO $ for_ mfh closeFd debug "exit storage" atomically $ pollSTM closer >>= maybe STM.retry (const none) @@ -250,7 +251,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do alive <- readTVarIO ncqAlive empty <- readTVarIO ncqWriteQ <&> Seq.null if not alive && empty - then loop RunFin + then loop (RunFin Nothing) else do (fk, fhx) <- openNewDataFile loop $ RunWrite (fk, fhx, 0, 0) @@ -286,10 +287,11 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do if | needClose && continue -> do liftIO $ closeFd fh + debug $ "closeFd" <+> viaShow fh atomically $ writeTQueue closeQ fk loop RunNew - | not continue -> loop RunFin + | not continue -> loop (RunFin (Just fh)) | otherwise -> loop $ RunWrite (fk, fh, rest, total) @@ -301,7 +303,13 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do chunk <- liftIO $ timeout timeoutMicro $ atomically do stop <- readTVar ncqStopReq sy <- readTVar ncqSyncReq - chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) + + chunk <- if not stop then + stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) + else do + r <- readTVar ncqWriteQ + modifyTVar ncqWriteQ mempty + pure r if | Seq.null chunk && stop -> pure $ Left () | Seq.null chunk && not (stop || sy) -> STM.retry @@ -406,6 +414,6 @@ data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) | RunSync (FileKey, Fd, Int, Int, Bool) - | RunFin + | RunFin (Maybe Fd) diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index e0386167..56bd3044 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -145,6 +145,8 @@ ncq3Tests = do writeTQueue hq h modifyTVar w1 succ + ncqStorageStop sto + ncqWithStorage testEnvDir $ \sto -> do notice $ "reopen/lookup" <+> pretty num hh <- atomically $ STM.flushTQueue hq @@ -794,7 +796,7 @@ ncq3Tests = do delBlock sto ha atomically $ modifyTVar blkz (HM.insert ha Nothing) found <- hasBlock sto ha - assertBool (show $ "not deleted" <+> pretty ha) (isNothing found) + assertBool (show $ "not deleted" <+> pretty ha <+> pretty found) (isNothing found) pr <- uniformRM (0, 1.0) g