This commit is contained in:
voidlizard 2025-08-14 12:23:02 +03:00
parent 3d7e2794d7
commit 56d71cb988
4 changed files with 20 additions and 10 deletions

View File

@ -28,11 +28,12 @@ ncqStorageOpen :: MonadIO m => FilePath -> (NCQStorage -> NCQStorage) -> m NCQSt
ncqStorageOpen fp upd = do ncqStorageOpen fp upd = do
let ncqRoot = fp let ncqRoot = fp
let ncqGen = 0 let ncqGen = 0
let ncqFsync = 16 * megabytes -- let ncqFsync = 16 * megabytes
let ncqFsync = 32 * megabytes
let ncqWriteQLen = 1024 * 4 let ncqWriteQLen = 1024 * 4
let ncqMinLog = 2 * gigabytes let ncqMinLog = 512 * megabytes
let ncqMaxLog = 32 * gigabytes let ncqMaxLog = 32 * gigabytes
let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 let ncqWriteBlock = max 256 $ ncqWriteQLen `div` 2
let ncqMaxCachedIndex = 64 let ncqMaxCachedIndex = 64
let ncqMaxCachedData = 64 let ncqMaxCachedData = 64
let ncqIdleThrsh = 50.0 let ncqIdleThrsh = 50.0
@ -117,14 +118,16 @@ ncqPutBlock0 :: MonadUnliftIO m
-> m (Maybe HashRef) -> m (Maybe HashRef)
ncqPutBlock0 sto lbs wait = do ncqPutBlock0 sto lbs wait = do
ncqLocate sto ohash >>= \case ncqLocate sto ohash >>= \case
Nothing -> Just <$> work sto (Just B) (Just ohash) bs Nothing -> Just <$> work
_ -> pure (Just ohash) Just l | ncqIsTomb l -> Just <$> work
_ -> pure (Just ohash)
where where
bs = LBS.toStrict lbs bs = LBS.toStrict lbs
ohash = HashRef $ hashObject @HbSync bs ohash = HashRef $ hashObject @HbSync bs
work | wait = ncqPutBS work | wait = ncqPutBS sto (Just B) (Just ohash) bs
| otherwise = ncqTossBS | otherwise = ncqTossBS sto (Just B) (Just ohash) bs
{-# INLINE work #-}
{-# INLINE ncqPutBlock0 #-} {-# INLINE ncqPutBlock0 #-}

View File

@ -147,6 +147,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
-- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
-- atomically $ ncqDeferredWriteOpSTM ncq do
ncqStateUpdate ncq do ncqStateUpdate ncq do
ncqStateAddFact (P (PData (DataFile fk) ss)) ncqStateAddFact (P (PData (DataFile fk) ss))
@ -168,7 +169,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
RunWrite (fk, fh, w, total') -> do RunWrite (fk, fh, w, total') -> do
let timeoutMicro = 10_000_000 let timeoutMicro = 30_000_000
chunk <- liftIO $ timeout timeoutMicro $ atomically do chunk <- liftIO $ timeout timeoutMicro $ atomically do
stop <- readTVar ncqStopReq stop <- readTVar ncqStopReq

View File

@ -1,4 +1,3 @@
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE UndecidableInstances #-}
module HBS2.Storage.NCQ3.Internal.Types where module HBS2.Storage.NCQ3.Internal.Types where
@ -11,6 +10,7 @@ import Data.HashSet qualified as HS
import Text.Printf import Text.Printf
import Control.Concurrent.STM.TSem (TSem,waitTSem,signalTSem) import Control.Concurrent.STM.TSem (TSem,waitTSem,signalTSem)
import System.FileLock (FileLock) import System.FileLock (FileLock)
import Data.Vector qualified as V
data CachedData = CachedData !ByteString data CachedData = CachedData !ByteString
data CachedIndex = CachedIndex !ByteString !NWayHash data CachedIndex = CachedIndex !ByteString !NWayHash
@ -200,3 +200,8 @@ ncqIsTombEntrySize :: Integral a => a -> Bool
ncqIsTombEntrySize s = fromIntegral s <= ncqTombEntrySize ncqIsTombEntrySize s = fromIntegral s <= ncqTombEntrySize
{-# INLINE ncqIsTombEntrySize #-} {-# INLINE ncqIsTombEntrySize #-}
ncqDeferredWriteOpSTM :: NCQStorage -> IO () -> STM ()
ncqDeferredWriteOpSTM NCQStorage{..} work = do
nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps)
writeTQueue (ncqWriteOps ! nw) work

View File

@ -606,6 +606,7 @@ ncq3Tests = do
chu <- S.toList_ (readChunkedBS lbs (256*1024)) chu <- S.toList_ (readChunkedBS lbs (256*1024))
hashes <- forConcurrently chu $ \chunk -> do hashes <- forConcurrently chu $ \chunk -> do
-- ncqPutBS ncq (Just B) Nothing (LBS.toStrict chunk)
ncqTossBlock ncq chunk >>= orThrowUser "can't save" ncqTossBlock ncq chunk >>= orThrowUser "can't save"
none none