diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 353c255b..534b4f22 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -28,11 +28,12 @@ ncqStorageOpen :: MonadIO m => FilePath -> (NCQStorage -> NCQStorage) -> m NCQSt ncqStorageOpen fp upd = do let ncqRoot = fp let ncqGen = 0 - let ncqFsync = 16 * megabytes + -- let ncqFsync = 16 * megabytes + let ncqFsync = 32 * megabytes let ncqWriteQLen = 1024 * 4 - let ncqMinLog = 2 * gigabytes - let ncqMaxLog = 32 * gigabytes - let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 + let ncqMinLog = 512 * megabytes + let ncqMaxLog = 32 * gigabytes + let ncqWriteBlock = max 256 $ ncqWriteQLen `div` 2 let ncqMaxCachedIndex = 64 let ncqMaxCachedData = 64 let ncqIdleThrsh = 50.0 @@ -117,14 +118,16 @@ ncqPutBlock0 :: MonadUnliftIO m -> m (Maybe HashRef) ncqPutBlock0 sto lbs wait = do ncqLocate sto ohash >>= \case - Nothing -> Just <$> work sto (Just B) (Just ohash) bs - _ -> pure (Just ohash) + Nothing -> Just <$> work + Just l | ncqIsTomb l -> Just <$> work + _ -> pure (Just ohash) where bs = LBS.toStrict lbs ohash = HashRef $ hashObject @HbSync bs - work | wait = ncqPutBS - | otherwise = ncqTossBS + work | wait = ncqPutBS sto (Just B) (Just ohash) bs + | otherwise = ncqTossBS sto (Just B) (Just ohash) bs + {-# INLINE work #-} {-# INLINE ncqPutBlock0 #-} diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index c477a538..155e6f88 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -147,6 +147,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize + -- atomically $ ncqDeferredWriteOpSTM ncq do ncqStateUpdate ncq do ncqStateAddFact (P (PData (DataFile fk) ss)) @@ -168,7 +169,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do RunWrite (fk, fh, w, total') -> do - let timeoutMicro = 10_000_000 + let timeoutMicro = 30_000_000 chunk <- liftIO $ timeout timeoutMicro $ atomically do stop <- readTVar ncqStopReq diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index ab200932..5f92d374 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -1,4 +1,3 @@ -{-# LANGUAGE OverloadedLabels #-} {-# LANGUAGE UndecidableInstances #-} module HBS2.Storage.NCQ3.Internal.Types where @@ -11,6 +10,7 @@ import Data.HashSet qualified as HS import Text.Printf import Control.Concurrent.STM.TSem (TSem,waitTSem,signalTSem) import System.FileLock (FileLock) +import Data.Vector qualified as V data CachedData = CachedData !ByteString data CachedIndex = CachedIndex !ByteString !NWayHash @@ -200,3 +200,8 @@ ncqIsTombEntrySize :: Integral a => a -> Bool ncqIsTombEntrySize s = fromIntegral s <= ncqTombEntrySize {-# INLINE ncqIsTombEntrySize #-} +ncqDeferredWriteOpSTM :: NCQStorage -> IO () -> STM () +ncqDeferredWriteOpSTM NCQStorage{..} work = do + nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) + writeTQueue (ncqWriteOps ! nw) work + diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index d0341e32..a7c71cb6 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -606,6 +606,7 @@ ncq3Tests = do chu <- S.toList_ (readChunkedBS lbs (256*1024)) hashes <- forConcurrently chu $ \chunk -> do + -- ncqPutBS ncq (Just B) Nothing (LBS.toStrict chunk) ncqTossBlock ncq chunk >>= orThrowUser "can't save" none