From b8b2ed4d148174c1d1dbda2dd6116aab2f34dbd3 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 18 Mar 2025 06:40:38 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 62 ++++++++++++++++-------- hbs2-tests/test/TestCQ.hs | 4 +- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index c4253189..83e04277 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -40,6 +40,7 @@ import Data.Either import Data.Maybe import Data.Text qualified as Text import Data.Text.IO qualified as Text +import Data.Int import Lens.Micro.Platform import Data.HashSet (HashSet) import Data.HashSet qualified as HS @@ -94,8 +95,8 @@ data NCQStorage = , ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqRefsDirty :: TVar Int , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) - , ncqDeleted :: TVar (HashSet HashRef) - , ncqDeleteQ :: TBQueue HashRef + , ncqDeleted :: TVar (HashMap HashRef Int16) + , ncqDeleteQ :: TBQueue (HashRef, Int16) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) , ncqTrackedFiles :: TVar (HashSet FileKey) , ncqCachedIndexes :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash)) @@ -252,6 +253,7 @@ ncqIndexFile n@NCQStorage{} fp' = do ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop ncq@NCQStorage{..} = do + debug "ncqStorageStop" ncqStorageSync ncq atomically $ writeTVar ncqStopped True atomically do @@ -412,20 +414,28 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do fix \next -> do + debug "BEBEBEBE" + void $ race (pause @'Seconds 1) $ atomically do stop <- readTVar ncqStopped - flush <- isEmptyTQueue myFlushQ <&> not + -- flush <- isEmptyTQueue myFlushQ <&> not size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt) - unless (flush || size || stop) STM.retry + -- unless (flush || size || stop) STM.retry + unless (size || stop) STM.retry toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ liftIO do w <- readTVarIO ncqDeletedW - for_ toWrite $ \hx -> do + for_ toWrite $ \(hx,delta) -> do + let sdelta = N.bytestring16 (fromIntegral delta) let k = coerce @_ @ByteString hx - _ <- Posix.fdWrite w (N.bytestring32 (fromIntegral $ BS.length k)) - Posix.fdWrite w k + let size = BS.length k + BS.length sdelta + let deleted = mconcat [ N.bytestring32 (fromIntegral size) + , k + , sdelta + ] + void $ Posix.fdWrite w deleted debug $ "DELETED" <+> pretty hx fileSynchronise w @@ -553,8 +563,14 @@ ncqStoragePut ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do let h = hashObject @HbSync lbs & coerce ncqLocate ncq h >>= \case - Just{} -> exit (Just h) - _ -> none + Nothing -> none + Just{} -> do + d <- readTVarIO ncqDeleted <&> fromMaybe 0 . HM.lookup h + if d < 1 then + exit (Just h) + else do + let delta = negate d - 1 + atomically $ writeTBQueue ncqDeleteQ (h, delta) now <- getTimeCoarse atomically do @@ -636,7 +652,7 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do - readTVarIO ncqDeleted <&> not . HS.member h >>= guard + ncqStorageIsDeleted ncq h >>= guard . not toMPlus =<< (ncqLocate ncq h <&> fmap ncqLocatedSize) ncqStorageScanDataFile :: MonadIO m @@ -666,10 +682,15 @@ ncqStorageScanDataFile ncq fp' action = do next (4 + o + fromIntegral w, BS.drop (w+4) bs) + +ncqStorageIsDeleted :: MonadIO m => NCQStorage -> HashRef -> m Bool +ncqStorageIsDeleted NCQStorage{..} what = do + readTVarIO ncqDeleted <&> (>0) . fromMaybe 0 . HM.lookup what + ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do - deleted <- readTVarIO ncqDeleted <&> HS.member h + deleted <- ncqStorageIsDeleted ncq h when deleted $ exit Nothing @@ -727,10 +748,11 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do _ -> none atomically do - already <- readTVar ncqDeleted <&> HS.member h - unless already do - writeTBQueue ncqDeleteQ h - modifyTVar ncqDeleted (HS.insert h) + what <- readTVar ncqDeleted <&> fromMaybe 0 . HM.lookup h + when (what < 1) do + let delta = negate what + 1 + writeTBQueue ncqDeleteQ (h, delta) + modifyTVar ncqDeleted (HM.insertWith (+) h delta) ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do @@ -797,7 +819,7 @@ ncqStorageOpen fp = do -- liftIO $ print $ pretty "FILE" <+> pretty fn bs0 <- liftIO $ mmapFileByteString fn Nothing - items <- HS.fromList <$> S.toList_ do + items <- HM.fromListWith (+) <$> S.toList_ do flip runContT pure $ callCC \exit -> do flip fix bs0 $ \next bs -> do when (BS.length bs < 4) $ exit () @@ -809,10 +831,12 @@ ncqStorageOpen fp = do exit () let k = BS.take 32 p & coerce . BS.copy - lift $ S.yield (k :: HashRef) + let v = BS.take 2 (BS.drop 32 p) & N.word16 & fromIntegral @_ @Int16 + lift $ S.yield (k,v) + next (BS.drop (w+4) bs) - debug $ "NCQStorage.deleted" <+> pretty (HS.size items) + debug $ "NCQStorage.deleted" <+> pretty (HM.size items) atomically $ writeTVar ncqDeleted items readCurrent ncq@NCQStorage{..} = do @@ -838,7 +862,7 @@ ncqStorageOpen fp = do let k = BS.take 32 p & coerce . BS.copy let vs = w - 32 - unless (HS.member k deleted) do + unless (fromMaybe 0 (HM.lookup k deleted) > 0) do lift $ S.yield (k,now, (fromIntegral o, fromIntegral vs)) next (o+w+4, BS.drop (w+4) bs) diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 8d621c69..9b63170c 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -493,6 +493,8 @@ main = do none for_ hashes $ \h -> runMaybeT do + already <- liftIO (ncqStorageHasBlock ncq h <&> isJust) + guard (not already) blk <- getBlock sto (coerce h) >>= toMPlus liftIO do let l = LBS.length blk @@ -528,8 +530,6 @@ main = do liftIO $ ncqStorageStop ncq - pause @'Seconds 5 - wait writer e -> throwIO $ BadFormException @C (mkList e)