diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index c38de755..2510ba75 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -132,6 +132,7 @@ data NCQStorage2 = , ncqWriteQLen :: Int , ncqWriteBlock :: Int , ncqMinLog :: Int + , ncqMaxLog :: Int , ncqMaxCached :: Int , ncqIdleThrsh :: Double , ncqMemTable :: Vector Shard @@ -143,9 +144,11 @@ data NCQStorage2 = , ncqMergeReq :: TVar Bool , ncqMergeSem :: TSem , ncqSyncNo :: TVar Int + , ncqCurrentFiles :: TVar (HashSet FileKey) , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) , ncqStateVersion :: TVar StateVersion , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) + , ncqStateName :: TVar (Maybe FilePath) , ncqCachedEntries :: TVar Int , ncqWrites :: TVar Int , ncqWriteEMA :: TVar Double -- for writes-per-seconds @@ -155,6 +158,9 @@ data NCQStorage2 = megabytes :: forall a . Integral a => a megabytes = 1024 ^ 2 +gigabytes :: forall a . Integral a => a +gigabytes = 1024 ^ 3 + ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 ncqStorageOpen2 fp upd = do let ncqRoot = fp @@ -162,6 +168,7 @@ ncqStorageOpen2 fp upd = do let ncqFsync = 16 * megabytes let ncqWriteQLen = 1024 * 16 let ncqMinLog = 256 * megabytes + let ncqMaxLog = 16 * gigabytes -- ??? let ncqWriteBlock = 1024 let ncqMaxCached = 128 let ncqIdleThrsh = 50.00 @@ -175,9 +182,11 @@ ncqStorageOpen2 fp upd = do ncqMergeReq <- newTVarIO False ncqMergeSem <- atomically (newTSem 1) ncqSyncNo <- newTVarIO 0 + ncqCurrentFiles <- newTVarIO mempty ncqTrackedFiles <- newTVarIO HPSQ.empty ncqStateVersion <- newTVarIO 0 ncqStateUsage <- newTVarIO mempty + ncqStateName <- newTVarIO Nothing ncqCachedEntries <- newTVarIO 0 ncqStorageTasks <- newTVarIO 0 ncqWrites <- newTVarIO 0 @@ -190,8 +199,9 @@ ncqStorageOpen2 fp upd = do mkdir (ncqGetWorkDir ncq) - ncqRepair ncq - ncqLoadIndexes ncq + liftIO $ withSem ncqMergeSem do + ncqRepair ncq + ncqLoadIndexes ncq pure ncq @@ -407,21 +417,26 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do nwayHashScanAll nway bs $ \_ k _ -> do unless (k == emptyKey) $ atomically do ncqAlterEntrySTM ncq (coerce k) (const Nothing) + -- atomically (modifyTVar' ncqCurrentFiles (HS.delete fk)) loop spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ)) spawnActivity measureWPS - spawnActivity $ forever $ (>> pause @'Seconds 10) do - notice $ yellow "remove unused files" + -- FIXME: bigger-period + spawnActivity $ forever $ (>> pause @'Seconds 300) $ do + ema <- readTVarIO ncqWriteEMA + when (ema < ncqIdleThrsh * 1.5) do + ncqSweepStates ncq + ncqSweepFossils ncq spawnActivity $ fix \again -> (>> again) do ema <- readTVarIO ncqWriteEMA mergeReq <- atomically $ stateTVar ncqMergeReq (,False) - if ema > ncqIdleThrsh || mergeReq then do - pause @'Seconds 2.5 + if ema > ncqIdleThrsh && not mergeReq then do + pause @'Seconds 10 else do mq <- newEmptyTMVarIO @@ -460,6 +475,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do loop RunFin else do (fk,fhx) <- openNewDataFile + atomically $ modifyTVar' ncqCurrentFiles (HS.insert fk) loop $ RunWrite (fk,fhx,0,0) RunSync (fk, fh, w, total, continue) -> do @@ -678,14 +694,14 @@ ncqListTrackedFiles ncq = do <&> List.filter (\f -> List.isPrefixOf "fossil-" f && List.isSuffixOf ".data" f) <&> HS.toList . HS.fromList -ncqListStateFiles :: MonadIO m => NCQStorage2 -> m [FilePath] +ncqListStateFiles :: MonadIO m => NCQStorage2 -> m [(TimeSpec, FilePath)] ncqListStateFiles ncq = do let wd = ncqGetWorkDir ncq dirFiles wd >>= mapM (pure . takeBaseName) <&> List.filter (List.isPrefixOf "state-") >>= mapM (\x -> (,x) <$> timespecOf x) - <&> fmap snd . List.sortOn Down + <&> List.sortOn Down where timespecOf x = liftIO do @@ -728,8 +744,8 @@ ncqLoadIndexes ncq@NCQStorage2{..} = do ncqRepair :: MonadIO m => NCQStorage2 -> m () -ncqRepair me@NCQStorage2{} = do - states <- ncqListStateFiles me +ncqRepair me@NCQStorage2{..} = do + states <- ncqListStateFiles me <&> fmap snd fossils <- flip fix states $ \next -> \case [] -> do @@ -739,6 +755,7 @@ ncqRepair me@NCQStorage2{} = do (s:ss) -> tryLoadState s >>= \case Just files -> do debug $ yellow "used state" <+> pretty s + atomically $ writeTVar ncqStateName (Just $ takeFileName s) pure files Nothing -> do warn $ red "inconsistent state" <+> pretty s @@ -750,10 +767,7 @@ ncqRepair me@NCQStorage2{} = do where - readState path = liftIO do - keys <- BS8.readFile (ncqGetFileName me path) - <&> filter (not . BS8.null) . BS8.lines - pure $ fmap (DataFile . coerce @_ @FileKey) keys + readState path = ncqReadStateKeys me path <&> fmap DataFile tryLoadState path = liftIO do @@ -879,39 +893,62 @@ ncqStateUpdate me@NCQStorage2{..} ops' = flip runContT pure $ callCC \exit -> do pure (c && k1 /= keys0) - when changed (lift $ ncqDumpCurrentState me) + when changed $ liftIO do + name <- ncqDumpCurrentState me + atomically $ writeTVar ncqStateName (Just name) pure changed -ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m () +ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m FilePath ncqDumpCurrentState me@NCQStorage2{..} = do keys <- readTVarIO ncqTrackedFiles <&> List.sort . HPSQ.keys name <- ncqGetNewStateName me writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | k <- keys]) + pure name + +ncqMergeFull :: forall m . MonadUnliftIO m => NCQStorage2 -> m () +ncqMergeFull me = fix \next -> ncqMergeStep me >>= \case + False -> none + True -> next + -- FIXME: sometime-causes-no-such-file-or-directory ncqMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool -ncqMergeStep ncq@NCQStorage2{..} = withSem $ ncqRunTask ncq False $ flip runContT pure do +ncqMergeStep ncq@NCQStorage2{..} = do + withSem ncqMergeSem $ ncqRunTask ncq False $ flip runContT pure $ liftIO do - liftIO do + debug "ncqMergeStep" tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList - files <- for tracked $ \(f,p,_) -> do + + files <- for tracked $ \(f,p,e) -> do + let fn = ncqGetFileName ncq (toFileName $ DataFile f) let idx = ncqGetFileName ncq (toFileName $ IndexFile f) - sz <- liftIO (fileSize fn) + + dataHere <- doesFileExist fn + + sz <- case e of + Just PendingEntry -> pure (-1) + _ | dataHere -> liftIO (fileSize fn) + | otherwise -> pure (-1) + idxHere <- doesFileExist idx + pure (f, sz, p, idxHere) - let bothIdx a b = view _4 a && view _4 b + let bothOk (_, sz1, _, here1) (_, sz2, _, here2) = + here1 && here2 + && sz1 > 0 && sz2 > 0 + && (sz1 + sz2) < fromIntegral ncqMaxLog let found = flip fix (files, Nothing, Nothing) $ \next -> \case ([], _, r) -> r - (a:b:rest, Nothing, _) | bothIdx a b -> do + (a:b:rest, Nothing, _) | bothOk a b -> do next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) - (a:b:rest, Just s, _ ) | bothIdx a b && view _2 a + view _2 b < s -> do + (a:b:rest, Just s, _ ) | bothOk a b && view _2 a + view _2 b < s -> do next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) (_:rest, s, r) -> do @@ -923,10 +960,6 @@ ncqMergeStep ncq@NCQStorage2{..} = withSem $ ncqRunTask ncq False $ flip runCon where - withSem m = bracket enter leave (const m) - where enter = atomically (waitTSem ncqMergeSem) - leave = const $ atomically (signalTSem ncqMergeSem) - ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath ncqGetNewMergeName n@NCQStorage2{} = do let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data") @@ -1143,6 +1176,72 @@ ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do readTVarIO profit <&> fromIntegral +ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> FilePath -> m [FileKey] +ncqReadStateKeys me path = liftIO do + keys <- BS8.readFile (ncqGetFileName me path) + <&> filter (not . BS8.null) . BS8.lines + pure $ fmap (coerce @_ @FileKey) keys + +ncqSweepFossils :: forall m . MonadUnliftIO m => NCQStorage2 -> m () +ncqSweepFossils me@NCQStorage2{..} = withSem ncqMergeSem do + debug $ yellow "sweep fossils" + + -- better be safe than sorry + + current <- readTVarIO ncqCurrentFiles + + -- >>= mapM (try @_ @IOException . ncqReadStateKeys me) + mentioned <- ncqListStateFiles me <&> fmap snd + >>= mapM (ncqReadStateKeys me) + <&> HS.fromList . mconcat + + sfs <- ncqListStateFiles me <&> fmap snd + + debug $ "STATE FILES" <+> vcat (fmap pretty sfs) + + active <- readTVarIO ncqTrackedFiles <&> HS.fromList . HPSQ.keys + + used' <- readTVarIO ncqStateUsage <&> IntMap.elems + + let used = current + <> active + <> mentioned + <> HS.unions [ keys | (n, keys) <- used', n > 0 ] + + kicked <- ncqListTrackedFiles me + <&> fmap (fromString @FileKey) + <&> filter (\x -> not (HS.member x used)) + <&> HS.toList . HS.fromList + + debug $ "KICK" <+> vcat (fmap pretty kicked) + + debug $ "LIVE SET" <+> vcat (fmap pretty (HS.toList used)) + + for_ kicked $ \fo -> do + debug $ "sweep fossil file" <+> pretty fo + rm (ncqGetFileName me (toFileName (IndexFile fo))) + rm (ncqGetFileName me (toFileName (DataFile fo))) + +ncqSweepStates :: MonadUnliftIO m => NCQStorage2 -> m () +ncqSweepStates me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure do + debug $ yellow "remove unused states" + current' <- readTVarIO ncqStateName + + current <- ContT $ for_ current' + + states <- ncqListStateFiles me <&> fmap snd + + flip fix (Left states) $ \next -> \case + Left [] -> none + Right [] -> none + Left (x:xs) | x == current -> next (Right xs) + | otherwise -> next (Left xs) + + Right (x:xs) -> do + debug $ "Remove obsolete state" <+> pretty x + rm (ncqGetFileName me x) + next (Right xs) + writeFiltered :: forall m . MonadIO m => NCQStorage2 @@ -1213,3 +1312,9 @@ appendTailSection fh = liftIO do void (appendSection fh (fileTailRecord s)) {-# INLINE appendTailSection #-} + +withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a +withSem sem m = bracket enter leave (const m) + where enter = atomically (waitTSem sem) + leave = const $ atomically (signalTSem sem) + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 3c0bc1e2..b485c86e 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -602,7 +602,7 @@ testNCQ2Simple1 TestEnv{..} = do g <- liftIO MWC.createSystemRandom - bz <- replicateM 30000 $ liftIO do + bz <- replicateM 100000 $ liftIO do n <- (`mod` (256*1024)) <$> uniformM @Int g uniformByteStringM n g @@ -613,11 +613,19 @@ testNCQ2Simple1 TestEnv{..} = do found <- ncqLocate2 sto h <&> maybe (-1) ncqEntrySize assertBool (show $ "found-immediate" <+> pretty h) (found > 0) + ncqWithStorage ncqDir $ \sto -> liftIO do + notice "perform merge" + ncqMergeFull sto + + ncqWithStorage ncqDir $ \sto -> liftIO do + ncqSweepStates sto + ncqSweepFossils sto + ncqWithStorage ncqDir $ \sto -> liftIO do hashes <- atomically (STM.flushTQueue q) for_ hashes $ \ha -> do found <- ncqLocate2 sto ha <&> maybe (-1) ncqEntrySize - assertBool (show $ "found-immediate" <+> pretty ha) (found > 0) + assertBool (show $ "found" <+> pretty ha) (found > 0) -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found)