diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 1611ca21..e2de969f 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -234,29 +234,36 @@ data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) | RunSync (FileKey, Fd, Int, Int, Bool) + | RunFin ncqStorageRun2 :: forall m . MonadUnliftIO m => NCQStorage2 -> m () -ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do +ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do jobQ <- newTQueueIO closeQ <- newTQueueIO - closer <- ContT $ withAsync $ liftIO $ forever do - atomically (readTQueue closeQ) >>= \(fk, fh) -> do - closeFd fh - let fname = BS8.unpack (coerce fk) - -- notice $ yellow "indexing" <+> pretty fname - idx <- ncqIndexFile ncq fname - nwayHashMMapReadOnly idx >>= \case - Nothing -> err $ "can't open index" <+> pretty idx - Just (bs,nway) -> do - nwayHashScanAll nway bs $ \_ k _ -> do - unless (k == emptyKey) do - none - atomically do + closer <- ContT $ withAsync $ liftIO $ fix \loop -> do + what <- atomically do + stop <- readTVar ncqStorageStopReq + tryReadTQueue closeQ >>= \case + Just e -> pure $ Just e + Nothing | not stop -> STM.retry + | otherwise -> pure Nothing + + maybe1 what none $ \(fk, fh) -> do + closeFd fh + let fname = BS8.unpack (coerce fk) + -- notice $ yellow "indexing" <+> pretty fname + idx <- ncqIndexFile ncq fname + nwayHashMMapReadOnly idx >>= \case + Nothing -> err $ "can't open index" <+> pretty idx + Just (bs,nway) -> do + nwayHashScanAll nway bs $ \_ k _ -> do + unless (k == emptyKey) $ atomically do ncqAlterEntrySTM ncq (coerce k) (const Nothing) + loop link closer @@ -269,20 +276,26 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do flip fix RunNew $ \loop -> \case + RunFin -> do + debug "wait finalizing" + atomically $ pollSTM closer >>= maybe STM.retry (const none) + debug "exit storage" + RunNew -> do stop <- readTVarIO ncqStorageStopReq mt <- readTVarIO ncqWriteQ <&> Seq.null - when (stop && mt) do - exit () - - (fk,fhx) <- openNewDataFile - loop $ RunWrite (fk,fhx,0,0) + if stop && mt then do + loop RunFin + else do + (fk,fhx) <- openNewDataFile + loop $ RunWrite (fk,fhx,0,0) RunSync (fk, fh, w, total, continue) -> do + stop <- readTVarIO ncqStorageStopReq sync <- readTVarIO ncqStorageSyncReq - let needClose = total >= ncqMinLog + let needClose = total >= ncqMinLog || stop rest <- if not (sync || needClose || w > ncqFsync) then pure w @@ -299,7 +312,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do atomically $ writeTQueue closeQ (fk, fh) loop RunNew - | not continue -> exit () + | not continue -> loop RunFin | otherwise -> loop $ RunWrite (fk, fh, rest, total) diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index fec8cab2..6cccdb28 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -130,9 +130,11 @@ runTest action = do flip runContT pure do ContT $ bracket none $ const do unless keep (rm tmp) + flushLoggers lift $ lift $ action (TestEnv tmp) + testNCQFuckupRecovery1 :: MonadUnliftIO m => TestEnv -> m () @@ -680,6 +682,7 @@ testNCQ2ConcurrentWriteSimple1 tn n TestEnv{..} = flip runContT pure do ncqPutBS ncq1 (Just B) Nothing co liftIO $ ncqStorageStop2 ncq1 + wait w main :: IO () main = do