diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 94b7461d..b329750b 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -32,6 +32,7 @@ import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Data.Ord (Down(..),comparing) import Control.Concurrent.STM qualified as STM +import Control.Concurrent.STM.TSem import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) import Data.IntMap qualified as IntMap @@ -94,32 +95,47 @@ import System.FileLock as FL data NCQEntry = - NCQEntryNew ByteString - | NCQEntryJustWritten Handle Word64 + NCQEntryNew Int ByteString + | NCQEntryJustWritten Int Fd ByteString + | NCQEntrySynced Fd Word64 type Shard = TVar (HashMap HashRef (TVar NCQEntry)) data NCQStorage2 = NCQStorage2 { ncqFsync :: Int + , ncqWriteQLen :: Int + , ncqWriteBlock :: Int , ncqMemTable :: Vector Shard - , ncqWriteQ :: TBQueue HashRef + , ncqWriteSem :: TSem + , ncqWriteQ :: TVar (Seq HashRef) , ncqStorageStopReq :: TVar Bool + , ncqStorageSyncReq :: TVar Bool + , ncqSyncNo :: TVar Int } deriving (Generic) ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 ncqStorageOpen2 fp upd = do - let ncqFsync = 16 * 1024^2 + let ncqFsync = 16 * 1024^2 + let ncqWriteQLen = 1024 * 16 + let ncqWriteBlock = 4096 * 4 cap <- getNumCapabilities <&> fromIntegral - ncqWriteQ <- newTBQueueIO 32768 - ncqMemTable <- V.fromList <$> replicateM cap (newTVarIO mempty) + ncqWriteQ <- newTVarIO mempty + ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap) + ncqMemTable <- V.fromList <$> replicateM (max 2 (cap `div` 2)) (newTVarIO mempty) ncqStorageStopReq <- newTVarIO False + ncqStorageSyncReq <- newTVarIO False + ncqSyncNo <- newTVarIO 0 pure $ NCQStorage2{..} & upd ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageStop2 NCQStorage2{..} = do atomically $ writeTVar ncqStorageStopReq True +ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m () +ncqStorageSync2 NCQStorage2{..} = do + atomically $ writeTVar ncqStorageSyncReq True + ncqShardIdx :: NCQStorage2 -> HashRef -> Int ncqShardIdx NCQStorage2{..} h = fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable @@ -141,11 +157,18 @@ ncqPutBS :: MonadUnliftIO m => NCQStorage2 -> ByteString -> m HashRef ncqPutBS ncq@NCQStorage2{..} bs = do let h = HashRef (hashObject @HbSync bs) atomically do + waitTSem ncqWriteSem + stop <- readTVar ncqStorageStopReq + filled <- readTVar ncqWriteQ <&> Seq.length + + when (not stop && filled > ncqWriteQLen) STM.retry + + n <- readTVar ncqSyncNo ncqAlterEntrySTM ncq h $ \case Just e -> Just e - Nothing -> do - Just (NCQEntryNew bs) - writeTBQueue ncqWriteQ h + Nothing -> Just (NCQEntryNew n bs) + modifyTVar' ncqWriteQ (|> h) + signalTSem ncqWriteSem pure h @@ -168,8 +191,12 @@ ncqAlterEntrySTM ncq h alterFn = do tve <- newTVar e modifyTVar' shard (HM.insert h tve) -ncqStorageRun2 :: forall m . MonadUnliftIO m => NCQStorage2 -> m () -ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do +ncqStorageRun2 :: forall m . MonadUnliftIO m + => NCQStorage2 + -> m () +ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do + + jobQ <- newTQueueIO fname <- liftIO $ emptyTempFile "." "datafile-.data" @@ -179,39 +206,44 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do ContT $ bracket none $ const do liftIO $ closeFd fh0 - flip fix (fh0,0) \loop (fh,w) -> do + jobz <- ContT $ withAsync $ forever (atomically (readTQueue jobQ) >>= join) + link jobz - what <- atomically do - h <- tryReadTBQueue ncqWriteQ - stop <- readTVar ncqStorageStopReq + flip fix (fh0,0) $ \loop (fh,w) -> do - case (stop,h) of - (False, Nothing) -> STM.retry - (True, Nothing) -> pure $ Left () - (_, Just h) -> ncqLookupEntrySTM ncq h >>= \case - Nothing -> pure $ Right Nothing - Just (r,t) -> pure $ Right (Just (h,r,t)) + sync <- readTVarIO ncqStorageSyncReq - case what of - Left _ -> exit - Right Nothing -> loop (fh,w) - Right (Just (h,r,t)) -> do - n <- lift (appendEntry fh h r) + when (w > ncqFsync || sync) do + liftIO (fileSynchronise fh) + atomically do + writeTVar ncqStorageSyncReq False + modifyTVar' ncqSyncNo succ + loop (fh,0) - w' <- if (w + n) < ncqFsync then do - pure (w + n) - else do - liftIO $ fileSynchronise fh - pure 0 + chunk <- atomically do + stop <- readTVar ncqStorageStopReq + sy <- readTVar ncqStorageSyncReq + chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) - loop (fh, w') + if | Seq.null chunk && stop -> pure $ Left () + | Seq.null chunk && not (stop || sy) -> STM.retry + | otherwise -> pure $ Right chunk + + case chunk of + Left{} -> exit () + Right chu -> do + ws <- for chu $ \h -> do + atomically (ncqLookupEntrySTM ncq h) >>= \case + Nothing -> pure 0 + Just (r,t) -> lift (appendEntry fh h r) + + loop (fh, w + sum ws) where - exit = none appendEntry :: Fd -> HashRef -> NCQEntry -> m Int - appendEntry fh h (NCQEntryNew bs) = do + appendEntry fh h (NCQEntryNew _ bs) = do let ss = N.bytestring32 (32 + fromIntegral (BS.length bs)) let section = ss <> coerce h <> bs liftIO (Posix.fdWrite fh section) <&> fromIntegral @@ -219,3 +251,6 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do appendEntry fh h _ = do pure 0 + {-# INLINE appendEntry #-} + + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 1a8d162c..76a3843d 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -615,9 +615,9 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do notice "NO SHIT" - setLoggingOff @DEBUG + -- setLoggingOff @DEBUG - for_ [1 .. tn] $ \tnn -> do + for_ [1..tn] $ \tnn -> do ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } ) w <- ContT $ withAsync (ncqStorageRun2 ncq1) @@ -628,15 +628,15 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do co <- BS.readFile n ncqPutBS ncq1 co + ncqStorageStop2 ncq1 + performMajorGC + wait w + rm ncqDir + let tt = realToFrac @_ @(Fixed E2) t let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2) notice $ pretty tnn <+> pretty tt <+> pretty speed - rm ncqDir - - lift $ ncqStorageStop2 ncq1 - wait w - main :: IO () main = do