This commit is contained in:
voidlizard 2025-06-22 14:30:26 +03:00
parent 617ce7d4db
commit f4cc7b1530
2 changed files with 76 additions and 41 deletions

View File

@ -32,6 +32,7 @@ import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.Ord (Down(..),comparing) import Data.Ord (Down(..),comparing)
import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TSem
import Data.HashPSQ qualified as HPSQ import Data.HashPSQ qualified as HPSQ
import Data.HashPSQ (HashPSQ) import Data.HashPSQ (HashPSQ)
import Data.IntMap qualified as IntMap import Data.IntMap qualified as IntMap
@ -94,32 +95,47 @@ import System.FileLock as FL
data NCQEntry = data NCQEntry =
NCQEntryNew ByteString NCQEntryNew Int ByteString
| NCQEntryJustWritten Handle Word64 | NCQEntryJustWritten Int Fd ByteString
| NCQEntrySynced Fd Word64
type Shard = TVar (HashMap HashRef (TVar NCQEntry)) type Shard = TVar (HashMap HashRef (TVar NCQEntry))
data NCQStorage2 = data NCQStorage2 =
NCQStorage2 NCQStorage2
{ ncqFsync :: Int { ncqFsync :: Int
, ncqWriteQLen :: Int
, ncqWriteBlock :: Int
, ncqMemTable :: Vector Shard , ncqMemTable :: Vector Shard
, ncqWriteQ :: TBQueue HashRef , ncqWriteSem :: TSem
, ncqWriteQ :: TVar (Seq HashRef)
, ncqStorageStopReq :: TVar Bool , ncqStorageStopReq :: TVar Bool
, ncqStorageSyncReq :: TVar Bool
, ncqSyncNo :: TVar Int
} deriving (Generic) } deriving (Generic)
ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2
ncqStorageOpen2 fp upd = do ncqStorageOpen2 fp upd = do
let ncqFsync = 16 * 1024^2 let ncqFsync = 16 * 1024^2
let ncqWriteQLen = 1024 * 16
let ncqWriteBlock = 4096 * 4
cap <- getNumCapabilities <&> fromIntegral cap <- getNumCapabilities <&> fromIntegral
ncqWriteQ <- newTBQueueIO 32768 ncqWriteQ <- newTVarIO mempty
ncqMemTable <- V.fromList <$> replicateM cap (newTVarIO mempty) ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap)
ncqMemTable <- V.fromList <$> replicateM (max 2 (cap `div` 2)) (newTVarIO mempty)
ncqStorageStopReq <- newTVarIO False ncqStorageStopReq <- newTVarIO False
ncqStorageSyncReq <- newTVarIO False
ncqSyncNo <- newTVarIO 0
pure $ NCQStorage2{..} & upd pure $ NCQStorage2{..} & upd
ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageStop2 NCQStorage2{..} = do ncqStorageStop2 NCQStorage2{..} = do
atomically $ writeTVar ncqStorageStopReq True atomically $ writeTVar ncqStorageStopReq True
ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageSync2 NCQStorage2{..} = do
atomically $ writeTVar ncqStorageSyncReq True
ncqShardIdx :: NCQStorage2 -> HashRef -> Int ncqShardIdx :: NCQStorage2 -> HashRef -> Int
ncqShardIdx NCQStorage2{..} h = ncqShardIdx NCQStorage2{..} h =
fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable
@ -141,11 +157,18 @@ ncqPutBS :: MonadUnliftIO m => NCQStorage2 -> ByteString -> m HashRef
ncqPutBS ncq@NCQStorage2{..} bs = do ncqPutBS ncq@NCQStorage2{..} bs = do
let h = HashRef (hashObject @HbSync bs) let h = HashRef (hashObject @HbSync bs)
atomically do atomically do
waitTSem ncqWriteSem
stop <- readTVar ncqStorageStopReq
filled <- readTVar ncqWriteQ <&> Seq.length
when (not stop && filled > ncqWriteQLen) STM.retry
n <- readTVar ncqSyncNo
ncqAlterEntrySTM ncq h $ \case ncqAlterEntrySTM ncq h $ \case
Just e -> Just e Just e -> Just e
Nothing -> do Nothing -> Just (NCQEntryNew n bs)
Just (NCQEntryNew bs) modifyTVar' ncqWriteQ (|> h)
writeTBQueue ncqWriteQ h signalTSem ncqWriteSem
pure h pure h
@ -168,8 +191,12 @@ ncqAlterEntrySTM ncq h alterFn = do
tve <- newTVar e tve <- newTVar e
modifyTVar' shard (HM.insert h tve) modifyTVar' shard (HM.insert h tve)
ncqStorageRun2 :: forall m . MonadUnliftIO m => NCQStorage2 -> m () ncqStorageRun2 :: forall m . MonadUnliftIO m
ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do => NCQStorage2
-> m ()
ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do
jobQ <- newTQueueIO
fname <- liftIO $ emptyTempFile "." "datafile-.data" fname <- liftIO $ emptyTempFile "." "datafile-.data"
@ -179,39 +206,44 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
ContT $ bracket none $ const do ContT $ bracket none $ const do
liftIO $ closeFd fh0 liftIO $ closeFd fh0
flip fix (fh0,0) \loop (fh,w) -> do jobz <- ContT $ withAsync $ forever (atomically (readTQueue jobQ) >>= join)
link jobz
what <- atomically do flip fix (fh0,0) $ \loop (fh,w) -> do
h <- tryReadTBQueue ncqWriteQ
stop <- readTVar ncqStorageStopReq
case (stop,h) of sync <- readTVarIO ncqStorageSyncReq
(False, Nothing) -> STM.retry
(True, Nothing) -> pure $ Left ()
(_, Just h) -> ncqLookupEntrySTM ncq h >>= \case
Nothing -> pure $ Right Nothing
Just (r,t) -> pure $ Right (Just (h,r,t))
case what of when (w > ncqFsync || sync) do
Left _ -> exit liftIO (fileSynchronise fh)
Right Nothing -> loop (fh,w) atomically do
Right (Just (h,r,t)) -> do writeTVar ncqStorageSyncReq False
n <- lift (appendEntry fh h r) modifyTVar' ncqSyncNo succ
loop (fh,0)
w' <- if (w + n) < ncqFsync then do chunk <- atomically do
pure (w + n) stop <- readTVar ncqStorageStopReq
else do sy <- readTVar ncqStorageSyncReq
liftIO $ fileSynchronise fh chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)
pure 0
loop (fh, w') if | Seq.null chunk && stop -> pure $ Left ()
| Seq.null chunk && not (stop || sy) -> STM.retry
| otherwise -> pure $ Right chunk
case chunk of
Left{} -> exit ()
Right chu -> do
ws <- for chu $ \h -> do
atomically (ncqLookupEntrySTM ncq h) >>= \case
Nothing -> pure 0
Just (r,t) -> lift (appendEntry fh h r)
loop (fh, w + sum ws)
where where
exit = none
appendEntry :: Fd -> HashRef -> NCQEntry -> m Int appendEntry :: Fd -> HashRef -> NCQEntry -> m Int
appendEntry fh h (NCQEntryNew bs) = do appendEntry fh h (NCQEntryNew _ bs) = do
let ss = N.bytestring32 (32 + fromIntegral (BS.length bs)) let ss = N.bytestring32 (32 + fromIntegral (BS.length bs))
let section = ss <> coerce h <> bs let section = ss <> coerce h <> bs
liftIO (Posix.fdWrite fh section) <&> fromIntegral liftIO (Posix.fdWrite fh section) <&> fromIntegral
@ -219,3 +251,6 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
appendEntry fh h _ = do appendEntry fh h _ = do
pure 0 pure 0
{-# INLINE appendEntry #-}

View File

@ -615,9 +615,9 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do
notice "NO SHIT" notice "NO SHIT"
setLoggingOff @DEBUG -- setLoggingOff @DEBUG
for_ [1 .. tn] $ \tnn -> do for_ [1..tn] $ \tnn -> do
ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } ) ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } )
w <- ContT $ withAsync (ncqStorageRun2 ncq1) w <- ContT $ withAsync (ncqStorageRun2 ncq1)
@ -628,15 +628,15 @@ testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do
co <- BS.readFile n co <- BS.readFile n
ncqPutBS ncq1 co ncqPutBS ncq1 co
ncqStorageStop2 ncq1
performMajorGC
wait w
rm ncqDir
let tt = realToFrac @_ @(Fixed E2) t let tt = realToFrac @_ @(Fixed E2) t
let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2) let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2)
notice $ pretty tnn <+> pretty tt <+> pretty speed notice $ pretty tnn <+> pretty tt <+> pretty speed
rm ncqDir
lift $ ncqStorageStop2 ncq1
wait w
main :: IO () main :: IO ()
main = do main = do