This commit is contained in:
voidlizard 2025-03-20 09:15:54 +03:00 committed by Dmitry Zuykov
parent b8b2ed4d14
commit 326989a9fa
3 changed files with 29 additions and 32 deletions

View File

@ -14,6 +14,7 @@ import Data.Aeson(FromJSON(..),ToJSON(..),Value(..))
import Data.Binary (Binary(..)) import Data.Binary (Binary(..))
import Data.ByteArray qualified as BA import Data.ByteArray qualified as BA
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8 import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
-- import Data.ByteString.Short qualified as SB -- import Data.ByteString.Short qualified as SB
@ -73,9 +74,10 @@ instance IsString (Hash HbSync) where
doDecode = fromBase58 (BS8.pack s) doDecode = fromBase58 (BS8.pack s)
instance FromStringMaybe (Hash HbSync) where instance FromStringMaybe (Hash HbSync) where
fromStringMay s= HbSyncHash <$> doDecode fromStringMay "" = Nothing
fromStringMay s = HbSyncHash <$> unbase58
where where
doDecode = fromBase58 (BS8.pack s) unbase58 = fromBase58 (BS8.pack s)
instance Pretty (Hash HbSync) where instance Pretty (Hash HbSync) where
pretty (HbSyncHash s) = pretty @String [qc|{toBase58 s}|] pretty (HbSyncHash s) = pretty @String [qc|{toBase58 s}|]

View File

@ -261,6 +261,7 @@ ncqStorageStop ncq@NCQStorage{..} = do
doneD <- isEmptyTBQueue ncqDeleteQ doneD <- isEmptyTBQueue ncqDeleteQ
let done = doneW && doneD let done = doneW && doneD
unless done STM.retry unless done STM.retry
debug "ncqStorageStop DONE"
ncqStorageRun :: MonadUnliftIO m => NCQStorage -> m () ncqStorageRun :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
@ -311,8 +312,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
makeWriter indexQ = do makeWriter indexQ = do
let dumpTimeout = round 10e6 let dumpTimeout = TimeoutSec 10
let dumpData = 1024 ^ 2 let dumpData = 1024 ^ 10
let syncData = fromIntegral ncqSyncSize let syncData = fromIntegral ncqSyncSize
writer <- ContT $ withAsync do writer <- ContT $ withAsync do
@ -322,28 +323,18 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
fix \next -> do fix \next -> do
flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do liftIO $ race (pause dumpTimeout) $ atomically do
flush <- isEmptyTQueue myFlushQ <&> not flush <- isEmptyTQueue myFlushQ <&> not
stop <- readTVar ncqStopped stop <- readTVar ncqStopped
if flush || stop then pure True else STM.retry bytes <- readTVar ncqLastWritten
if bytes > dumpData || flush || stop then none else STM.retry
void $ atomically (readTQueue myFlushQ >> STM.flushTQueue myFlushQ) void $ atomically (STM.flushTQueue myFlushQ)
let flushNow = fromRight False flush liftIO $ writeJournal indexQ syncData
now <- getTimeCoarse
lastW <- readTVarIO ncqLastWritten
bytes <- readTVarIO ncqNotWritten
let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0
stopped <- readTVarIO ncqStopped
when (dumpByTime || bytes >= dumpData || flushNow || stopped) do
-- debug "NCQStorage: dump data!"
liftIO $ writeJournal indexQ syncData
done <- atomically $ readTVar ncqWriteQueue <&> HPSQ.null done <- atomically $ readTVar ncqWriteQueue <&> HPSQ.null
stopped <- readTVarIO ncqStopped
if done && stopped then none else next if done && stopped then none else next
@ -410,23 +401,22 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
myFlushQ <- newTQueueIO myFlushQ <- newTQueueIO
atomically $ modifyTVar ncqFlushNow (myFlushQ:) atomically $ modifyTVar ncqFlushNow (myFlushQ:)
debug "delWriter running" mt <- atomically $ isEmptyTBQueue ncqDeleteQ
debug $ "delWriter running" <+> pretty mt
fix \next -> do fix \next -> do
debug "BEBEBEBE" void $ race (pause @'Seconds 2) $ atomically do
void $ race (pause @'Seconds 1) $ atomically do
stop <- readTVar ncqStopped stop <- readTVar ncqStopped
-- flush <- isEmptyTQueue myFlushQ <&> not flush <- isEmptyTQueue myFlushQ <&> not
size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt) size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt)
-- unless (flush || size || stop) STM.retry unless (flush || size || stop) STM.retry
unless (size || stop) STM.retry
toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ
liftIO do liftIO do
w <- readTVarIO ncqDeletedW w <- readTVarIO ncqDeletedW
-- debug "write shit"
for_ toWrite $ \(hx,delta) -> do for_ toWrite $ \(hx,delta) -> do
let sdelta = N.bytestring16 (fromIntegral delta) let sdelta = N.bytestring16 (fromIntegral delta)
let k = coerce @_ @ByteString hx let k = coerce @_ @ByteString hx
@ -439,9 +429,12 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
debug $ "DELETED" <+> pretty hx debug $ "DELETED" <+> pretty hx
fileSynchronise w fileSynchronise w
atomically (isEmptyTBQueue ncqDeleteQ) >>= \case stop <- readTVarIO ncqStopped
True -> none size <- atomically $ lengthTBQueue ncqDeleteQ
False -> next
if stop && size <= 0 then none else next
debug "delWriter stopped"
link delWriter link delWriter
pure delWriter pure delWriter

View File

@ -495,12 +495,14 @@ main = do
for_ hashes $ \h -> runMaybeT do for_ hashes $ \h -> runMaybeT do
already <- liftIO (ncqStorageHasBlock ncq h <&> isJust) already <- liftIO (ncqStorageHasBlock ncq h <&> isJust)
guard (not already) guard (not already)
-- debug $ "write" <+> pretty h
blk <- getBlock sto (coerce h) >>= toMPlus blk <- getBlock sto (coerce h) >>= toMPlus
liftIO do liftIO do
let l = LBS.length blk let l = LBS.length blk
-- print $ pretty h <+> pretty l -- print $ pretty h <+> pretty l
ncqStoragePut ncq blk ncqStoragePut ncq blk
warn "about to stop storage!"
liftIO $ ncqStorageStop ncq liftIO $ ncqStorageStop ncq
wait writer wait writer