diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 2dd6d14f..de4a723a 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -169,6 +169,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do readTVarIO ncqFileLock >>= mapM_ FL.unlockFile ContT $ bracket none $ const $ liftIO do + void $ ncqStateDump ncq debug "storage done" ncqRemoveGarbage ncq @@ -242,107 +243,120 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do b <- ncqIndexCompactStep ncq pure $ a || b - flip fix RunNew $ \loop -> \case - RunFin mfh -> do - liftIO $ for_ mfh closeFd - debug "exit storage" - atomically $ pollSTM indexer >>= maybe STM.retry (const none) + flip fix RunNew $ \loop s -> do + -- debug $ viaShow s + case s of + RunFin mfh -> do + liftIO $ for_ mfh closeFd + rest <- readTVarIO ncqWriteQ <&> Seq.length + debug $ "exit storage" <+> pretty rest + atomically $ pollSTM indexer >>= maybe STM.retry (const none) - RunNew -> do - alive <- readTVarIO ncqAlive - empty <- readTVarIO ncqWriteQ <&> Seq.null - if not alive && empty - then loop (RunFin Nothing) - else do - (fk, fhx) <- openNewDataFile - loop $ RunWrite (fk, fhx, 0, 0) - - - RunSync (fk, fh, w, total, continue) -> do - - (stop,sync) <- atomically do - (,) <$> readTVar ncqStopReq - <*> readTVar ncqSyncReq - -- <*> readTVar ncqWriteEMA - - let needClose = total >= ncqMinLog || stop - - rest <- if not (sync || needClose || w > ncqFsync) then - pure w - else do - - ss <- appendTailSection fh - liftIO (fileSynchronisePortable fh) - flushReplaces fk - - -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize - - -- atomically $ ncqDeferredWriteOpSTM ncq do - ncqStateUpdate ncq do - ncqStateAddFact (P (PData (DataFile fk) ss)) - - atomically do - writeTVar ncqSyncReq False - modifyTVar ncqSyncNo succ - - pure 0 - - if | needClose && continue -> do - liftIO $ closeFd fh - flushReplaces fk - debug $ "closeFd" <+> viaShow fh - atomically $ writeTQueue indexQ fk - loop RunNew - - | not continue -> loop (RunFin (Just fh)) - - | otherwise -> loop $ RunWrite (fk, fh, rest, total) - - - RunWrite (fk, fh, w, total') -> do - - let timeoutMicro = 10_000_000 - - chunk <- liftIO $ timeout timeoutMicro $ atomically do - stop <- readTVar ncqStopReq - sy <- readTVar ncqSyncReq - - chunk <- if not stop then - stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) - else do - r <- readTVar ncqWriteQ - modifyTVar ncqWriteQ mempty - pure r - - if | Seq.null chunk && stop -> pure $ Left () - | Seq.null chunk && not (stop || sy) -> STM.retry - | otherwise -> pure $ Right chunk - - case chunk of - Nothing -> do - liftIO $ join $ readTVarIO ncqOnRunWriteIdle - if w == 0 then do - loop $ RunWrite (fk,fh,w,total') + RunNew -> do + alive <- readTVarIO ncqAlive + empty <- readTVarIO ncqWriteQ <&> Seq.null + if not alive && empty + then loop (RunFin Nothing) else do - atomically $ writeTVar ncqSyncReq True - loop $ RunSync (fk, fh, w, total', True) -- exit () + (fk, fhx) <- openNewDataFile + loop $ RunWrite (fk, fhx, 0, 0) - Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit () - Just (Right chu) -> do - ws <- for chu $ \h -> do - atomically (ncqLookupEntrySTM ncq h) >>= \case - Just (NCQEntry w, EntryHere bs) -> do - off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0) - n <- lift (appendSection fh bs) - let op = writeTVar w (EntryThere (FileLocation fk off (fromIntegral n))) - atomically $ modifyTVar ncqReplQueue (HM.insertWith (<>) fk [op]) - pure n + RunSync (fk, fh, w, total, continue) -> do - _ -> pure 0 + (stop,sync) <- atomically do + (,) <$> readTVar ncqStopReq + <*> readTVar ncqSyncReq + -- <*> readTVar ncqWriteEMA - let written = sum ws - loop $ RunSync (fk, fh, w + written, total' + written, True) + let needClose = total >= ncqMinLog || stop + + rest <- if not (sync || needClose || w > ncqFsync) then + pure w + else do + + ss <- appendTailSection fh + liftIO (fileSynchronisePortable fh) + flushReplaces fk + + ncqStateUpdate ncq do + ncqStateAddFact (P (PData (DataFile fk) ss)) + + -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize + + -- atomically $ ncqDeferredWriteOpSTM ncq do + + atomically do + writeTVar ncqSyncReq False + modifyTVar ncqSyncNo succ + + pure 0 + + if | needClose && continue -> do + liftIO $ closeFd fh + flushReplaces fk + debug $ "closeFd" <+> viaShow fh + atomically $ writeTQueue indexQ fk + loop RunNew + + | not continue -> loop (RunFin (Just fh)) + + | otherwise -> loop $ RunWrite (fk, fh, rest, total) + + + RunWrite (fk, fh, w, total') -> do + + let timeoutMicro = 10_000_000 + + chunk <- liftIO $ timeout timeoutMicro $ atomically do + stop <- readTVar ncqStopReq + sy <- readTVar ncqSyncReq + + chunk <- if not stop then + stateTVar ncqWriteQ (Seq.splitAt 1) + else do + r <- readTVar ncqWriteQ + modifyTVar ncqWriteQ mempty + pure r + + if | Seq.null chunk && stop -> pure $ Left () + | Seq.null chunk && not (stop || sy) -> STM.retry + | otherwise -> pure $ Right chunk + + stop <- readTVarIO ncqStopReq + + case chunk of + Nothing -> do + liftIO $ join $ readTVarIO ncqOnRunWriteIdle + stop <- readTVarIO ncqStopReq + if w == 0 && not stop then do + loop $ RunWrite (fk,fh,w,total') + else do + atomically $ writeTVar ncqSyncReq True + loop $ RunSync (fk, fh, w, total', not stop) -- exit () + + Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit () + + Just (Right chu) -> do + ws <- for chu $ \h -> do + atomically (ncqLookupEntrySTM ncq h) >>= \case + Just (NCQEntry w, EntryHere bs) -> do + off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0) + n <- lift (appendSection fh bs) + + let op = do + readTVar w >>= \case + EntryHere bs1 | bs1 == bs -> do + writeTVar w (EntryThere (FileLocation fk off (fromIntegral n))) + _ -> none + + atomically $ modifyTVar ncqReplQueue (HM.insertWith (<>) fk [op]) + pure n + + _ -> pure 0 + + let written = sum ws + loop $ RunSync (fk, fh, w + written, total' + written, not stop) mapM_ wait [indexer] @@ -350,11 +364,11 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do setAlive = atomically $ writeTVar ncqAlive True unsetAlive = atomically $ writeTVar ncqAlive False - dropReplaces :: forall m . MonadIO m => FileKey -> m () + dropReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 () dropReplaces fk = atomically do modifyTVar ncqReplQueue (HM.delete fk) - flushReplaces :: forall m . MonadIO m => FileKey -> m () + flushReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 () flushReplaces fk = do atomically do ncqDelCachedDataSTM ncq fk @@ -368,7 +382,9 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do atomically $ modifyTVar ncqCurrentFossils (HS.insert fk) - ncqStateUpdate ncq (ncqStateAddDataFile fk) + ncqStateUpdate ncq do + ncqStateAddFact (P (PData (DataFile fk) 0)) + ncqStateAddDataFile fk let fname = ncqGetFileName ncq (DataFile fk) -- touch fname @@ -425,7 +441,6 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do when (k2 == k1) $ waitState k2 - data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs index 940f21e7..0d68652d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs @@ -27,6 +27,17 @@ newtype StateOP a = {- HLINT ignore "Eta reduce"-} +ncqStateDump :: MonadIO m + => NCQStorage + -> m FileKey +ncqStateDump ncq@NCQStorage{..} = do + state <- readTVarIO ncqState + key <- ncqGetNewFileKey ncq StateFile + let snkFile = ncqGetFileName ncq (StateFile key) + liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do + IO.hPrint fh (pretty state) + pure key + ncqStateUpdateLoop :: MonadIO m => NCQStorage -> m () @@ -43,10 +54,7 @@ ncqStateUpdateLoop ncq@NCQStorage{..} = do stop <- readTVar ncqStopReq if s1 == s0 && not stop then STM.retry else pure s1 - key <- ncqGetNewFileKey ncq StateFile - let snkFile = ncqGetFileName ncq (StateFile key) - liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do - IO.hPrint fh (pretty state) + key <- ncqStateDump ncq done <- atomically do writeTVar ncqStateKey key diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 7bcab6dc..c1aef21c 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -141,12 +141,11 @@ ncq3Tests = do h <- ncqPutBS sto (Just B) Nothing bs found <- ncqLocate sto h <&> isJust liftIO $ assertBool (show $ "found" <+> pretty h) found + debug $ "written" <+> pretty h <+> pretty (BS.length bs) atomically do writeTQueue hq h modifyTVar w1 succ - ncqStorageStop sto - ncqWithStorage testEnvDir $ \sto -> do notice $ "reopen/lookup" <+> pretty num hh <- atomically $ STM.flushTQueue hq @@ -494,13 +493,13 @@ ncq3Tests = do >>= orThrowUser ("missed" <+> pretty h) unless (ncqEntrySize loc == ncqTombEntrySize) do - notice $ pretty h <+> pretty (ncqEntrySize loc) <+> pretty ncqTombEntrySize + err $ red (pretty h) <+> pretty (ncqEntrySize loc) <+> pretty ncqTombEntrySize - liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc) + -- liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc) -- ncqIndexCompactFull sto -- ncqStorageStop - pause @'Seconds 11 + -- pause @'Seconds 11 ncqWithStorage dir $ \sto -> do -- notice "check deleted" @@ -509,9 +508,10 @@ ncq3Tests = do for_ hashes $ \h -> do ncqLocate sto h >>= \case - Nothing -> notice $ "not-found" <+> pretty h + Nothing -> err $ red "not-found" <+> pretty h Just loc -> do - liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc) + unless (ncqIsTomb loc) do + err $ red ("tomb/1" <+> pretty h) entry $ bindMatch "test:ncq3:del2" $ nil_ $ \syn -> do