mirror of https://github.com/voidlizard/hbs2
deferred rm, sux
This commit is contained in:
parent
4f5842ef7f
commit
2807f326c6
|
@ -137,6 +137,8 @@ data NCQStorage2 =
|
||||||
, ncqStorageSyncReq :: TVar Bool
|
, ncqStorageSyncReq :: TVar Bool
|
||||||
, ncqSyncNo :: TVar Int
|
, ncqSyncNo :: TVar Int
|
||||||
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
|
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
|
||||||
|
, ncqFilesUsage :: TVar (HashMap FileKey Int)
|
||||||
|
, ncqFilesToRemove :: TVar (HashSet FileKey)
|
||||||
, ncqCachedEntries :: TVar Int
|
, ncqCachedEntries :: TVar Int
|
||||||
, ncqWrites :: TVar Int
|
, ncqWrites :: TVar Int
|
||||||
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
|
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
|
||||||
|
@ -165,6 +167,8 @@ ncqStorageOpen2 fp upd = do
|
||||||
ncqStorageSyncReq <- newTVarIO False
|
ncqStorageSyncReq <- newTVarIO False
|
||||||
ncqSyncNo <- newTVarIO 0
|
ncqSyncNo <- newTVarIO 0
|
||||||
ncqTrackedFiles <- newTVarIO HPSQ.empty
|
ncqTrackedFiles <- newTVarIO HPSQ.empty
|
||||||
|
ncqFilesUsage <- newTVarIO mempty
|
||||||
|
ncqFilesToRemove <- newTVarIO mempty
|
||||||
ncqCachedEntries <- newTVarIO 0
|
ncqCachedEntries <- newTVarIO 0
|
||||||
ncqStorageTasks <- newTVarIO 0
|
ncqStorageTasks <- newTVarIO 0
|
||||||
ncqWrites <- newTVarIO 0
|
ncqWrites <- newTVarIO 0
|
||||||
|
@ -279,12 +283,23 @@ ncqLocate2 ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
lift (ncqLookupEntry ncq href) >>= maybe none (exit . Just . InMemory . coerce)
|
lift (ncqLookupEntry ncq href) >>= maybe none (exit . Just . InMemory . coerce)
|
||||||
|
|
||||||
-- atomically $ modifyTVar' ncqWrites succ
|
atomically $ modifyTVar' ncqWrites succ
|
||||||
|
|
||||||
-- FIXME: race
|
-- FIXME: race
|
||||||
-- merge can-delete-file-while-in-use
|
-- merge can-delete-file-while-in-use
|
||||||
|
|
||||||
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
|
tracked <- atomically do
|
||||||
|
fs <- readTVar ncqTrackedFiles <&> HPSQ.toList
|
||||||
|
for_ fs $ \(k,_,_) -> modifyTVar ncqFilesUsage (HM.insertWith (+) k 1)
|
||||||
|
pure fs
|
||||||
|
|
||||||
|
let sweep = \case
|
||||||
|
Nothing -> Nothing
|
||||||
|
Just v | v <= 0 -> Nothing
|
||||||
|
Just v -> Just (pred v)
|
||||||
|
|
||||||
|
ContT $ bracket none $ const $ atomically do
|
||||||
|
for_ tracked $ \(k,_,_) -> modifyTVar ncqFilesUsage (HM.alter sweep k)
|
||||||
|
|
||||||
for_ tracked $ \(fk, prio, mCached) -> do
|
for_ tracked $ \(fk, prio, mCached) -> do
|
||||||
case mCached of
|
case mCached of
|
||||||
|
@ -377,6 +392,35 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
|
||||||
ncqAlterEntrySTM ncq (coerce k) (const Nothing)
|
ncqAlterEntrySTM ncq (coerce k) (const Nothing)
|
||||||
loop
|
loop
|
||||||
|
|
||||||
|
spawnActivity $ forever do
|
||||||
|
rmz <- atomically do
|
||||||
|
rms <- readTVar ncqFilesToRemove <&> HS.toList
|
||||||
|
usage <- readTVar ncqFilesUsage
|
||||||
|
|
||||||
|
what <- for rms $ \k -> do
|
||||||
|
if fromMaybe 0 (HM.lookup k usage) <= 0 then
|
||||||
|
pure (Right k)
|
||||||
|
else
|
||||||
|
pure (Left k)
|
||||||
|
|
||||||
|
let r = rights what
|
||||||
|
let l = lefts what
|
||||||
|
|
||||||
|
when (List.null r) STM.retry
|
||||||
|
|
||||||
|
writeTVar ncqFilesToRemove (HS.fromList l)
|
||||||
|
|
||||||
|
pure r
|
||||||
|
|
||||||
|
for_ rmz $ \k -> do
|
||||||
|
let d = ncqGetFileName ncq (toFileName (DataFile k))
|
||||||
|
let i = ncqGetFileName ncq (toFileName (IndexFile k))
|
||||||
|
|
||||||
|
debug $ red "remove" <+> pretty d
|
||||||
|
debug $ red "remove" <+> pretty i
|
||||||
|
|
||||||
|
rm d >> rm i
|
||||||
|
|
||||||
spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ))
|
spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ))
|
||||||
|
|
||||||
spawnActivity measureWPS
|
spawnActivity measureWPS
|
||||||
|
@ -677,7 +721,19 @@ ncqRepair me@NCQStorage2{} = do
|
||||||
|
|
||||||
for_ fossils $ \fo -> liftIO $ flip fix 0 \next i -> do
|
for_ fossils $ \fo -> liftIO $ flip fix 0 \next i -> do
|
||||||
let dataFile = ncqGetFileName me $ toFileName (DataFile fo)
|
let dataFile = ncqGetFileName me $ toFileName (DataFile fo)
|
||||||
|
|
||||||
|
stat <- liftIO $ PFS.getFileStatus dataFile
|
||||||
|
-- FIXME: maybe-creation-time-actually
|
||||||
|
let ts = PFS.modificationTimeHiRes stat
|
||||||
|
|
||||||
try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case
|
try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case
|
||||||
|
|
||||||
|
_ | ts == 0 -> do
|
||||||
|
|
||||||
|
warn $ red "remove obsolete files" <+> pretty fo
|
||||||
|
rm dataFile
|
||||||
|
rm (ncqGetFileName me (toFileName (IndexFile fo)))
|
||||||
|
|
||||||
Left e -> do
|
Left e -> do
|
||||||
err (viaShow e)
|
err (viaShow e)
|
||||||
-- TODO: try-fix-later
|
-- TODO: try-fix-later
|
||||||
|
@ -818,13 +874,15 @@ ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT
|
||||||
pure $ Just (ts,fk)
|
pure $ Just (ts,fk)
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar ncqTrackedFiles (HPSQ.delete a)
|
for_ [a,b] $ \x -> do
|
||||||
modifyTVar ncqTrackedFiles (HPSQ.delete b)
|
modifyTVar ncqTrackedFiles (HPSQ.delete x)
|
||||||
|
modifyTVar ncqFilesToRemove (HS.insert x)
|
||||||
|
|
||||||
for_ idx $ \(ts,fk) -> do
|
for_ idx $ \(ts,fk) -> do
|
||||||
ncqAddTrackedFileSTM ncq (coerce fk) (posixToTimeSpec ts)
|
ncqAddTrackedFileSTM ncq (coerce fk) (posixToTimeSpec ts)
|
||||||
|
|
||||||
mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA]
|
for_ [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA] $ \fn -> do
|
||||||
|
setFileTimesHiRes fn 0 0
|
||||||
|
|
||||||
orFail what e = do
|
orFail what e = do
|
||||||
r <- what
|
r <- what
|
||||||
|
|
Loading…
Reference in New Issue