This commit is contained in:
voidlizard 2025-07-11 12:37:13 +03:00
parent 930c824dbb
commit e5b4b27901
2 changed files with 216 additions and 8 deletions

View File

@ -47,7 +47,7 @@ import Data.IntMap (IntMap)
import Data.IntSet qualified as IntSet
import Data.IntSet (IntSet)
import Data.Sequence qualified as Seq
import Data.Sequence (Seq(..), (|>))
import Data.Sequence (Seq(..), (|>),(<|))
import Data.List qualified as List
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString.Lazy.Char8 qualified as LBS8
@ -110,6 +110,10 @@ type Shard = TVar (HashMap HashRef NCQEntry)
type NCQOffset = Word64
type NCQSize = Word32
data NCQFlag =
NCQMergeNow | NCQCompactNow
deriving (Eq,Ord,Generic)
data Location =
InFossil ByteString NCQOffset NCQSize
| InMemory ByteString
@ -124,7 +128,8 @@ data NCQStorage2 =
, ncqWriteBlock :: Int
, ncqMinLog :: Int
, ncqMaxCached :: Int
, ncqMemTable :: Vector Shard
, ncqIdleThrsh :: Double
, ncqMemTable :: Vector Shard
, ncqWriteSem :: TSem
, ncqWriteQ :: TVar (Seq HashRef)
, ncqStorageTasks :: TVar Int
@ -133,8 +138,10 @@ data NCQStorage2 =
, ncqSyncNo :: TVar Int
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
, ncqCachedEntries :: TVar Int
} deriving (Generic)
, ncqWrites :: TVar Int
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
, ncqJobQ :: TQueue (IO ())
}
megabytes :: forall a . Integral a => a
megabytes = 1024 ^ 2
@ -148,6 +155,8 @@ ncqStorageOpen2 fp upd = do
let ncqMinLog = 256 * megabytes
let ncqWriteBlock = 1024
let ncqMaxCached = 128
let ncqIdleThrsh = 50.00
cap <- getNumCapabilities <&> fromIntegral
ncqWriteQ <- newTVarIO mempty
ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap)
@ -158,6 +167,9 @@ ncqStorageOpen2 fp upd = do
ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqCachedEntries <- newTVarIO 0
ncqStorageTasks <- newTVarIO 0
ncqWrites <- newTVarIO 0
ncqWriteEMA <- newTVarIO 0.00
ncqJobQ <- newTQueueIO
let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk"
@ -194,6 +206,11 @@ ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewFossilName ncq = do
liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data"
ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewCompactName n@NCQStorage2{} = do
let (p,tpl) = splitFileName (ncqGetFileName n "compact-.data")
liftIO $ emptyTempFile p tpl
ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageStop2 NCQStorage2{..} = do
atomically $ writeTVar ncqStorageStopReq True
@ -225,6 +242,8 @@ ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do
let bs = ncqMakeSectionBS mtp h bs'
atomically do
waitTSem ncqWriteSem
modifyTVar' ncqWrites succ
stop <- readTVar ncqStorageStopReq
filled <- readTVar ncqWriteQ <&> Seq.length
@ -233,6 +252,7 @@ ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do
ncqAlterEntrySTM ncq h $ \case
Just e -> Just e
Nothing -> Just (NCQEntry bs)
modifyTVar' ncqWriteQ (|> h)
signalTSem ncqWriteSem
@ -349,8 +369,18 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
link closer
jobz <- ContT $ withAsync $ forever (atomically (readTQueue jobQ) >>= join)
link jobz
spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ))
spawnActivity measureWPS
spawnActivity $ forever do
ema <- readTVarIO ncqWriteEMA
when (ema < ncqIdleThrsh) do
debug "SPAWN MERGE"
spawnJob $ void (ncqStorageMergeStep ncq)
pause @'Seconds 10
ContT $ bracket none $ const $ liftIO do
fhh <- atomically (STM.flushTQueue closeQ)
@ -422,8 +452,8 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
where
emptyKey = BS.replicate ncqKeyLen 0
emptyKey = BS.replicate ncqKeyLen 0
openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
openNewDataFile = do
@ -432,6 +462,33 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
(fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)
spawnJob :: IO () -> m ()
spawnJob m = atomically $ writeTQueue ncqJobQ m
spawnActivity m = do
a <- ContT $ withAsync m
link a
pure a
measureWPS = void $ flip fix Nothing \loop -> \case
Nothing -> do
w <- readTVarIO ncqWrites
t <- getTimeCoarse
pause @'Seconds step >> loop (Just (w,t))
Just (w0,t0) -> do
w1 <- readTVarIO ncqWrites
t1 <- getTimeCoarse
let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9
dw = fromIntegral (w1 - w0)
atomically $ modifyTVar' ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema
pause @'Seconds step >> loop (Just (w1,t1))
where
alpha = 0.1
step = 1.00
ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m ()
ncqFileFastCheck fp = do
mmaped <- liftIO $ mmapFileByteString fp Nothing
@ -749,6 +806,144 @@ ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT
unless r (throwIO (NCQMergeInvariantFailed (show e)))
ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqCompact ncq@NCQStorage2{..} = do
q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) )
ncqLinearScanForCompact ncq $ \fk h -> atomically do
modifyTVar q (HM.insertWith (<>) fk (HS.singleton h))
state0 <- readTVarIO q
for_ (HM.toList state0) $ \(fk, es) -> do
trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es)
let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk)
let fIndexNameA = ncqGetFileName ncq (toFileName (IndexFile fk))
flip runContT pure do
mfile <- ncqGetNewCompactName ncq
ContT $ bracket none $ const do
rm mfile
liftIO do
withBinaryFileAtomic mfile WriteMode $ \fwh -> do
writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
pure $ not $ HS.member k es
appendTailSection =<< handleToFd fwh
result <- fileSize mfile
if result == 0 then do
atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk)
else do
fossil <- ncqGetNewFossilName ncq
mv mfile fossil
statA <- getFileStatus fDataNameA
let ts = modificationTimeHiRes statA
setFileTimesHiRes fossil ts ts
fname <- ncqIndexFile ncq (DataFile (fromString fossil))
atomically do
let fp = fromString fname
modifyTVar ncqTrackedFiles (HPSQ.delete fk)
ncqAddTrackedFileSTM ncq fp (posixToTimeSpec ts)
mapM_ rm [fDataNameA, fIndexNameA]
debug $ "compact done" <+> pretty (HM.size state0)
-- NOTE: incremental
-- now it may became incremental if we'll
-- limit amount of tombs per one pass
-- then remove all dead entries,
-- then call again to remove tombs. etc
-- as for now, seems it should work up to 10TB
-- of storage
ncqLinearScanForCompact :: MonadUnliftIO m
=> NCQStorage2
-> ( FileKey -> HashRef -> m () )
-> m Int
ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
let state0 = mempty :: HashMap HashRef TimeSpec
profit <- newTVarIO 0
tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int))
-- TODO: explicit-unmap-files
flip fix (tracked, state0) $ \next -> \case
([], s) -> none
((fk,p,_):rest, state) -> do
let cqFile = ncqGetFileName ncq (toFileName (IndexFile fk))
let dataFile = ncqGetFileName ncq (toFileName (DataFile fk))
(mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile
>>= orThrow (NWayHashInvalidMetaData cqFile)
let emptyKey = BS.replicate nwayKeySize 0
found <- S.toList_ do
nwayHashScanAll meta mmaped $ \o k entryBs -> do
unless (k == emptyKey) do
let off = N.word64 (BS.take 8 entryBs)
let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs))
when (sz == ncqPrefixLen || sz == ncqPrefixLen + 32) do
S.yield off
let kk = coerce k
case HM.lookup kk state of
Just ts | ts > timeSpecFromFilePrio p -> do
notice $ pretty kk <+> pretty (sz + ncqSLen)
atomically do
modifyTVar profit ( + (sz + ncqSLen) )
modifyTVar tombUse (HM.adjust (over _2 succ) kk)
lift $ lift $ action (fromString dataFile) kk
_ -> none
newEntries <- S.toList_ do
unless (List.null found) do
dataBs <- liftIO $ mmapFileByteString dataFile Nothing
for_ found $ \o -> do
let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs)
when (pre == ncqRefPrefix || pre == ncqTombPrefix) do
let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs)
let key = coerce (BS.copy keyBs)
unless (HM.member key state) do
S.yield (key, timeSpecFromFilePrio p)
when ( pre == ncqTombPrefix ) do
atomically $ modifyTVar tombUse (HM.insert key (fk,0))
next (rest, state <> HM.fromList newEntries)
use <- readTVarIO tombUse
let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ]
for_ useless $ \(f,h) -> do
atomically $ modifyTVar profit (+ncqFullTombLen)
lift $ action f h
readTVarIO profit <&> fromIntegral
writeFiltered :: forall m . MonadIO m
=> NCQStorage2
-> FilePath
@ -789,7 +984,7 @@ zeroSyncEntrySize :: Word64
zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry)
{-# INLINE zeroSyncEntrySize #-}
-- 1. It's B-record
-- 1. It's M-record
-- 2. It's last w64be == fileSize
-- 3. It's hash == hash (bytestring64be fileSize)
-- 4. recovery-strategy: start-to-end, end-to-start

View File

@ -1105,6 +1105,19 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:ema" $ nil_ $ const do
notice "test:ncq2:ema"
runTest $ \TestEnv{..} -> do
g <- liftIO MWC.createSystemRandom
let dir = testEnvDir </> "ncq1"
let n = 50000
ncqWithStorage dir $ \sto -> do
replicateM_ n do
ncqPutBS sto (Just B) Nothing =<< genRandomBS g (256*1024)
notice $ "written" <+> pretty n
pause @'Seconds 120
entry $ bindMatch "test:filter:emulate-1" $ nil_ $ \case
[ LitIntVal n ] -> runTest $ testFilterEmulate1 (fromIntegral n)