From 326989a9fa39bc324e341863755485e3d328aae3 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 20 Mar 2025 09:15:54 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Hash.hs | 6 ++- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 53 ++++++++++-------------- hbs2-tests/test/TestCQ.hs | 2 + 3 files changed, 29 insertions(+), 32 deletions(-) diff --git a/hbs2-core/lib/HBS2/Hash.hs b/hbs2-core/lib/HBS2/Hash.hs index ca33e549..b3136969 100644 --- a/hbs2-core/lib/HBS2/Hash.hs +++ b/hbs2-core/lib/HBS2/Hash.hs @@ -14,6 +14,7 @@ import Data.Aeson(FromJSON(..),ToJSON(..),Value(..)) import Data.Binary (Binary(..)) import Data.ByteArray qualified as BA import Data.ByteString (ByteString) +import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 import Data.ByteString.Lazy qualified as LBS -- import Data.ByteString.Short qualified as SB @@ -73,9 +74,10 @@ instance IsString (Hash HbSync) where doDecode = fromBase58 (BS8.pack s) instance FromStringMaybe (Hash HbSync) where - fromStringMay s= HbSyncHash <$> doDecode + fromStringMay "" = Nothing + fromStringMay s = HbSyncHash <$> unbase58 where - doDecode = fromBase58 (BS8.pack s) + unbase58 = fromBase58 (BS8.pack s) instance Pretty (Hash HbSync) where pretty (HbSyncHash s) = pretty @String [qc|{toBase58 s}|] diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 83e04277..4518b219 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -261,6 +261,7 @@ ncqStorageStop ncq@NCQStorage{..} = do doneD <- isEmptyTBQueue ncqDeleteQ let done = doneW && doneD unless done STM.retry + debug "ncqStorageStop DONE" ncqStorageRun :: MonadUnliftIO m => NCQStorage -> m () ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do @@ -311,8 +312,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do makeWriter indexQ = do - let dumpTimeout = round 10e6 - let dumpData = 1024 ^ 2 + let dumpTimeout = TimeoutSec 10 + let dumpData = 1024 ^ 10 let syncData = fromIntegral ncqSyncSize writer <- ContT $ withAsync do @@ -322,28 +323,18 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do fix \next -> do - flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do - flush <- isEmptyTQueue myFlushQ <&> not - stop <- readTVar ncqStopped - if flush || stop then pure True else STM.retry + liftIO $ race (pause dumpTimeout) $ atomically do + flush <- isEmptyTQueue myFlushQ <&> not + stop <- readTVar ncqStopped + bytes <- readTVar ncqLastWritten + if bytes > dumpData || flush || stop then none else STM.retry - void $ atomically (readTQueue myFlushQ >> STM.flushTQueue myFlushQ) + void $ atomically (STM.flushTQueue myFlushQ) - let flushNow = fromRight False flush - - now <- getTimeCoarse - lastW <- readTVarIO ncqLastWritten - bytes <- readTVarIO ncqNotWritten - - let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 - - stopped <- readTVarIO ncqStopped - - when (dumpByTime || bytes >= dumpData || flushNow || stopped) do - -- debug "NCQStorage: dump data!" - liftIO $ writeJournal indexQ syncData + liftIO $ writeJournal indexQ syncData done <- atomically $ readTVar ncqWriteQueue <&> HPSQ.null + stopped <- readTVarIO ncqStopped if done && stopped then none else next @@ -410,23 +401,22 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do myFlushQ <- newTQueueIO atomically $ modifyTVar ncqFlushNow (myFlushQ:) - debug "delWriter running" + mt <- atomically $ isEmptyTBQueue ncqDeleteQ + debug $ "delWriter running" <+> pretty mt fix \next -> do - debug "BEBEBEBE" - - void $ race (pause @'Seconds 1) $ atomically do + void $ race (pause @'Seconds 2) $ atomically do stop <- readTVar ncqStopped - -- flush <- isEmptyTQueue myFlushQ <&> not + flush <- isEmptyTQueue myFlushQ <&> not size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt) - -- unless (flush || size || stop) STM.retry - unless (size || stop) STM.retry + unless (flush || size || stop) STM.retry toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ liftIO do w <- readTVarIO ncqDeletedW + -- debug "write shit" for_ toWrite $ \(hx,delta) -> do let sdelta = N.bytestring16 (fromIntegral delta) let k = coerce @_ @ByteString hx @@ -439,9 +429,12 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do debug $ "DELETED" <+> pretty hx fileSynchronise w - atomically (isEmptyTBQueue ncqDeleteQ) >>= \case - True -> none - False -> next + stop <- readTVarIO ncqStopped + size <- atomically $ lengthTBQueue ncqDeleteQ + + if stop && size <= 0 then none else next + + debug "delWriter stopped" link delWriter pure delWriter diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 9b63170c..875d09dd 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -495,12 +495,14 @@ main = do for_ hashes $ \h -> runMaybeT do already <- liftIO (ncqStorageHasBlock ncq h <&> isJust) guard (not already) + -- debug $ "write" <+> pretty h blk <- getBlock sto (coerce h) >>= toMPlus liftIO do let l = LBS.length blk -- print $ pretty h <+> pretty l ncqStoragePut ncq blk + warn "about to stop storage!" liftIO $ ncqStorageStop ncq wait writer