diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index d4c68180..da5c4270 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -44,6 +44,7 @@ import Data.ByteString (ByteString) import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 import Data.Char (isDigit) +import Data.Fixed import Data.Coerce import Data.Word import Data.Either @@ -65,6 +66,7 @@ import System.Posix.IO.ByteString as Posix import System.Posix.Unistd import System.Posix.Files ( getFileStatus , modificationTimeHiRes + , setFileTimesHiRes , getFdStatus , FileStatus(..) , setFileMode @@ -92,6 +94,7 @@ data NCQStorageException = | NCQStorageTimeout | NCQStorageCurrentAlreadyOpen | NCQStorageCantOpenCurrent + | NCQMergeInvariantFailed String deriving stock (Show,Typeable) instance Exception NCQStorageException @@ -153,6 +156,7 @@ data NCQStorage = , ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) , ncqFlushNow :: TVar [TQueue ()] + , ncqMergeReq :: TVar Int , ncqOpenDone :: TMVar Bool , ncqStopped :: TVar Bool } @@ -201,6 +205,12 @@ ncqGetNewFossilName n@NCQStorage{} = do let (p,tpl) = splitFileName fn liftIO $ emptyTempFile p tpl +ncqGetNewMergeName :: MonadIO m => NCQStorage -> m FilePath +ncqGetNewMergeName n@NCQStorage{} = do + let fn = ncqGetFileName n "merge-.data" + let (p,tpl) = splitFileName fn + liftIO $ emptyTempFile p tpl + ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath ncqGetIndexFileName ncq fk = do ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq") @@ -244,7 +254,7 @@ ncqAddTrackedFilesIO ncq fps = do let dataFile = ncqGetDataFileName ncq fp stat <- getFileStatus dataFile let ts = modificationTimeHiRes stat - pure $ Just (fp, TimeSpec (floor ts) 0)) + pure $ Just (fp, posixToTimeSpec ts)) (\e -> do err $ "ncqAddTrackedFilesIO: failed to stat " <+> viaShow e pure Nothing) @@ -279,7 +289,7 @@ ncqWriteError ncq txt = liftIO do let msg = Text.pack $ show $ "error" <+> fill 12 (pretty p) <+> pretty txt <> line Text.appendFile (ncqGetErrorLogName ncq) msg -ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m (FilePath, [HashRef]) +ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m FilePath ncqIndexFile n@NCQStorage{} fp' = do let fp = ncqGetFileName n fp' @@ -301,7 +311,7 @@ ncqIndexFile n@NCQStorage{} fp' = do mv result fp - pure (fp, fmap (coerce @_ @HashRef . fst) items) + pure fp ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop ncq@NCQStorage{..} = do @@ -326,8 +336,9 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do reader <- makeReader writer <- makeWriter indexQ indexer <- makeIndexer writer indexQ + merge <- makeMerge - mapM_ waitCatch [writer,indexer] + mapM_ waitCatch [writer,indexer,merge] -- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter] mapM_ cancel [reader] @@ -338,6 +349,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do False -> loop _ -> debug "STOPPING THREAD" + micropause :: forall a m . (IsTimeout a, MonadUnliftIO m) => Timeout a -> m () + micropause p = do + void $ race @m (pause p) $ + atomically do + s <- readTVar ncqStopped + unless s STM.retry + makeReader = do cap <- getNumCapabilities reader <- ContT $ withAsync $ untilStopped do @@ -360,6 +378,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do link reader pure reader + makeMerge = do + me <- ContT $ withAsync $ untilStopped do + micropause @'Seconds 10 + debug "MERGE THREAD" + + link me + pure me makeWriter indexQ = do @@ -411,7 +436,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do debug $ "FUCKING WRITE INDEX" <+> pretty fn - (key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList + key <- ncqIndexFile ncq fn ncqAddTrackedFilesIO ncq [key] ncqLoadSomeIndexes ncq [fromString key] @@ -512,13 +537,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do mv current fossilized atomically do - writeTVar ncqIndexNow 0 -- NOTE: extra-use -- добавляем лишний 1 для индексации. -- исходный файл закрываем, только когда проиндексировано. -- то есть должны отнять 1 после индексации. modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fdr) 1) writeTQueue indexQ (fdr, fossilized) + writeTVar ncqIndexNow 0 closeFd fh writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0) @@ -872,7 +897,7 @@ ncqFixIndexes ncq@NCQStorage{..} = do unless here do warn $ "missed-index" <+> pretty k let dataName = ncqGetDataFileName ncq k - (newKey,_) <- ncqIndexFile ncq dataName + newKey <- ncqIndexFile ncq dataName ncqAddTrackedFilesIO ncq [newKey] @@ -976,9 +1001,9 @@ ncqStorageInit_ check path = do let ncqRoot = path - let ncqSyncSize = 64 * (1024 ^ 2) - let ncqMinLog = 2 * (1024 ^ 3) - let ncqMaxLog = 10 * (1024 ^ 3) + let ncqSyncSize = 64 * (1024 ^ 2) + let ncqMinLog = 512 * (1024 ^ 2) + let ncqMaxLog = 4 * (1024 ^ 3) let ncqMaxCached = 64 @@ -1002,6 +1027,7 @@ ncqStorageInit_ check path = do ncqIndexNow <- newTVarIO 0 ncqCurrentFd <- newTVarIO Nothing ncqIndexed <- newTVarIO mempty + ncqMergeReq <- newTVarIO 0 let currentName = ncqGetCurrentName_ path ncqGen @@ -1068,3 +1094,136 @@ withNCQ setopts p action = flip runContT pure do wait writer pure e + +ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m () +ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ + +ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage -> m () +ncqStorageMergeStep ncq@NCQStorage{..} = do + tracked <- readTVarIO ncqTrackedFiles + <&> HPSQ.toList + <&> fmap (over _2 (coerce @_ @TimeSpec)) + <&> List.sortOn (view _2) + <&> List.take 2 + + + for_ tracked $ \(f, t, _) -> do + debug $ "FILE TO MERGE" <+> pretty (realToFrac @_ @(Fixed E6) t) <+> pretty f + + mergeStep (fmap (view _1) tracked) + + where + + writeFiltered :: forall m . MonadIO m + => FilePath + -> Handle + -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) + -> m () + + writeFiltered fn out filt = do + ncqStorageScanDataFile ncq fn $ \o s k v -> do + skip <- filt o s k v <&> not + + when skip do + debug $ pretty k <+> pretty "skipped" + + unless skip $ liftIO do + BS.hPut out (LBS.toStrict (makeEntryLBS k v)) + + mergeStep [] = none + mergeStep [_] = none + + mergeStep [b,a] = do + warn $ "merge" <+> pretty a <+> pretty b + + let fDataNameA = ncqGetDataFileName ncq a + let fIndexNameA = ncqGetIndexFileName ncq a + + let fDataNameB = ncqGetDataFileName ncq b + let fIndexNameB = ncqGetIndexFileName ncq b + + warn $ "file A" <+> pretty fDataNameA <+> pretty fIndexNameA + warn $ "file B" <+> pretty fDataNameB <+> pretty fIndexNameB + + doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) + doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) + doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA) + + flip runContT pure $ callCC \exit -> do + + mfile <- ncqGetNewMergeName ncq + + ContT $ bracket none $ const do + rm mfile + + liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do + + debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile) + + (mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA + >>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA)) + + debug $ "SCAN FILE A" <+> pretty fDataNameA + + writeFiltered fDataNameA fwh $ \_ _ _ v -> do + pure $ not (ncqIsTomb (LBS.fromStrict v)) + + debug $ "SCAN FILE B" <+> pretty fDataNameA + + writeFiltered fDataNameB fwh $ \_ _ k v -> do + let tomb = ncqIsTomb (LBS.fromStrict v) + foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust + let skip = tomb || foundInA + pure $ not skip + + result <- fileSize mfile + + when (result == 0) $ exit () + + liftIO do + + fossil <- ncqGetNewFossilName ncq + mv mfile fossil + + statA <- getFileStatus fDataNameA + + let ts = modificationTimeHiRes statA + setFileTimesHiRes fossil ts ts + + fname <- ncqIndexFile ncq fossil + + atomically do + let fp = fromString fname + modifyTVar ncqTrackedFiles (HPSQ.delete a) + modifyTVar ncqTrackedFiles (HPSQ.delete b) + ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)] + + mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA] + + mergeStep _ = do + mergeError "assertion failed: more than 2 files to merge" + + mergeError d = throwIO (NCQMergeInvariantFailed (show d)) + + orFail what e = do + r <- what + unless r (throwIO (NCQMergeInvariantFailed (show e))) + + makeEntryLBS h bs = do + let b = byteString (coerce @_ @ByteString h) + <> byteString bs + + let wbs = toLazyByteString b + let len = LBS.length wbs + let ws = byteString (N.bytestring32 (fromIntegral len)) + + toLazyByteString (ws <> b) + + +posixToTimeSpec :: POSIXTime -> TimeSpec +posixToTimeSpec pt = + let (s, frac) = properFraction pt :: (Integer, POSIXTime) + ns = round (frac * 1e9) + in TimeSpec (fromIntegral s) ns + + diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs index 75849c7c..f2fcbb02 100644 --- a/hbs2-tests/test/TCQ.hs +++ b/hbs2-tests/test/TCQ.hs @@ -148,6 +148,11 @@ main = do <&> fmap fst >>= orThrow (TCQGone p) + let getTCQ (TCQ p) = do + readTVarIO instances + <&> HM.lookup p + >>= orThrow (TCQGone p) + let dict = makeDict @C do entry $ bindMatch "--help" $ nil_ \case @@ -223,6 +228,19 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "ncq:merge:step" $ \syn -> lift do + + tcq <- case syn of + [ isOpaqueOf @TCQ -> Just tcq ] -> do + pure tcq + + e -> throwIO $ BadFormException @C (mkList e) + + ncq <- getNCQ tcq + ncqStorageMergeStep ncq + + pure nil + entry $ bindMatch "ncq:close" $ nil_ \case [ isOpaqueOf @TCQ -> Just tcq ] -> lift do ncq <- getNCQ tcq @@ -279,6 +297,14 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "ncq:stop" $ nil_ \case + [ isOpaqueOf @TCQ -> Just tcq ] -> lift do + (ncq, w) <- getTCQ tcq + ncqStorageStop ncq + debug "wait storage to stop" + wait w + + e -> throwIO $ BadFormException @C (mkList e) entry $ bindMatch "ncq:set:ref" $ \case [ isOpaqueOf @TCQ -> Just tcq, HashLike ref , HashLike val ] -> lift do @@ -343,6 +369,20 @@ main = do r <- ncqStoragePutBlock ncq bs pure $ maybe nil (mkSym . show . pretty) r + entry $ bindMatch "ncq:merkle:hashes" $ \case + [ isOpaqueOf @TCQ -> Just tcq, HashLike h ] -> lift do + ncq <- getNCQ tcq + liftIO do + let sto = AnyStorage ncq + mkList <$> S.toList_ do + walkMerkle (coerce h) (getBlock sto) $ \case + Left{} -> throwIO MissedBlockError + Right (hrr :: [HashRef]) -> do + forM_ hrr $ \hx -> do + S.yield (mkSym $ show $ pretty hx) + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "ncq:merkle:write" $ \syn -> do (tcq,fname) <- case syn of [ isOpaqueOf @TCQ -> Just tcq, StringLike f ] -> lift do