wip, fix writing tails

This commit is contained in:
voidlizard 2025-08-22 18:59:00 +03:00
parent 996be0c16c
commit 0869b57971
4 changed files with 29 additions and 8 deletions

View File

@ -74,6 +74,7 @@ ncqStorageOpen fp upd = do
ncqRunSem <- atomically $ newTSem 1 ncqRunSem <- atomically $ newTSem 1
ncqFileLock <- newTVarIO Nothing ncqFileLock <- newTVarIO Nothing
ncqCurrentFossils <- newTVarIO mempty ncqCurrentFossils <- newTVarIO mempty
ncqReplQueue <- newTVarIO mempty
let ncq = NCQStorage{..} & upd let ncq = NCQStorage{..} & upd

View File

@ -175,11 +175,11 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
liftIO (ncqTryLoadState ncq) liftIO (ncqTryLoadState ncq)
closeQ <- liftIO newTQueueIO indexQ <- liftIO newTQueueIO
closer <- spawnActivity $ liftIO $ fix \loop -> do indexer <- spawnActivity $ liftIO $ fix \loop -> do
what <- atomically do what <- atomically do
tryReadTQueue closeQ >>= \case tryReadTQueue indexQ >>= \case
Just e -> pure $ Just e Just e -> pure $ Just e
Nothing -> do Nothing -> do
stop <- readTVar ncqStopReq stop <- readTVar ncqStopReq
@ -187,6 +187,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
maybe1 what none $ \(fk :: FileKey) -> do maybe1 what none $ \(fk :: FileKey) -> do
ncqIndexFile ncq Nothing (DataFile fk) ncqIndexFile ncq Nothing (DataFile fk)
dropReplaces fk
atomically $ modifyTVar ncqCurrentFossils (HS.delete fk) atomically $ modifyTVar ncqCurrentFossils (HS.delete fk)
loop loop
@ -245,7 +246,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
RunFin mfh -> do RunFin mfh -> do
liftIO $ for_ mfh closeFd liftIO $ for_ mfh closeFd
debug "exit storage" debug "exit storage"
atomically $ pollSTM closer >>= maybe STM.retry (const none) atomically $ pollSTM indexer >>= maybe STM.retry (const none)
RunNew -> do RunNew -> do
alive <- readTVarIO ncqAlive alive <- readTVarIO ncqAlive
@ -272,6 +273,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
ss <- appendTailSection fh ss <- appendTailSection fh
liftIO (fileSynchronisePortable fh) liftIO (fileSynchronisePortable fh)
flushReplaces fk
-- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
@ -287,8 +289,9 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
if | needClose && continue -> do if | needClose && continue -> do
liftIO $ closeFd fh liftIO $ closeFd fh
flushReplaces fk
debug $ "closeFd" <+> viaShow fh debug $ "closeFd" <+> viaShow fh
atomically $ writeTQueue closeQ fk atomically $ writeTQueue indexQ fk
loop RunNew loop RunNew
| not continue -> loop (RunFin (Just fh)) | not continue -> loop (RunFin (Just fh))
@ -332,7 +335,8 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
Just (NCQEntry w, EntryHere bs) -> do Just (NCQEntry w, EntryHere bs) -> do
off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0) off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0)
n <- lift (appendSection fh bs) n <- lift (appendSection fh bs)
atomically (writeTVar w (EntryThere (FileLocation fk off (fromIntegral n)))) let op = writeTVar w (EntryThere (FileLocation fk off (fromIntegral n)))
atomically $ modifyTVar ncqReplQueue (HM.insertWith (<>) fk [op])
pure n pure n
_ -> pure 0 _ -> pure 0
@ -340,12 +344,24 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
let written = sum ws let written = sum ws
loop $ RunSync (fk, fh, w + written, total' + written, True) loop $ RunSync (fk, fh, w + written, total' + written, True)
mapM_ wait [closer] mapM_ wait [indexer]
where where
setAlive = atomically $ writeTVar ncqAlive True setAlive = atomically $ writeTVar ncqAlive True
unsetAlive = atomically $ writeTVar ncqAlive False unsetAlive = atomically $ writeTVar ncqAlive False
dropReplaces :: forall m . MonadIO m => FileKey -> m ()
dropReplaces fk = atomically do
modifyTVar ncqReplQueue (HM.delete fk)
flushReplaces :: forall m . MonadIO m => FileKey -> m ()
flushReplaces fk = do
atomically do
ncqDelCachedDataSTM ncq fk
ops <- readTVar ncqReplQueue <&> fromMaybe mempty . HM.lookup fk
modifyTVar ncqReplQueue (HM.delete fk)
sequence_ ops
openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
openNewDataFile = do openNewDataFile = do
fk <- ncqGetNewFileKey ncq DataFile fk <- ncqGetNewFileKey ncq DataFile
@ -415,5 +431,6 @@ data RunSt =
| RunWrite (FileKey, Fd, Int, Int) | RunWrite (FileKey, Fd, Int, Int)
| RunSync (FileKey, Fd, Int, Int, Bool) | RunSync (FileKey, Fd, Int, Int, Bool)
| RunFin (Maybe Fd) | RunFin (Maybe Fd)
deriving stock Show

View File

@ -122,6 +122,7 @@ data NCQStorage =
, ncqSyncNo :: TVar Int , ncqSyncNo :: TVar Int
, ncqServiceSem :: TSem , ncqServiceSem :: TSem
, ncqRunSem :: TSem , ncqRunSem :: TSem
, ncqReplQueue :: TVar (HashMap FileKey [STM ()])
, ncqFileLock :: TVar (Maybe FileLock) , ncqFileLock :: TVar (Maybe FileLock)
} }
deriving stock (Generic) deriving stock (Generic)

View File

@ -498,7 +498,9 @@ ncq3Tests = do
liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc) liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc)
ncqIndexCompactFull sto -- ncqIndexCompactFull sto
-- ncqStorageStop
pause @'Seconds 11
ncqWithStorage dir $ \sto -> do ncqWithStorage dir $ \sto -> do
-- notice "check deleted" -- notice "check deleted"