From 03ec08509aabd2439d6fa8b42d5a8cf8d90b6718 Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Mon, 12 May 2025 08:01:23 +0300 Subject: [PATCH] wip, remove deleted records log --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 124 ++--------------------- hbs2-tests/test/TestCQ.hs | 22 ---- 2 files changed, 8 insertions(+), 138 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 589657e1..6c1ffe48 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -22,7 +22,6 @@ import Data.HashMap.Strict (HashMap) import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Control.Concurrent.STM qualified as STM -import Control.Concurrent.STM.TSem import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) import Data.IntMap qualified as IntMap @@ -95,8 +94,6 @@ data NCQStorage = , ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqRefsDirty :: TVar Int , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) - , ncqDeleted :: TVar (HashMap HashRef Int16) - , ncqDeleteQ :: TBQueue (HashRef, Int16) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) , ncqTrackedFiles :: TVar (HashSet FileKey) , ncqCachedIndexes :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash)) @@ -257,9 +254,7 @@ ncqStorageStop ncq@NCQStorage{..} = do ncqStorageSync ncq atomically $ writeTVar ncqStopped True atomically do - doneW <- readTVar ncqWriteQueue <&> HPSQ.null - doneD <- isEmptyTBQueue ncqDeleteQ - let done = doneW && doneD + done <- readTVar ncqWriteQueue <&> HPSQ.null unless done STM.retry debug "ncqStorageStop DONE" @@ -279,9 +274,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do reader <- makeReader indexer <- makeIndexer indexQ writer <- makeWriter indexQ - delWriter <- makeDelWriter - mapM_ waitCatch [writer,indexer,refsWriter,delWriter] + mapM_ waitCatch [writer,indexer,refsWriter] mapM_ cancel [reader] where @@ -392,52 +386,6 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do link indexer pure indexer - makeDelWriter = do - - let fsyncAt = 150 - - delWriter <- ContT $ withAsync do - - myFlushQ <- newTQueueIO - atomically $ modifyTVar ncqFlushNow (myFlushQ:) - - mt <- atomically $ isEmptyTBQueue ncqDeleteQ - debug $ "delWriter running" <+> pretty mt - - fix \next -> do - - void $ race (pause @'Seconds 2) $ atomically do - stop <- readTVar ncqStopped - flush <- isEmptyTQueue myFlushQ <&> not - size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt) - unless (flush || size || stop) STM.retry - - toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ - - liftIO do - w <- readTVarIO ncqDeletedW - -- debug "write shit" - for_ toWrite $ \(hx,delta) -> do - let sdelta = N.bytestring16 (fromIntegral delta) - let k = coerce @_ @ByteString hx - let size = BS.length k + BS.length sdelta - let deleted = mconcat [ N.bytestring32 (fromIntegral size) - , k - , sdelta - ] - void $ Posix.fdWrite w deleted - fileSynchronise w - - stop <- readTVarIO ncqStopped - size <- atomically $ lengthTBQueue ncqDeleteQ - - if stop && size <= 0 then none else next - - debug "delWriter stopped" - - link delWriter - pure delWriter - writeJournal indexQ syncData = liftIO do trace $ "writeJournal" <+> pretty syncData @@ -552,17 +500,9 @@ ncqStoragePut ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do when stoped $ exit Nothing - let h = hashObject @HbSync lbs & coerce + when (LBS.null lbs) $ exit Nothing - ncqLocate ncq h >>= \case - Nothing -> none - Just{} -> do - d <- readTVarIO ncqDeleted <&> fromMaybe 0 . HM.lookup h - if d < 1 then - exit (Just h) - else do - let delta = negate d - 1 - atomically $ writeTBQueue ncqDeleteQ (h, delta) + let h = hashObject @HbSync lbs & coerce now <- getTimeCoarse atomically do @@ -677,7 +617,7 @@ ncqStorageScanDataFile ncq fp' action = do ncqStorageIsDeleted :: MonadIO m => NCQStorage -> HashRef -> m Bool ncqStorageIsDeleted NCQStorage{..} what = do - readTVarIO ncqDeleted <&> (>0) . fromMaybe 0 . HM.lookup what + pure False ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do @@ -739,12 +679,8 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do True -> exit () _ -> none - atomically do - what <- readTVar ncqDeleted <&> fromMaybe 0 . HM.lookup h - when (what < 1) do - let delta = negate what + 1 - writeTBQueue ncqDeleteQ (h, delta) - modifyTVar ncqDeleted (HM.insertWith (+) h delta) + error "ncqStorageDel not implemented" + ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do @@ -783,25 +719,12 @@ ncqFixIndexes ncq@NCQStorage{..} = do atomically $ ncqAddTrackedFilesSTM ncq [newKey] --- NOTE: --- merely find in fossil files, --- because "current" is filtered on load --- and it's okay, we may compact only --- fossils -ncqScanDeleted :: MonadUnliftIO m => NCQStorage -> m [Location] -ncqScanDeleted ncq@NCQStorage{..} = do - readTVarIO ncqDeleted - <&> fmap fst . List.filter ((>0).snd) . HM.toList - >>= mapM (ncqLocate ncq) - <&> catMaybes - ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage ncqStorageOpen fp = do ncq@NCQStorage{..} <- ncqStorageInit_ False fp ncqReadTrackedFiles ncq ncqFixIndexes ncq ncqLoadIndexes ncq - readDeleted ncq readCurrent ncq readRefs ncq atomically $ putTMVar ncqOpenDone True @@ -818,32 +741,6 @@ ncqStorageOpen fp = do S.yield (k,v) atomically $ writeTVar ncqRefsMem (HM.fromList kvs) - - readDeleted ncq@NCQStorage{..} = do - let fn = ncqGetDeletedFileName ncq - -- liftIO $ print $ pretty "FILE" <+> pretty fn - bs0 <- liftIO $ mmapFileByteString fn Nothing - - items <- HM.fromListWith (+) <$> S.toList_ do - flip runContT pure $ callCC \exit -> do - flip fix bs0 $ \next bs -> do - when (BS.length bs < 4) $ exit () - let w = BS.take 4 bs & N.word32 & fromIntegral - let p = BS.take w (BS.drop 4 bs) - - when (BS.length p < w ) do - err $ "broken file" <+> pretty fn - exit () - - let k = BS.take 32 p & coerce . BS.copy - let v = BS.take 2 (BS.drop 32 p) & N.word16 & fromIntegral @_ @Int16 - lift $ S.yield (k,v) - - next (BS.drop (w+4) bs) - - debug $ "NCQStorage.deleted" <+> pretty (HM.size items) - atomically $ writeTVar ncqDeleted items - readCurrent ncq@NCQStorage{..} = do let fn = ncqGetCurrentName ncq -- liftIO $ print $ pretty "FILE" <+> pretty fn @@ -851,8 +748,6 @@ ncqStorageOpen fp = do now <- getTimeCoarse - deleted <- readTVarIO ncqDeleted - items <- S.toList_ <$> flip runContT pure $ callCC \exit ->do flip fix (0,bs0) $ \next (o,bs) -> do @@ -867,8 +762,7 @@ ncqStorageOpen fp = do let k = BS.take 32 p & coerce . BS.copy let vs = w - 32 - unless (fromMaybe 0 (HM.lookup k deleted) > 0) do - lift $ S.yield (k,now, (fromIntegral o, fromIntegral vs)) + lift $ S.yield (k,now, (fromIntegral o, fromIntegral vs)) next (o+w+4, BS.drop (w+4) bs) @@ -910,8 +804,6 @@ ncqStorageInit_ check path = do let ncqMaxCachedData = ncqMaxCachedIdx `div` 2 ncqWriteQueue <- newTVarIO HPSQ.empty - ncqDeleted <- newTVarIO mempty - ncqDeleteQ <- newTBQueueIO 3000 ncqNotWritten <- newTVarIO 0 ncqLastWritten <- getTimeCoarse >>= newTVarIO diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 6a291a52..e66dee65 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -509,28 +509,6 @@ main = do e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq:raw:scan-deleted" $ nil_ \case - [StringLike fn] -> liftIO $ flip runContT pure do - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - ContT $ bracket none $ const do - none - - what <- lift $ ncqScanDeleted ncq - - for_ what $ \l -> do - liftIO $ print l - - liftIO $ ncqStorageStop ncq - - wait writer - - e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq:raw:del-some" $ nil_ \case [StringLike fn] -> liftIO $ flip runContT pure do