This commit is contained in:
voidlizard 2025-07-11 10:34:52 +03:00
parent d1aa0a6f2c
commit 930c824dbb
2 changed files with 336 additions and 62 deletions

View File

@ -127,6 +127,7 @@ data NCQStorage2 =
, ncqMemTable :: Vector Shard
, ncqWriteSem :: TSem
, ncqWriteQ :: TVar (Seq HashRef)
, ncqStorageTasks :: TVar Int
, ncqStorageStopReq :: TVar Bool
, ncqStorageSyncReq :: TVar Bool
, ncqSyncNo :: TVar Int
@ -156,6 +157,7 @@ ncqStorageOpen2 fp upd = do
ncqSyncNo <- newTVarIO 0
ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqCachedEntries <- newTVarIO 0
ncqStorageTasks <- newTVarIO 0
let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk"
@ -188,6 +190,10 @@ ncqGetWorkDir NCQStorage2{..} = ncqRoot </> show ncqGen
ncqGetLockFileName :: NCQStorage2 -> FilePath
ncqGetLockFileName ncq = ncqGetFileName ncq ".lock"
ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewFossilName ncq = do
liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data"
ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqStorageStop2 NCQStorage2{..} = do
atomically $ writeTVar ncqStorageStopReq True
@ -331,7 +337,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
maybe1 what none $ \(fk, fh) -> do
closeFd fh
-- notice $ yellow "indexing" <+> pretty fname
idx <- ncqIndexFile ncq (DataFile fk)
idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk))
ncqAddTrackedFile ncq (DataFile fk)
nwayHashMMapReadOnly idx >>= \case
Nothing -> err $ "can't open index" <+> pretty idx
@ -376,9 +382,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
rest <- if not (sync || needClose || w > ncqFsync) then
pure w
else liftIO do
s <- Posix.fileSize <$> Posix.getFdStatus fh
void (appendSection fh (fileTailRecord s))
fileSynchronise fh
appendTailSection fh >> fileSynchronise fh
atomically do
writeTVar ncqStorageSyncReq False
modifyTVar' ncqSyncNo succ
@ -420,40 +424,10 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do
emptyKey = BS.replicate ncqKeyLen 0
zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload
where zeroPayload = N.bytestring64 0
zeroHash = HashRef (hashObject zeroPayload)
{-# INLINE zeroSyncEntry #-}
zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry)
{-# INLINE zeroSyncEntrySize #-}
-- 1. It's B-record
-- 2. It's last w64be == fileSize
-- 3. It's hash == hash (bytestring64be fileSize)
-- 4. recovery-strategy: start-to-end, end-to-start
fileTailRecord w = do
-- on open: last w64be == fileSize
let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize)
let h = hashObject @HbSync paylo & coerce
ncqMakeSectionBS (Just M) h paylo
{-# INLINE fileTailRecord #-}
appendSection :: forall m . MonadUnliftIO m
=> Fd
-> ByteString
-> m Int -- (FOff, Int)
appendSection fh section = do
-- off <- liftIO $ fdSeek fh SeekFromEnd 0
-- pure (fromIntegral off, fromIntegral len)
liftIO (Posix.fdWrite fh section) <&> fromIntegral
{-# INLINE appendSection #-}
openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
openNewDataFile = do
fname <- liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data"
fname <- ncqGetNewFossilName ncq
touch fname
let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
(fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)
@ -535,9 +509,14 @@ ncqAddTrackedFile ncq@NCQStorage2{..} fkey = flip runContT pure $ callCC \exit -
-- FIXME: maybe-creation-time-actually
let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat
let fk = fromString (takeFileName fname)
atomically do
atomically $ ncqAddTrackedFileSTM ncq fk ts
pure True
ncqAddTrackedFileSTM :: NCQStorage2 -> FileKey -> TimeSpec -> STM ()
ncqAddTrackedFileSTM NCQStorage2{..} fk ts = do
modifyTVar' ncqTrackedFiles (HPSQ.insert fk (FilePrio (Down ts)) Nothing)
pure True
{-# INLINE ncqAddTrackedFileSTM #-}
evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM ()
evictIfNeededSTM NCQStorage2{..} howMany = do
@ -640,7 +619,202 @@ ncqRepair me@NCQStorage2{} = do
Right{} -> do
err $ "skip indexing" <+> pretty dataFile
ncqRefHash :: NCQStorage2 -> HashRef -> HashRef
ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt))
ncqRunTaskNoMatterWhat :: MonadUnliftIO m => NCQStorage2 -> m a -> m a
ncqRunTaskNoMatterWhat NCQStorage2{..} task = do
atomically (modifyTVar ncqStorageTasks succ)
task `finally` atomically (modifyTVar ncqStorageTasks pred)
ncqRunTask :: MonadUnliftIO m => NCQStorage2 -> a -> m a -> m a
ncqRunTask ncq@NCQStorage2{..} def task = readTVarIO ncqStorageStopReq >>= \case
True -> pure def
False -> ncqRunTaskNoMatterWhat ncq task
ncqWaitTasks :: MonadUnliftIO m => NCQStorage2 -> m ()
ncqWaitTasks NCQStorage2{..} = atomically do
tno <- readTVar ncqStorageTasks
when (tno > 0) STM.retry
ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool
ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT pure do
liftIO do
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
files <- for tracked $ \(f,p,_) -> do
let fn = ncqGetFileName ncq (toFileName $ DataFile f)
let idx = ncqGetFileName ncq (toFileName $ IndexFile f)
sz <- liftIO (fileSize fn)
idxHere <- doesFileExist idx
pure (f, sz, p, idxHere)
let bothIdx a b = view _4 a && view _4 b
let found = flip fix (files, Nothing, Nothing) $ \next -> \case
([], _, r) -> r
(a:b:rest, Nothing, _) | bothIdx a b -> do
next (b:rest, Just (view _2 a + view _2 b), Just (a,b))
(a:b:rest, Just s, _ ) | bothIdx a b && view _2 a + view _2 b < s -> do
next (b:rest, Just (view _2 a + view _2 b), Just (a,b))
(_:rest, s, r) -> do
next (rest, s, r)
case found of
Just (a,b) -> mergeStep a b >> pure True
_ -> pure False
where
ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath
ncqGetNewMergeName n@NCQStorage2{} = do
let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data")
liftIO $ emptyTempFile p tpl
mergeStep (a,_,p1,_) (b,_,p2,_) = do
debug $ "merge" <+> pretty a <+> pretty b
let fDataNameA = ncqGetFileName ncq $ toFileName (DataFile a)
let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a)
let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b)
let fIndexNameB = ncqGetFileName ncq $ toFileName (IndexFile b)
debug $ "file A" <+> pretty (timeSpecFromFilePrio p1) <+> pretty fDataNameA <+> pretty fIndexNameA
debug $ "file B" <+> pretty (timeSpecFromFilePrio p2) <+> pretty fDataNameB <+> pretty fIndexNameB
doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA)
doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB)
doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA)
flip runContT pure $ callCC \exit -> do
mfile <- ncqGetNewMergeName ncq
ContT $ bracket none $ const do
rm mfile
liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do
debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile)
(mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA
>>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA))
debug $ "SCAN FILE A" <+> pretty fDataNameA
writeFiltered ncq fDataNameA fwh $ \_ _ _ _ -> do
pure True
debug $ "SCAN FILE B" <+> pretty fDataNameA
writeFiltered ncq fDataNameB fwh $ \_ _ k _ -> do
foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
let skip = foundInA
pure $ not skip
appendTailSection =<< handleToFd fwh
liftIO do
result <- fileSize mfile
idx <- if result == 0 then
pure Nothing
else do
fossil <- ncqGetNewFossilName ncq
mv mfile fossil
statA <- getFileStatus fDataNameA
let ts = modificationTimeHiRes statA
setFileTimesHiRes fossil ts ts
let fk = DataFile (fromString fossil)
void $ ncqIndexFile ncq fk
pure $ Just (ts,fk)
atomically do
modifyTVar ncqTrackedFiles (HPSQ.delete a)
modifyTVar ncqTrackedFiles (HPSQ.delete b)
for_ idx $ \(ts,fk) -> do
ncqAddTrackedFileSTM ncq (coerce fk) (posixToTimeSpec ts)
mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA]
orFail what e = do
r <- what
unless r (throwIO (NCQMergeInvariantFailed (show e)))
writeFiltered :: forall m . MonadIO m
=> NCQStorage2
-> FilePath
-> Handle
-> ( Integer -> Integer -> HashRef -> ByteString -> m Bool)
-> m ()
writeFiltered ncq fn out filt = do
ncqStorageScanDataFile ncq fn $ \o s k v -> do
skip <- filt o s k v <&> not
when skip do
debug $ pretty k <+> pretty "skipped"
unless skip $ liftIO do
BS.hPut out (LBS.toStrict (makeEntryLBS k v))
where
makeEntryLBS h bs = do
let b = byteString (coerce @_ @ByteString h)
<> byteString bs
let wbs = toLazyByteString b
let len = LBS.length wbs
let ws = byteString (N.bytestring32 (fromIntegral len))
toLazyByteString (ws <> b)
zeroSyncEntry :: ByteString
zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload
where zeroPayload = N.bytestring64 0
zeroHash = HashRef (hashObject zeroPayload)
{-# INLINE zeroSyncEntry #-}
zeroSyncEntrySize :: Word64
zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry)
{-# INLINE zeroSyncEntrySize #-}
-- 1. It's B-record
-- 2. It's last w64be == fileSize
-- 3. It's hash == hash (bytestring64be fileSize)
-- 4. recovery-strategy: start-to-end, end-to-start
fileTailRecord :: Integral a => a -> ByteString
fileTailRecord w = do
-- on open: last w64be == fileSize
let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize)
let h = hashObject @HbSync paylo & coerce
ncqMakeSectionBS (Just M) h paylo
{-# INLINE fileTailRecord #-}
appendSection :: forall m . MonadUnliftIO m
=> Fd
-> ByteString
-> m Int -- (FOff, Int)
appendSection fh sect = do
-- off <- liftIO $ fdSeek fh SeekFromEnd 0
-- pure (fromIntegral off, fromIntegral len)
liftIO (Posix.fdWrite fh sect) <&> fromIntegral
{-# INLINE appendSection #-}
appendTailSection :: MonadIO m => Fd -> m ()
appendTailSection fh = liftIO do
s <- Posix.fileSize <$> Posix.getFdStatus fh
void (appendSection fh (fileTailRecord s))
{-# INLINE appendTailSection #-}

View File

@ -621,6 +621,99 @@ testNCQ2Simple1 TestEnv{..} = do
-- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found)
genRandomBS :: forall g m . (Monad m, StatefulGen g m) => g -> Int -> m ByteString
genRandomBS g n = do
n <- (`mod` (64*1024)) <$> uniformM @Int g
uniformByteStringM n g
sec6 :: RealFrac a => a -> Fixed E6
sec6 = realToFrac
sec2 :: RealFrac a => a -> Fixed E2
sec2 = realToFrac
sec3 :: RealFrac a => a -> Fixed E3
sec3 = realToFrac
testNCQ2Merge1 :: MonadUnliftIO m
=> Int
-> TestEnv
-> m ()
testNCQ2Merge1 n TestEnv{..} = do
let tmp = testEnvDir
let ncqDir = tmp
g <- liftIO MWC.createSystemRandom
let fake = n `div` 3
ncqWithStorage ncqDir $ \sto -> liftIO do
notice $ "write" <+> pretty n <+> "random blocks"
ws <- flip fix (mempty :: HashSet HashRef) $ \loop -> \case
hs | HS.size hs >= n -> pure hs
| otherwise -> do
s <- liftIO $ genRandomBS g (256 * 1024)
h <- ncqPutBS sto (Just B) Nothing s
loop (HS.insert h hs)
notice $ "written" <+> pretty (HS.size ws)
assertBool "all written" (HS.size ws == n)
nHashes <- HS.fromList . filter (not . flip HS.member ws) <$> replicateM fake do
liftIO (genRandomBS g (64*1024)) <&> HashRef . hashObject
notice $ "gen" <+> pretty (HS.size nHashes) <+> pretty "missed hashes"
(t1,n1) <- over _2 sum <$> timeItT do
for (HS.toList ws) $ \h -> do
r <- ncqLocate2 sto h
unless (isJust r) do
err $ "not found" <+> pretty h
pure $ maybe 0 (const 1) r
notice $ pretty (sec3 t1) <+> pretty n1 <+> pretty (n1 == HS.size ws)
assertBool "all written" (n1 == HS.size ws)
ncqWaitTasks sto
let hashes = HS.toList ws <> HS.toList nHashes
(t2,_) <- timeItT do
for hashes $ \h -> do
r <- ncqLocate2 sto h
pure $ maybe 0 (const 1) r
notice $ "before-merge" <+> pretty (sec3 t1) <+> pretty (List.length hashes)
notice $ "merge whatever possible"
n <- flip fix 0 \next i -> do
N2.ncqStorageMergeStep sto >>= \case
False -> pure i
True -> next (succ i)
notice $ "merged" <+> pretty n
(t3,r) <- timeItT do
for hashes $ \h -> do
ncqLocate2 sto h >>= \case
Nothing -> pure $ Left h
Just{} -> pure $ Right h
let w1 = HS.fromList (rights r)
let n2 = HS.fromList (lefts r)
notice $ "after-merge" <+> pretty (sec3 t3) <+> pretty (HS.size w1) <+> pretty (HS.size n2)
testFilterEmulate1 :: MonadUnliftIO m
=> Int
-> TestEnv
@ -945,30 +1038,6 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:concurrent1" $ nil_ $ \case
[ LitIntVal tn, LitIntVal n ] -> do
debug $ "ncq2:concurrent1" <+> pretty tn <+> pretty n
runTest $ testNCQ2Concurrent1 False ( fromIntegral tn) (fromIntegral n)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do
runTest testNCQ2Simple1
entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do
runTest testNCQ2Repair1
entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case
[ StringLike fn ] -> do
ncqFileFastCheck fn
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case
[ LitIntVal tn, LitIntVal n ] -> do
runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:concurrent1:wo" $ nil_ $ \case
[ LitIntVal tn, LitIntVal n ] -> do
@ -1005,6 +1074,37 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:merge1" $ nil_ $ \case
[ LitIntVal n ] -> do
runTest $ testNCQ2Merge1 (fromIntegral n)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:concurrent1" $ nil_ $ \case
[ LitIntVal tn, LitIntVal n ] -> do
debug $ "ncq2:concurrent1" <+> pretty tn <+> pretty n
runTest $ testNCQ2Concurrent1 False ( fromIntegral tn) (fromIntegral n)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do
runTest testNCQ2Simple1
entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do
runTest testNCQ2Repair1
entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case
[ StringLike fn ] -> do
ncqFileFastCheck fn
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case
[ LitIntVal tn, LitIntVal n ] -> do
runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:filter:emulate-1" $ nil_ $ \case
[ LitIntVal n ] -> runTest $ testFilterEmulate1 (fromIntegral n)