wip, merge storage

This commit is contained in:
Dmitry Zuykov 2025-05-16 10:01:22 +03:00
parent 90b9204e58
commit 31a476a73a
2 changed files with 209 additions and 10 deletions

View File

@ -44,6 +44,7 @@ import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8
import Data.Char (isDigit)
import Data.Fixed
import Data.Coerce
import Data.Word
import Data.Either
@ -65,6 +66,7 @@ import System.Posix.IO.ByteString as Posix
import System.Posix.Unistd
import System.Posix.Files ( getFileStatus
, modificationTimeHiRes
, setFileTimesHiRes
, getFdStatus
, FileStatus(..)
, setFileMode
@ -92,6 +94,7 @@ data NCQStorageException =
| NCQStorageTimeout
| NCQStorageCurrentAlreadyOpen
| NCQStorageCantOpenCurrent
| NCQMergeInvariantFailed String
deriving stock (Show,Typeable)
instance Exception NCQStorageException
@ -153,6 +156,7 @@ data NCQStorage =
, ncqCurrentUsage :: TVar (IntMap Int)
, ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString))
, ncqFlushNow :: TVar [TQueue ()]
, ncqMergeReq :: TVar Int
, ncqOpenDone :: TMVar Bool
, ncqStopped :: TVar Bool
}
@ -201,6 +205,12 @@ ncqGetNewFossilName n@NCQStorage{} = do
let (p,tpl) = splitFileName fn
liftIO $ emptyTempFile p tpl
ncqGetNewMergeName :: MonadIO m => NCQStorage -> m FilePath
ncqGetNewMergeName n@NCQStorage{} = do
let fn = ncqGetFileName n "merge-.data"
let (p,tpl) = splitFileName fn
liftIO $ emptyTempFile p tpl
ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath
ncqGetIndexFileName ncq fk = do
ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq")
@ -244,7 +254,7 @@ ncqAddTrackedFilesIO ncq fps = do
let dataFile = ncqGetDataFileName ncq fp
stat <- getFileStatus dataFile
let ts = modificationTimeHiRes stat
pure $ Just (fp, TimeSpec (floor ts) 0))
pure $ Just (fp, posixToTimeSpec ts))
(\e -> do
err $ "ncqAddTrackedFilesIO: failed to stat " <+> viaShow e
pure Nothing)
@ -279,7 +289,7 @@ ncqWriteError ncq txt = liftIO do
let msg = Text.pack $ show $ "error" <+> fill 12 (pretty p) <+> pretty txt <> line
Text.appendFile (ncqGetErrorLogName ncq) msg
ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m (FilePath, [HashRef])
ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m FilePath
ncqIndexFile n@NCQStorage{} fp' = do
let fp = ncqGetFileName n fp'
@ -301,7 +311,7 @@ ncqIndexFile n@NCQStorage{} fp' = do
mv result fp
pure (fp, fmap (coerce @_ @HashRef . fst) items)
pure fp
ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageStop ncq@NCQStorage{..} = do
@ -326,8 +336,9 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
reader <- makeReader
writer <- makeWriter indexQ
indexer <- makeIndexer writer indexQ
merge <- makeMerge
mapM_ waitCatch [writer,indexer]
mapM_ waitCatch [writer,indexer,merge]
-- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter]
mapM_ cancel [reader]
@ -338,6 +349,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
False -> loop
_ -> debug "STOPPING THREAD"
micropause :: forall a m . (IsTimeout a, MonadUnliftIO m) => Timeout a -> m ()
micropause p = do
void $ race @m (pause p) $
atomically do
s <- readTVar ncqStopped
unless s STM.retry
makeReader = do
cap <- getNumCapabilities
reader <- ContT $ withAsync $ untilStopped do
@ -360,6 +378,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
link reader
pure reader
makeMerge = do
me <- ContT $ withAsync $ untilStopped do
micropause @'Seconds 10
debug "MERGE THREAD"
link me
pure me
makeWriter indexQ = do
@ -411,7 +436,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
debug $ "FUCKING WRITE INDEX" <+> pretty fn
(key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList
key <- ncqIndexFile ncq fn
ncqAddTrackedFilesIO ncq [key]
ncqLoadSomeIndexes ncq [fromString key]
@ -512,13 +537,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
mv current fossilized
atomically do
writeTVar ncqIndexNow 0
-- NOTE: extra-use
-- добавляем лишний 1 для индексации.
-- исходный файл закрываем, только когда проиндексировано.
-- то есть должны отнять 1 после индексации.
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fdr) 1)
writeTQueue indexQ (fdr, fossilized)
writeTVar ncqIndexNow 0
closeFd fh
writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0)
@ -872,7 +897,7 @@ ncqFixIndexes ncq@NCQStorage{..} = do
unless here do
warn $ "missed-index" <+> pretty k
let dataName = ncqGetDataFileName ncq k
(newKey,_) <- ncqIndexFile ncq dataName
newKey <- ncqIndexFile ncq dataName
ncqAddTrackedFilesIO ncq [newKey]
@ -976,9 +1001,9 @@ ncqStorageInit_ check path = do
let ncqRoot = path
let ncqSyncSize = 64 * (1024 ^ 2)
let ncqMinLog = 2 * (1024 ^ 3)
let ncqMaxLog = 10 * (1024 ^ 3)
let ncqSyncSize = 64 * (1024 ^ 2)
let ncqMinLog = 512 * (1024 ^ 2)
let ncqMaxLog = 4 * (1024 ^ 3)
let ncqMaxCached = 64
@ -1002,6 +1027,7 @@ ncqStorageInit_ check path = do
ncqIndexNow <- newTVarIO 0
ncqCurrentFd <- newTVarIO Nothing
ncqIndexed <- newTVarIO mempty
ncqMergeReq <- newTVarIO 0
let currentName = ncqGetCurrentName_ path ncqGen
@ -1068,3 +1094,136 @@ withNCQ setopts p action = flip runContT pure do
wait writer
pure e
ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ
ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageMergeStep ncq@NCQStorage{..} = do
tracked <- readTVarIO ncqTrackedFiles
<&> HPSQ.toList
<&> fmap (over _2 (coerce @_ @TimeSpec))
<&> List.sortOn (view _2)
<&> List.take 2
for_ tracked $ \(f, t, _) -> do
debug $ "FILE TO MERGE" <+> pretty (realToFrac @_ @(Fixed E6) t) <+> pretty f
mergeStep (fmap (view _1) tracked)
where
writeFiltered :: forall m . MonadIO m
=> FilePath
-> Handle
-> ( Integer -> Integer -> HashRef -> ByteString -> m Bool)
-> m ()
writeFiltered fn out filt = do
ncqStorageScanDataFile ncq fn $ \o s k v -> do
skip <- filt o s k v <&> not
when skip do
debug $ pretty k <+> pretty "skipped"
unless skip $ liftIO do
BS.hPut out (LBS.toStrict (makeEntryLBS k v))
mergeStep [] = none
mergeStep [_] = none
mergeStep [b,a] = do
warn $ "merge" <+> pretty a <+> pretty b
let fDataNameA = ncqGetDataFileName ncq a
let fIndexNameA = ncqGetIndexFileName ncq a
let fDataNameB = ncqGetDataFileName ncq b
let fIndexNameB = ncqGetIndexFileName ncq b
warn $ "file A" <+> pretty fDataNameA <+> pretty fIndexNameA
warn $ "file B" <+> pretty fDataNameB <+> pretty fIndexNameB
doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA)
doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB)
doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA)
flip runContT pure $ callCC \exit -> do
mfile <- ncqGetNewMergeName ncq
ContT $ bracket none $ const do
rm mfile
liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do
debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile)
(mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA
>>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA))
debug $ "SCAN FILE A" <+> pretty fDataNameA
writeFiltered fDataNameA fwh $ \_ _ _ v -> do
pure $ not (ncqIsTomb (LBS.fromStrict v))
debug $ "SCAN FILE B" <+> pretty fDataNameA
writeFiltered fDataNameB fwh $ \_ _ k v -> do
let tomb = ncqIsTomb (LBS.fromStrict v)
foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust
let skip = tomb || foundInA
pure $ not skip
result <- fileSize mfile
when (result == 0) $ exit ()
liftIO do
fossil <- ncqGetNewFossilName ncq
mv mfile fossil
statA <- getFileStatus fDataNameA
let ts = modificationTimeHiRes statA
setFileTimesHiRes fossil ts ts
fname <- ncqIndexFile ncq fossil
atomically do
let fp = fromString fname
modifyTVar ncqTrackedFiles (HPSQ.delete a)
modifyTVar ncqTrackedFiles (HPSQ.delete b)
ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)]
mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA]
mergeStep _ = do
mergeError "assertion failed: more than 2 files to merge"
mergeError d = throwIO (NCQMergeInvariantFailed (show d))
orFail what e = do
r <- what
unless r (throwIO (NCQMergeInvariantFailed (show e)))
makeEntryLBS h bs = do
let b = byteString (coerce @_ @ByteString h)
<> byteString bs
let wbs = toLazyByteString b
let len = LBS.length wbs
let ws = byteString (N.bytestring32 (fromIntegral len))
toLazyByteString (ws <> b)
posixToTimeSpec :: POSIXTime -> TimeSpec
posixToTimeSpec pt =
let (s, frac) = properFraction pt :: (Integer, POSIXTime)
ns = round (frac * 1e9)
in TimeSpec (fromIntegral s) ns

