From 622b9ef5318fd24588f0361cad8157d90289f593 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Fri, 14 Mar 2025 08:35:59 +0300 Subject: [PATCH] okay --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 86 ++++++++++++------------ 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 8520ca61..df1e3074 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -104,7 +104,7 @@ data NCQStorage = , ncqCurrentHandleR :: TVar Fd , ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) - , ncqFlushNow :: TVar Int + , ncqFlushNow :: TVar [TQueue ()] , ncqOpenDone :: TMVar Bool , ncqStopped :: TVar Bool } @@ -272,26 +272,30 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do -- cap <- (10*) <$> getNumCapabilities cap <- getNumCapabilities - refsWriter <- ContT $ withAsync $ untilStopped do + refsWriter <- ContT $ withAsync do + myFlushQ <- newTQueueIO + atomically $ modifyTVar ncqFlushNow (myFlushQ:) + + untilStopped do -- FIXME: timeout-hardcode - void $ race (pause @'Seconds 2) $ atomically do - readTVar ncqStopped `orElse` STM.retry + void $ race (pause @'Seconds 1) $ atomically do + void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ - dirty <- readTVarIO ncqRefsDirty + dirty <- readTVarIO ncqRefsDirty - when (dirty > 0) do - refs <- readTVarIO ncqRefsMem <&> HM.toList - withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do - for_ refs $ \(k,v) -> do - let ks = coerce @_ @ByteString k - let vs = coerce @_ @ByteString v - let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay - liftIO do - BS.hPutStr fh (N.bytestring32 (fromIntegral w)) - BS.hPutStr fh ks - BS.hPutStr fh vs - atomically $ writeTVar ncqRefsDirty 0 + when (dirty > 0) do + refs <- readTVarIO ncqRefsMem <&> HM.toList + withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do + for_ refs $ \(k,v) -> do + let ks = coerce @_ @ByteString k + let vs = coerce @_ @ByteString v + let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay + liftIO do + BS.hPutStr fh (N.bytestring32 (fromIntegral w)) + BS.hPutStr fh ks + BS.hPutStr fh vs + atomically $ writeTVar ncqRefsDirty 0 link refsWriter @@ -314,14 +318,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do indexer <- ContT $ withAsync $ untilStopped do - what <- atomically do - e <- tryReadTQueue indexQ - stop <- readTVar ncqStopped + what' <- race (pause @'Seconds 1) $ atomically do + peekTQueue indexQ >> STM.flushTQueue indexQ - case e of - Just x -> pure (Just x) - Nothing | stop -> pure Nothing - | otherwise -> STM.retry + let what = fromRight mempty what' for_ what $ \(fd,fn) -> do @@ -335,30 +335,30 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do link indexer - writer <- ContT $ withAsync $ untilStopped do + writer <- ContT $ withAsync do - flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do - flush <- readTVar ncqFlushNow - stop <- readTVar ncqStopped - if flush > 0 || stop then do - writeTVar ncqFlushNow 0 + myFlushQ <- newTQueueIO + atomically $ modifyTVar ncqFlushNow (myFlushQ:) + + untilStopped do + + flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do + void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ pure True - else do - STM.retry - let flushNow = fromRight False flush + let flushNow = fromRight False flush - now <- getTimeCoarse - lastW <- readTVarIO ncqLastWritten - bytes <- readTVarIO ncqNotWritten + now <- getTimeCoarse + lastW <- readTVarIO ncqLastWritten + bytes <- readTVarIO ncqNotWritten - let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 + let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 - stopped <- readTVarIO ncqStopped + stopped <- readTVarIO ncqStopped - when (dumpByTime || bytes >= dumpData || flushNow || stopped) do - -- debug "NCQStorage: dump data!" - liftIO $ writeJournal indexQ syncData + when (dumpByTime || bytes >= dumpData || flushNow || stopped) do + -- debug "NCQStorage: dump data!" + liftIO $ writeJournal indexQ syncData mapM_ waitCatch [writer,indexer,refsWriter] mapM_ cancel [reader] @@ -648,7 +648,7 @@ ncqStorageDel sto h = do ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do - atomically $ modifyTVar ncqFlushNow succ + atomically $ readTVar ncqFlushNow >>= mapM_ (`writeTQueue` ()) ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m () ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do @@ -775,7 +775,7 @@ ncqStorageInit_ check path = do ncqLastWritten <- getTimeCoarse >>= newTVarIO ncqWaitIndex <- newTVarIO HPSQ.empty - ncqFlushNow <- newTVarIO 0 + ncqFlushNow <- newTVarIO mempty ncqOpenDone <- newEmptyTMVarIO ncqCurrentReadReq <- newTVarIO mempty ncqCurrentUsage <- newTVarIO mempty