mirror of https://github.com/voidlizard/hbs2
wip, fixing
This commit is contained in:
parent
0869b57971
commit
8d58c5d818
|
@ -169,6 +169,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
||||||
readTVarIO ncqFileLock >>= mapM_ FL.unlockFile
|
readTVarIO ncqFileLock >>= mapM_ FL.unlockFile
|
||||||
|
|
||||||
ContT $ bracket none $ const $ liftIO do
|
ContT $ bracket none $ const $ liftIO do
|
||||||
|
void $ ncqStateDump ncq
|
||||||
debug "storage done"
|
debug "storage done"
|
||||||
|
|
||||||
ncqRemoveGarbage ncq
|
ncqRemoveGarbage ncq
|
||||||
|
@ -242,107 +243,120 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
||||||
b <- ncqIndexCompactStep ncq
|
b <- ncqIndexCompactStep ncq
|
||||||
pure $ a || b
|
pure $ a || b
|
||||||
|
|
||||||
flip fix RunNew $ \loop -> \case
|
flip fix RunNew $ \loop s -> do
|
||||||
RunFin mfh -> do
|
-- debug $ viaShow s
|
||||||
liftIO $ for_ mfh closeFd
|
case s of
|
||||||
debug "exit storage"
|
RunFin mfh -> do
|
||||||
atomically $ pollSTM indexer >>= maybe STM.retry (const none)
|
liftIO $ for_ mfh closeFd
|
||||||
|
rest <- readTVarIO ncqWriteQ <&> Seq.length
|
||||||
|
debug $ "exit storage" <+> pretty rest
|
||||||
|
atomically $ pollSTM indexer >>= maybe STM.retry (const none)
|
||||||
|
|
||||||
RunNew -> do
|
RunNew -> do
|
||||||
alive <- readTVarIO ncqAlive
|
alive <- readTVarIO ncqAlive
|
||||||
empty <- readTVarIO ncqWriteQ <&> Seq.null
|
empty <- readTVarIO ncqWriteQ <&> Seq.null
|
||||||
if not alive && empty
|
if not alive && empty
|
||||||
then loop (RunFin Nothing)
|
then loop (RunFin Nothing)
|
||||||
else do
|
|
||||||
(fk, fhx) <- openNewDataFile
|
|
||||||
loop $ RunWrite (fk, fhx, 0, 0)
|
|
||||||
|
|
||||||
|
|
||||||
RunSync (fk, fh, w, total, continue) -> do
|
|
||||||
|
|
||||||
(stop,sync) <- atomically do
|
|
||||||
(,) <$> readTVar ncqStopReq
|
|
||||||
<*> readTVar ncqSyncReq
|
|
||||||
-- <*> readTVar ncqWriteEMA
|
|
||||||
|
|
||||||
let needClose = total >= ncqMinLog || stop
|
|
||||||
|
|
||||||
rest <- if not (sync || needClose || w > ncqFsync) then
|
|
||||||
pure w
|
|
||||||
else do
|
|
||||||
|
|
||||||
ss <- appendTailSection fh
|
|
||||||
liftIO (fileSynchronisePortable fh)
|
|
||||||
flushReplaces fk
|
|
||||||
|
|
||||||
-- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
|
|
||||||
|
|
||||||
-- atomically $ ncqDeferredWriteOpSTM ncq do
|
|
||||||
ncqStateUpdate ncq do
|
|
||||||
ncqStateAddFact (P (PData (DataFile fk) ss))
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
writeTVar ncqSyncReq False
|
|
||||||
modifyTVar ncqSyncNo succ
|
|
||||||
|
|
||||||
pure 0
|
|
||||||
|
|
||||||
if | needClose && continue -> do
|
|
||||||
liftIO $ closeFd fh
|
|
||||||
flushReplaces fk
|
|
||||||
debug $ "closeFd" <+> viaShow fh
|
|
||||||
atomically $ writeTQueue indexQ fk
|
|
||||||
loop RunNew
|
|
||||||
|
|
||||||
| not continue -> loop (RunFin (Just fh))
|
|
||||||
|
|
||||||
| otherwise -> loop $ RunWrite (fk, fh, rest, total)
|
|
||||||
|
|
||||||
|
|
||||||
RunWrite (fk, fh, w, total') -> do
|
|
||||||
|
|
||||||
let timeoutMicro = 10_000_000
|
|
||||||
|
|
||||||
chunk <- liftIO $ timeout timeoutMicro $ atomically do
|
|
||||||
stop <- readTVar ncqStopReq
|
|
||||||
sy <- readTVar ncqSyncReq
|
|
||||||
|
|
||||||
chunk <- if not stop then
|
|
||||||
stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)
|
|
||||||
else do
|
|
||||||
r <- readTVar ncqWriteQ
|
|
||||||
modifyTVar ncqWriteQ mempty
|
|
||||||
pure r
|
|
||||||
|
|
||||||
if | Seq.null chunk && stop -> pure $ Left ()
|
|
||||||
| Seq.null chunk && not (stop || sy) -> STM.retry
|
|
||||||
| otherwise -> pure $ Right chunk
|
|
||||||
|
|
||||||
case chunk of
|
|
||||||
Nothing -> do
|
|
||||||
liftIO $ join $ readTVarIO ncqOnRunWriteIdle
|
|
||||||
if w == 0 then do
|
|
||||||
loop $ RunWrite (fk,fh,w,total')
|
|
||||||
else do
|
else do
|
||||||
atomically $ writeTVar ncqSyncReq True
|
(fk, fhx) <- openNewDataFile
|
||||||
loop $ RunSync (fk, fh, w, total', True) -- exit ()
|
loop $ RunWrite (fk, fhx, 0, 0)
|
||||||
|
|
||||||
Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit ()
|
|
||||||
|
|
||||||
Just (Right chu) -> do
|
RunSync (fk, fh, w, total, continue) -> do
|
||||||
ws <- for chu $ \h -> do
|
|
||||||
atomically (ncqLookupEntrySTM ncq h) >>= \case
|
|
||||||
Just (NCQEntry w, EntryHere bs) -> do
|
|
||||||
off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0)
|
|
||||||
n <- lift (appendSection fh bs)
|
|
||||||
let op = writeTVar w (EntryThere (FileLocation fk off (fromIntegral n)))
|
|
||||||
atomically $ modifyTVar ncqReplQueue (HM.insertWith (<>) fk [op])
|
|
||||||
pure n
|
|
||||||
|
|
||||||
_ -> pure 0
|
(stop,sync) <- atomically do
|
||||||
|
(,) <$> readTVar ncqStopReq
|
||||||
|
<*> readTVar ncqSyncReq
|
||||||
|
-- <*> readTVar ncqWriteEMA
|
||||||
|
|
||||||
let written = sum ws
|
let needClose = total >= ncqMinLog || stop
|
||||||
loop $ RunSync (fk, fh, w + written, total' + written, True)
|
|
||||||
|
rest <- if not (sync || needClose || w > ncqFsync) then
|
||||||
|
pure w
|
||||||
|
else do
|
||||||
|
|
||||||
|
ss <- appendTailSection fh
|
||||||
|
liftIO (fileSynchronisePortable fh)
|
||||||
|
flushReplaces fk
|
||||||
|
|
||||||
|
ncqStateUpdate ncq do
|
||||||
|
ncqStateAddFact (P (PData (DataFile fk) ss))
|
||||||
|
|
||||||
|
-- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
|
||||||
|
|
||||||
|
-- atomically $ ncqDeferredWriteOpSTM ncq do
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
writeTVar ncqSyncReq False
|
||||||
|
modifyTVar ncqSyncNo succ
|
||||||
|
|
||||||
|
pure 0
|
||||||
|
|
||||||
|
if | needClose && continue -> do
|
||||||
|
liftIO $ closeFd fh
|
||||||
|
flushReplaces fk
|
||||||
|
debug $ "closeFd" <+> viaShow fh
|
||||||
|
atomically $ writeTQueue indexQ fk
|
||||||
|
loop RunNew
|
||||||
|
|
||||||
|
| not continue -> loop (RunFin (Just fh))
|
||||||
|
|
||||||
|
| otherwise -> loop $ RunWrite (fk, fh, rest, total)
|
||||||
|
|
||||||
|
|
||||||
|
RunWrite (fk, fh, w, total') -> do
|
||||||
|
|
||||||
|
let timeoutMicro = 10_000_000
|
||||||
|
|
||||||
|
chunk <- liftIO $ timeout timeoutMicro $ atomically do
|
||||||
|
stop <- readTVar ncqStopReq
|
||||||
|
sy <- readTVar ncqSyncReq
|
||||||
|
|
||||||
|
chunk <- if not stop then
|
||||||
|
stateTVar ncqWriteQ (Seq.splitAt 1)
|
||||||
|
else do
|
||||||
|
r <- readTVar ncqWriteQ
|
||||||
|
modifyTVar ncqWriteQ mempty
|
||||||
|
pure r
|
||||||
|
|
||||||
|
if | Seq.null chunk && stop -> pure $ Left ()
|
||||||
|
| Seq.null chunk && not (stop || sy) -> STM.retry
|
||||||
|
| otherwise -> pure $ Right chunk
|
||||||
|
|
||||||
|
stop <- readTVarIO ncqStopReq
|
||||||
|
|
||||||
|
case chunk of
|
||||||
|
Nothing -> do
|
||||||
|
liftIO $ join $ readTVarIO ncqOnRunWriteIdle
|
||||||
|
stop <- readTVarIO ncqStopReq
|
||||||
|
if w == 0 && not stop then do
|
||||||
|
loop $ RunWrite (fk,fh,w,total')
|
||||||
|
else do
|
||||||
|
atomically $ writeTVar ncqSyncReq True
|
||||||
|
loop $ RunSync (fk, fh, w, total', not stop) -- exit ()
|
||||||
|
|
||||||
|
Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit ()
|
||||||
|
|
||||||
|
Just (Right chu) -> do
|
||||||
|
ws <- for chu $ \h -> do
|
||||||
|
atomically (ncqLookupEntrySTM ncq h) >>= \case
|
||||||
|
Just (NCQEntry w, EntryHere bs) -> do
|
||||||
|
off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0)
|
||||||
|
n <- lift (appendSection fh bs)
|
||||||
|
|
||||||
|
let op = do
|
||||||
|
readTVar w >>= \case
|
||||||
|
EntryHere bs1 | bs1 == bs -> do
|
||||||
|
writeTVar w (EntryThere (FileLocation fk off (fromIntegral n)))
|
||||||
|
_ -> none
|
||||||
|
|
||||||
|
atomically $ modifyTVar ncqReplQueue (HM.insertWith (<>) fk [op])
|
||||||
|
pure n
|
||||||
|
|
||||||
|
_ -> pure 0
|
||||||
|
|
||||||
|
let written = sum ws
|
||||||
|
loop $ RunSync (fk, fh, w + written, total' + written, not stop)
|
||||||
|
|
||||||
mapM_ wait [indexer]
|
mapM_ wait [indexer]
|
||||||
|
|
||||||
|
@ -350,11 +364,11 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
||||||
setAlive = atomically $ writeTVar ncqAlive True
|
setAlive = atomically $ writeTVar ncqAlive True
|
||||||
unsetAlive = atomically $ writeTVar ncqAlive False
|
unsetAlive = atomically $ writeTVar ncqAlive False
|
||||||
|
|
||||||
dropReplaces :: forall m . MonadIO m => FileKey -> m ()
|
dropReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 ()
|
||||||
dropReplaces fk = atomically do
|
dropReplaces fk = atomically do
|
||||||
modifyTVar ncqReplQueue (HM.delete fk)
|
modifyTVar ncqReplQueue (HM.delete fk)
|
||||||
|
|
||||||
flushReplaces :: forall m . MonadIO m => FileKey -> m ()
|
flushReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 ()
|
||||||
flushReplaces fk = do
|
flushReplaces fk = do
|
||||||
atomically do
|
atomically do
|
||||||
ncqDelCachedDataSTM ncq fk
|
ncqDelCachedDataSTM ncq fk
|
||||||
|
@ -368,7 +382,9 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
||||||
|
|
||||||
atomically $ modifyTVar ncqCurrentFossils (HS.insert fk)
|
atomically $ modifyTVar ncqCurrentFossils (HS.insert fk)
|
||||||
|
|
||||||
ncqStateUpdate ncq (ncqStateAddDataFile fk)
|
ncqStateUpdate ncq do
|
||||||
|
ncqStateAddFact (P (PData (DataFile fk) 0))
|
||||||
|
ncqStateAddDataFile fk
|
||||||
|
|
||||||
let fname = ncqGetFileName ncq (DataFile fk)
|
let fname = ncqGetFileName ncq (DataFile fk)
|
||||||
-- touch fname
|
-- touch fname
|
||||||
|
@ -425,7 +441,6 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
||||||
when (k2 == k1) $ waitState k2
|
when (k2 == k1) $ waitState k2
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
data RunSt =
|
data RunSt =
|
||||||
RunNew
|
RunNew
|
||||||
| RunWrite (FileKey, Fd, Int, Int)
|
| RunWrite (FileKey, Fd, Int, Int)
|
||||||
|
|
|
@ -27,6 +27,17 @@ newtype StateOP a =
|
||||||
|
|
||||||
{- HLINT ignore "Eta reduce"-}
|
{- HLINT ignore "Eta reduce"-}
|
||||||
|
|
||||||
|
ncqStateDump :: MonadIO m
|
||||||
|
=> NCQStorage
|
||||||
|
-> m FileKey
|
||||||
|
ncqStateDump ncq@NCQStorage{..} = do
|
||||||
|
state <- readTVarIO ncqState
|
||||||
|
key <- ncqGetNewFileKey ncq StateFile
|
||||||
|
let snkFile = ncqGetFileName ncq (StateFile key)
|
||||||
|
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
|
||||||
|
IO.hPrint fh (pretty state)
|
||||||
|
pure key
|
||||||
|
|
||||||
ncqStateUpdateLoop :: MonadIO m
|
ncqStateUpdateLoop :: MonadIO m
|
||||||
=> NCQStorage
|
=> NCQStorage
|
||||||
-> m ()
|
-> m ()
|
||||||
|
@ -43,10 +54,7 @@ ncqStateUpdateLoop ncq@NCQStorage{..} = do
|
||||||
stop <- readTVar ncqStopReq
|
stop <- readTVar ncqStopReq
|
||||||
if s1 == s0 && not stop then STM.retry else pure s1
|
if s1 == s0 && not stop then STM.retry else pure s1
|
||||||
|
|
||||||
key <- ncqGetNewFileKey ncq StateFile
|
key <- ncqStateDump ncq
|
||||||
let snkFile = ncqGetFileName ncq (StateFile key)
|
|
||||||
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
|
|
||||||
IO.hPrint fh (pretty state)
|
|
||||||
|
|
||||||
done <- atomically do
|
done <- atomically do
|
||||||
writeTVar ncqStateKey key
|
writeTVar ncqStateKey key
|
||||||
|
|
|
@ -141,12 +141,11 @@ ncq3Tests = do
|
||||||
h <- ncqPutBS sto (Just B) Nothing bs
|
h <- ncqPutBS sto (Just B) Nothing bs
|
||||||
found <- ncqLocate sto h <&> isJust
|
found <- ncqLocate sto h <&> isJust
|
||||||
liftIO $ assertBool (show $ "found" <+> pretty h) found
|
liftIO $ assertBool (show $ "found" <+> pretty h) found
|
||||||
|
debug $ "written" <+> pretty h <+> pretty (BS.length bs)
|
||||||
atomically do
|
atomically do
|
||||||
writeTQueue hq h
|
writeTQueue hq h
|
||||||
modifyTVar w1 succ
|
modifyTVar w1 succ
|
||||||
|
|
||||||
ncqStorageStop sto
|
|
||||||
|
|
||||||
ncqWithStorage testEnvDir $ \sto -> do
|
ncqWithStorage testEnvDir $ \sto -> do
|
||||||
notice $ "reopen/lookup" <+> pretty num
|
notice $ "reopen/lookup" <+> pretty num
|
||||||
hh <- atomically $ STM.flushTQueue hq
|
hh <- atomically $ STM.flushTQueue hq
|
||||||
|
@ -494,13 +493,13 @@ ncq3Tests = do
|
||||||
>>= orThrowUser ("missed" <+> pretty h)
|
>>= orThrowUser ("missed" <+> pretty h)
|
||||||
|
|
||||||
unless (ncqEntrySize loc == ncqTombEntrySize) do
|
unless (ncqEntrySize loc == ncqTombEntrySize) do
|
||||||
notice $ pretty h <+> pretty (ncqEntrySize loc) <+> pretty ncqTombEntrySize
|
err $ red (pretty h) <+> pretty (ncqEntrySize loc) <+> pretty ncqTombEntrySize
|
||||||
|
|
||||||
liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc)
|
-- liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc)
|
||||||
|
|
||||||
-- ncqIndexCompactFull sto
|
-- ncqIndexCompactFull sto
|
||||||
-- ncqStorageStop
|
-- ncqStorageStop
|
||||||
pause @'Seconds 11
|
-- pause @'Seconds 11
|
||||||
|
|
||||||
ncqWithStorage dir $ \sto -> do
|
ncqWithStorage dir $ \sto -> do
|
||||||
-- notice "check deleted"
|
-- notice "check deleted"
|
||||||
|
@ -509,9 +508,10 @@ ncq3Tests = do
|
||||||
for_ hashes $ \h -> do
|
for_ hashes $ \h -> do
|
||||||
|
|
||||||
ncqLocate sto h >>= \case
|
ncqLocate sto h >>= \case
|
||||||
Nothing -> notice $ "not-found" <+> pretty h
|
Nothing -> err $ red "not-found" <+> pretty h
|
||||||
Just loc -> do
|
Just loc -> do
|
||||||
liftIO $ assertBool (show $ "tomb/1" <+> pretty h) (ncqIsTomb loc)
|
unless (ncqIsTomb loc) do
|
||||||
|
err $ red ("tomb/1" <+> pretty h)
|
||||||
|
|
||||||
|
|
||||||
entry $ bindMatch "test:ncq3:del2" $ nil_ $ \syn -> do
|
entry $ bindMatch "test:ncq3:del2" $ nil_ $ \syn -> do
|
||||||
|
|
Loading…
Reference in New Issue