View File

@ -148,6 +148,11 @@ main = do
<&> fmap fst
>>= orThrow (TCQGone p)
let getTCQ (TCQ p) = do
readTVarIO instances
<&> HM.lookup p
>>= orThrow (TCQGone p)
let dict = makeDict @C do
entry $ bindMatch "--help" $ nil_ \case
@ -223,6 +228,19 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:merge:step" $ \syn -> lift do
tcq <- case syn of
[ isOpaqueOf @TCQ -> Just tcq ] -> do
pure tcq
e -> throwIO $ BadFormException @C (mkList e)
ncq <- getNCQ tcq
ncqStorageMergeStep ncq
pure nil
entry $ bindMatch "ncq:close" $ nil_ \case
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
ncq <- getNCQ tcq
@ -279,6 +297,14 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:stop" $ nil_ \case
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
(ncq, w) <- getTCQ tcq
ncqStorageStop ncq
debug "wait storage to stop"
wait w
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:set:ref" $ \case
[ isOpaqueOf @TCQ -> Just tcq, HashLike ref , HashLike val ] -> lift do
@ -343,6 +369,20 @@ main = do
r <- ncqStoragePutBlock ncq bs
pure $ maybe nil (mkSym . show . pretty) r
entry $ bindMatch "ncq:merkle:hashes" $ \case
[ isOpaqueOf @TCQ -> Just tcq, HashLike h ] -> lift do
ncq <- getNCQ tcq
liftIO do
let sto = AnyStorage ncq
mkList <$> S.toList_ do
walkMerkle (coerce h) (getBlock sto) $ \case
Left{} -> throwIO MissedBlockError
Right (hrr :: [HashRef]) -> do
forM_ hrr $ \hx -> do
S.yield (mkSym $ show $ pretty hx)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:merkle:write" $ \syn -> do
(tcq,fname) <- case syn of
[ isOpaqueOf @TCQ -> Just tcq, StringLike f ] -> lift do