new merge

This commit is contained in:
voidlizard 2025-06-03 10:10:36 +03:00
parent b308c10343
commit 5a8ad51ee4
1 changed files with 61 additions and 48 deletions

View File

@ -151,7 +151,7 @@ data NCQStorage =
, ncqGen :: Int , ncqGen :: Int
, ncqSyncSize :: Int , ncqSyncSize :: Int
, ncqMinLog :: Int , ncqMinLog :: Int
, ncqMaxLog :: Int , ncqMaxSegments :: Int
, ncqMaxCached :: Int , ncqMaxCached :: Int
, ncqCompactTreshold :: Int , ncqCompactTreshold :: Int
, ncqSalt :: HashRef , ncqSalt :: HashRef
@ -427,10 +427,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
merge <- makeMerge merge <- makeMerge
compact <- makeCompact compact <- makeCompact
checkCompact <- makeCheckCompact checkCompact <- makeCheckCompact
checkMerge <- makeCheckMerge
flagWatcher <- makeFlagWatcher flagWatcher <- makeFlagWatcher
mapM_ waitCatch [writer,indexer,merge,compact] mapM_ waitCatch [writer,indexer,merge,compact]
mapM_ cancel [reader,flagWatcher,checkCompact] mapM_ cancel [reader,flagWatcher,checkCompact,checkMerge]
where where
@ -507,7 +508,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
debug "SCAN/CHECK FOR COMPACT" debug "SCAN/CHECK FOR COMPACT"
profit <- ncqLinearScanForCompact ncq (\_ _ -> none) profit <- ncqLinearScanForCompact ncq (\_ _ -> none)
-- FIXME: profit-hardcode -- FIXME: profit-hardcode
when (profit >= ncqCompactTreshold ) do when ( profit >= ncqCompactTreshold ) do
atomically $ modifyTVar ncqCompactReq succ atomically $ modifyTVar ncqCompactReq succ
makeCompact = do makeCompact = do
@ -536,6 +537,14 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
link me link me
pure me pure me
makeCheckMerge = do
ContT $ withAsync $ untilStopped do
pause @'Seconds 600
debug "CHECK FOR MERGE"
num <- readTVarIO ncqTrackedFiles <&> HPSQ.size
when (num > ncqMaxSegments) do
atomically $ modifyTVar ncqMergeReq succ
makeMerge = do makeMerge = do
me <- ContT $ withAsync $ untilStopped do me <- ContT $ withAsync $ untilStopped do
micropause @'Seconds 10 micropause @'Seconds 10
@ -1241,10 +1250,10 @@ ncqStorageInit_ check path = do
let ncqRoot = path let ncqRoot = path
let ncqSyncSize = 64 * (1024 ^ 2) let ncqSyncSize = 64 * (1024 ^ 2)
let ncqMinLog = 1024 * (1024 ^ 2) let ncqMinLog = 1024 * (1024 ^ 2)
let ncqMaxLog = 4 * (1024 ^ 3) let ncqMaxSegments = 64
let ncqCompactTreshold = 128 * 1024^2 let ncqCompactTreshold = 128 * 1024^2
let ncqMaxCached = 128 let ncqMaxCached = 128
@ -1475,26 +1484,37 @@ ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ
ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage -> m () ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageMergeStep ncq@NCQStorage{..} = do ncqStorageMergeStep ncq@NCQStorage{..} = flip runContT pure do
tracked <- readTVarIO ncqTrackedFiles
<&> HPSQ.toList
<&> fmap (over _2 (coerce @_ @TimeSpec))
<&> List.sortOn (view _2)
<&> List.take 2
ContT $ bracket ( atomically (takeTMVar ncqCompactBusy) ) $ const do
atomically $ putTMVar ncqCompactBusy ()
for_ tracked $ \(f, t, _) -> do liftIO do
debug $ "FILE TO MERGE" <+> pretty (realToFrac @_ @(Fixed E6) t) <+> pretty f
mergeStep (fmap (view _1) tracked) tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
files <- for tracked $ \(f,p,_) -> do
let fn = ncqGetDataFileName ncq f
sz <- liftIO (fileSize fn)
pure (f, sz, p)
let found = flip fix (files, Nothing, Nothing) $ \next -> \case
([], _, r) -> r
(a:b:rest, Nothing, _) ->
next (b:rest, Just (view _2 a + view _2 b), Just (a,b))
(a:b:rest, Just s, _ ) | view _2 a + view _2 b < s ->
next (b:rest, Just (view _2 a + view _2 b), Just (a,b))
(_:rest, s, r) ->
next (rest, s, r)
case found of
Just (a,b) -> mergeStep a b
_ -> none
where where
mergeStep (a,_,p1) (b,_,p2) = do
mergeStep [] = none
mergeStep [_] = none
mergeStep [b,a] = do
warn $ "merge" <+> pretty a <+> pretty b warn $ "merge" <+> pretty a <+> pretty b
let fDataNameA = ncqGetDataFileName ncq a let fDataNameA = ncqGetDataFileName ncq a
@ -1503,8 +1523,8 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do
let fDataNameB = ncqGetDataFileName ncq b let fDataNameB = ncqGetDataFileName ncq b
let fIndexNameB = ncqGetIndexFileName ncq b let fIndexNameB = ncqGetIndexFileName ncq b
warn $ "file A" <+> pretty fDataNameA <+> pretty fIndexNameA warn $ "file A" <+> pretty (timeSpecFromFilePrio p1) <+> pretty fDataNameA <+> pretty fIndexNameA
warn $ "file B" <+> pretty fDataNameB <+> pretty fIndexNameB warn $ "file B" <+> pretty (timeSpecFromFilePrio p2) <+> pretty fDataNameB <+> pretty fIndexNameB
doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA)
doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB)
@ -1526,52 +1546,45 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do
debug $ "SCAN FILE A" <+> pretty fDataNameA debug $ "SCAN FILE A" <+> pretty fDataNameA
writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do writeFiltered ncq fDataNameA fwh $ \_ _ _ _ -> do
pure $ not (ncqIsTomb (LBS.fromStrict v)) pure True
debug $ "SCAN FILE B" <+> pretty fDataNameA debug $ "SCAN FILE B" <+> pretty fDataNameA
writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do writeFiltered ncq fDataNameB fwh $ \_ _ k _ -> do
let tomb = ncqIsTomb (LBS.fromStrict v)
foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
let skip = tomb || foundInA let skip = foundInA
pure $ not skip pure $ not skip
result <- fileSize mfile
when (result == 0) $ exit ()
liftIO do liftIO do
fossil <- ncqGetNewFossilName ncq result <- fileSize mfile
mv mfile fossil
statA <- getFileStatus fDataNameA fp' <- if result == 0 then
pure Nothing
let ts = modificationTimeHiRes statA else do
setFileTimesHiRes fossil ts ts fossil <- ncqGetNewFossilName ncq
mv mfile fossil
fname <- ncqIndexFile ncq fossil statA <- getFileStatus fDataNameA
let ts = modificationTimeHiRes statA
setFileTimesHiRes fossil ts ts
Just . (ts,) . fromString <$> ncqIndexFile ncq fossil
atomically do atomically do
let fp = fromString fname
modifyTVar ncqTrackedFiles (HPSQ.delete a) modifyTVar ncqTrackedFiles (HPSQ.delete a)
modifyTVar ncqTrackedFiles (HPSQ.delete b) modifyTVar ncqTrackedFiles (HPSQ.delete b)
ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)] for_ fp' $ \(ts,fp) -> do
ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)]
mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA] mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA]
mergeStep _ = do
mergeError "assertion failed: more than 2 files to merge"
mergeError d = throwIO (NCQMergeInvariantFailed (show d))
orFail what e = do orFail what e = do
r <- what r <- what
unless r (throwIO (NCQMergeInvariantFailed (show e))) unless r (throwIO (NCQMergeInvariantFailed (show e)))
posixToTimeSpec :: POSIXTime -> TimeSpec posixToTimeSpec :: POSIXTime -> TimeSpec
posixToTimeSpec pt = posixToTimeSpec pt =
let (s, frac) = properFraction pt :: (Integer, POSIXTime) let (s, frac) = properFraction pt :: (Integer, POSIXTime)
@ -1692,8 +1705,8 @@ ncqCompact ncq@NCQStorage{..} = do
liftIO do liftIO do
withBinaryFileAtomic mfile WriteMode $ \fwh -> do withBinaryFileAtomic mfile WriteMode $ \fwh -> do
writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
pure $ not $ HS.member k es pure $ not $ HS.member k es
result <- fileSize mfile result <- fileSize mfile