mirror of https://github.com/voidlizard/hbs2
wip, ncq2 new writer
This commit is contained in:
parent
91a0af9ee3
commit
a1e6ff50f9
|
@ -234,29 +234,36 @@ data RunSt =
|
|||
RunNew
|
||||
| RunWrite (FileKey, Fd, Int, Int)
|
||||
| RunSync (FileKey, Fd, Int, Int, Bool)
|
||||
| RunFin
|
||||
|
||||
ncqStorageRun2 :: forall m . MonadUnliftIO m
|
||||
=> NCQStorage2
|
||||
-> m ()
|
||||
ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do
|
||||
ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
|
||||
|
||||
jobQ <- newTQueueIO
|
||||
closeQ <- newTQueueIO
|
||||
|
||||
closer <- ContT $ withAsync $ liftIO $ forever do
|
||||
atomically (readTQueue closeQ) >>= \(fk, fh) -> do
|
||||
closeFd fh
|
||||
let fname = BS8.unpack (coerce fk)
|
||||
-- notice $ yellow "indexing" <+> pretty fname
|
||||
idx <- ncqIndexFile ncq fname
|
||||
nwayHashMMapReadOnly idx >>= \case
|
||||
Nothing -> err $ "can't open index" <+> pretty idx
|
||||
Just (bs,nway) -> do
|
||||
nwayHashScanAll nway bs $ \_ k _ -> do
|
||||
unless (k == emptyKey) do
|
||||
none
|
||||
atomically do
|
||||
closer <- ContT $ withAsync $ liftIO $ fix \loop -> do
|
||||
what <- atomically do
|
||||
stop <- readTVar ncqStorageStopReq
|
||||
tryReadTQueue closeQ >>= \case
|
||||
Just e -> pure $ Just e
|
||||
Nothing | not stop -> STM.retry
|
||||
| otherwise -> pure Nothing
|
||||
|
||||
maybe1 what none $ \(fk, fh) -> do
|
||||
closeFd fh
|
||||
let fname = BS8.unpack (coerce fk)
|
||||
-- notice $ yellow "indexing" <+> pretty fname
|
||||
idx <- ncqIndexFile ncq fname
|
||||
nwayHashMMapReadOnly idx >>= \case
|
||||
Nothing -> err $ "can't open index" <+> pretty idx
|
||||
Just (bs,nway) -> do
|
||||
nwayHashScanAll nway bs $ \_ k _ -> do
|
||||
unless (k == emptyKey) $ atomically do
|
||||
ncqAlterEntrySTM ncq (coerce k) (const Nothing)
|
||||
loop
|
||||
|
||||
link closer
|
||||
|
||||
|
@ -269,20 +276,26 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do
|
|||
|
||||
flip fix RunNew $ \loop -> \case
|
||||
|
||||
RunFin -> do
|
||||
debug "wait finalizing"
|
||||
atomically $ pollSTM closer >>= maybe STM.retry (const none)
|
||||
debug "exit storage"
|
||||
|
||||
RunNew -> do
|
||||
stop <- readTVarIO ncqStorageStopReq
|
||||
mt <- readTVarIO ncqWriteQ <&> Seq.null
|
||||
|
||||
when (stop && mt) do
|
||||
exit ()
|
||||
|
||||
(fk,fhx) <- openNewDataFile
|
||||
loop $ RunWrite (fk,fhx,0,0)
|
||||
if stop && mt then do
|
||||
loop RunFin
|
||||
else do
|
||||
(fk,fhx) <- openNewDataFile
|
||||
loop $ RunWrite (fk,fhx,0,0)
|
||||
|
||||
RunSync (fk, fh, w, total, continue) -> do
|
||||
|
||||
stop <- readTVarIO ncqStorageStopReq
|
||||
sync <- readTVarIO ncqStorageSyncReq
|
||||
let needClose = total >= ncqMinLog
|
||||
let needClose = total >= ncqMinLog || stop
|
||||
|
||||
rest <- if not (sync || needClose || w > ncqFsync) then
|
||||
pure w
|
||||
|
@ -299,7 +312,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure $ callCC \exit -> do
|
|||
atomically $ writeTQueue closeQ (fk, fh)
|
||||
loop RunNew
|
||||
|
||||
| not continue -> exit ()
|
||||
| not continue -> loop RunFin
|
||||
|
||||
| otherwise -> loop $ RunWrite (fk, fh, rest, total)
|
||||
|
||||
|
|
|
@ -130,9 +130,11 @@ runTest action = do
|
|||
flip runContT pure do
|
||||
ContT $ bracket none $ const do
|
||||
unless keep (rm tmp)
|
||||
flushLoggers
|
||||
|
||||
lift $ lift $ action (TestEnv tmp)
|
||||
|
||||
|
||||
testNCQFuckupRecovery1 :: MonadUnliftIO m
|
||||
=> TestEnv
|
||||
-> m ()
|
||||
|
@ -680,6 +682,7 @@ testNCQ2ConcurrentWriteSimple1 tn n TestEnv{..} = flip runContT pure do
|
|||
ncqPutBS ncq1 (Just B) Nothing co
|
||||
|
||||
liftIO $ ncqStorageStop2 ncq1
|
||||
wait w
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
|
Loading…
Reference in New Issue