diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index ec6a7ff1..ce5dbe2d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -98,40 +98,8 @@ import System.FileLock as FL type NCQPerks m = MonadIO m -data NCQStorageException = - NCQStorageAlreadyExist String - | NCQStorageSeedMissed - | NCQStorageTimeout - | NCQStorageCurrentAlreadyOpen - | NCQStorageCantOpenCurrent - | NCQStorageBrokenCurrent - | NCQMergeInvariantFailed String - | NCQStorageCantLock FilePath - deriving stock (Show,Typeable) - -instance Exception NCQStorageException -newtype FilePrio = FilePrio (Down TimeSpec) - deriving newtype (Eq,Ord) - deriving stock (Generic,Show) - -mkFilePrio :: TimeSpec -> FilePrio -mkFilePrio = FilePrio . Down - -timeSpecFromFilePrio :: FilePrio -> TimeSpec -timeSpecFromFilePrio (FilePrio what) = getDown what -{-# INLINE timeSpecFromFilePrio #-} - -data CachedEntry = - CachedEntry { cachedMmapedIdx :: ByteString - , cachedMmapedData :: ByteString - , cachedNway :: NWayHash - , cachedTs :: TVar TimeSpec - } - -instance Show CachedEntry where - show _ = "CachedEntry{...}" data WQItem = WQItem { wqNew :: Bool @@ -1526,12 +1494,6 @@ ncqStorageMergeStep ncq@NCQStorage{..} = flip runContT pure do unless r (throwIO (NCQMergeInvariantFailed (show e))) -posixToTimeSpec :: POSIXTime -> TimeSpec -posixToTimeSpec pt = - let (s, frac) = properFraction pt :: (Integer, POSIXTime) - ns = round (frac * 1e9) - in TimeSpec (fromIntegral s) ns - -- NOTE: incremental -- now it may became incremental if we'll diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index 23b941de..d3916b3d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -4,16 +4,21 @@ import HBS2.Prelude import HBS2.Data.Types.Refs import HBS2.Hash +import HBS2.Data.Log.Structured.NCQ + import Data.ByteString (ByteString) import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 import Network.ByteOrder qualified as N +import Data.Ord (Down(..)) import Data.Coerce import System.FilePath import Data.Word import Data.Data import Control.Exception +import UnliftIO (TVar) + -- Log structure: -- (SD)* -- S ::= word32be, section prefix @@ -22,6 +27,20 @@ import Control.Exception -- PREFIX ::= BYTESTRING(4) -- DATA ::= BYTESTRING(n) | n == S - LEN(WORD32) - LEN(HASH) - LEN(PREFIX) +data NCQStorageException = + NCQStorageAlreadyExist String + | NCQStorageSeedMissed + | NCQStorageTimeout + | NCQStorageCurrentAlreadyOpen + | NCQStorageCantOpenCurrent + | NCQStorageBrokenCurrent + | NCQMergeInvariantFailed String + | NCQStorageCantLock FilePath + | NCQStorageCantMapFile FilePath + deriving stock (Show,Typeable) + +instance Exception NCQStorageException + newtype FileKey = FileKey ByteString deriving newtype (Eq,Ord,Hashable,Show) @@ -31,6 +50,50 @@ instance IsString FileKey where instance Pretty FileKey where pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s)) +newtype DataFile a = DataFile a + +newtype IndexFile a = IndexFile a + +class ToFileName a where + toFileName :: a -> FilePath + +instance ToFileName FileKey where + toFileName = BS8.unpack . coerce + +instance ToFileName (DataFile FileKey) where + toFileName (DataFile fk) = dropExtension (toFileName fk) `addExtension` ".data" + + +instance ToFileName (IndexFile FileKey) where + toFileName (IndexFile fk) = dropExtension (toFileName fk) `addExtension` ".cq" + +instance ToFileName (DataFile FilePath) where + toFileName (DataFile fp) = dropExtension fp `addExtension` ".data" + +instance ToFileName (IndexFile FilePath) where + toFileName (IndexFile fp) = dropExtension fp `addExtension` ".cq" + +newtype FilePrio = FilePrio (Down TimeSpec) + deriving newtype (Eq,Ord) + deriving stock (Generic,Show) + +mkFilePrio :: TimeSpec -> FilePrio +mkFilePrio = FilePrio . Down + +timeSpecFromFilePrio :: FilePrio -> TimeSpec +timeSpecFromFilePrio (FilePrio what) = getDown what +{-# INLINE timeSpecFromFilePrio #-} + +data CachedEntry = + CachedEntry { cachedMmapedIdx :: ByteString + , cachedMmapedData :: ByteString + , cachedNway :: NWayHash + , cachedTs :: TVar TimeSpec + } + +instance Show CachedEntry where + show _ = "CachedEntry{...}" + newtype NCQFullRecordLen a = @@ -121,3 +184,10 @@ data NCQFsckIssue = deriving stock (Eq,Ord,Show,Data,Generic) +posixToTimeSpec :: POSIXTime -> TimeSpec +posixToTimeSpec pt = + let (s, frac) = properFraction pt :: (Integer, POSIXTime) + ns = round (frac * 1e9) + in TimeSpec (fromIntegral s) ns + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index e2de969f..ccaa9d1d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -102,11 +102,9 @@ import System.FileLock as FL type FOff = Word64 -data NCQEntry = - NCQEntryNew Int ByteString - -- | NCQEntryWritten Int FileKey FOff (Maybe ByteString) +newtype NCQEntry = NCQEntry ByteString -type Shard = TVar (HashMap HashRef (TVar NCQEntry)) +type Shard = TVar (HashMap HashRef NCQEntry) data NCQStorage2 = NCQStorage2 @@ -116,12 +114,15 @@ data NCQStorage2 = , ncqWriteQLen :: Int , ncqWriteBlock :: Int , ncqMinLog :: Int + , ncqMaxCached :: Int , ncqMemTable :: Vector Shard , ncqWriteSem :: TSem , ncqWriteQ :: TVar (Seq HashRef) , ncqStorageStopReq :: TVar Bool , ncqStorageSyncReq :: TVar Bool , ncqSyncNo :: TVar Int + , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) + , ncqCachedEntries :: TVar Int } deriving (Generic) @@ -136,6 +137,7 @@ ncqStorageOpen2 fp upd = do let ncqWriteQLen = 1024 * 16 let ncqMinLog = 256 * megabytes let ncqWriteBlock = 1024 + let ncqMaxCached = 128 cap <- getNumCapabilities <&> fromIntegral ncqWriteQ <- newTVarIO mempty ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap) @@ -143,12 +145,26 @@ ncqStorageOpen2 fp upd = do ncqStorageStopReq <- newTVarIO False ncqStorageSyncReq <- newTVarIO False ncqSyncNo <- newTVarIO 0 + ncqTrackedFiles <- newTVarIO HPSQ.empty + ncqCachedEntries <- newTVarIO 0 let ncq = NCQStorage2{..} & upd mkdir (ncqGetWorkDir ncq) + ncqRepair ncq + pure ncq +ncqWithStorage :: MonadUnliftIO m => FilePath -> ( NCQStorage2 -> m a ) -> m a +ncqWithStorage fp action = flip runContT pure do + sto <- lift (ncqStorageOpen2 fp id) + w <- ContT $ withAsync (ncqStorageRun2 sto) + link w + r <- lift (action sto) + lift (ncqStorageStop2 sto) + wait w + pure r + ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp @@ -175,15 +191,8 @@ ncqGetShard :: NCQStorage2 -> HashRef -> Shard ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h {-# INLINE ncqGetShard #-} -ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe (NCQEntry, TVar NCQEntry)) -ncqLookupEntrySTM ncq h = do - readTVar (ncqGetShard ncq h) - <&> HM.lookup h - >>= \case - Nothing -> pure Nothing - Just tv -> do - v <- readTVar tv - pure $ Just (v, tv) +ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry) +ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h ncqPutBS :: MonadUnliftIO m => NCQStorage2 @@ -201,34 +210,72 @@ ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do when (not stop && filled > ncqWriteQLen) STM.retry - n <- readTVar ncqSyncNo ncqAlterEntrySTM ncq h $ \case Just e -> Just e - Nothing -> Just (NCQEntryNew n bs) + Nothing -> Just (NCQEntry bs) modifyTVar' ncqWriteQ (|> h) signalTSem ncqWriteSem pure h ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry) -ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) <&> fmap fst +ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) + +ncqReadEntry :: ByteString -> Word64 -> Word32 -> ByteString +ncqReadEntry mmaped off size = BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmaped +{-# INLINE ncqReadEntry #-} + +ncqSearchBS :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe ByteString) +ncqSearchBS ncq@NCQStorage2{..} href = flip runContT pure $ callCC \exit -> do + now <- getTimeCoarse + + lift (ncqLookupEntry ncq href) >>= maybe none (exit . Just . coerce) + + tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList + + for_ tracked $ \(fk, prio, mCached) -> case mCached of + Just CachedEntry{..} -> do + lookupEntry href (cachedMmapedIdx, cachedNway) >>= \case + Nothing -> none + Just (offset,size) -> do + atomically $ writeTVar cachedTs now + exit (Just $ ncqReadEntry cachedMmapedData offset size) + + Nothing -> do + let indexFile = ncqGetFileName ncq (toFileName (IndexFile fk)) + let dataFile = ncqGetFileName ncq (toFileName (DataFile fk)) + + (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) + >>= orThrow (NCQStorageCantMapFile indexFile) + + datBs <- liftIO $ mmapFileByteString dataFile Nothing + + ce <- CachedEntry idxBs datBs idxNway <$> newTVarIO now + + lookupEntry href (idxBs, idxNway) >>= \case + Nothing -> none + Just (offset, size) -> do + + atomically do + modifyTVar ncqTrackedFiles (HPSQ.insert fk prio (Just ce)) + modifyTVar ncqCachedEntries (+1) + evictIfNeededSTM ncq (Just 1) + + exit $ Just (ncqReadEntry datBs offset size) + + pure mzero + + where + lookupEntry (hx :: HashRef) (mmaped, nway) = runMaybeT do + entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= toMPlus + pure + ( fromIntegral $ N.word64 (BS.take 8 entryBs) + , fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) ) ncqAlterEntrySTM :: NCQStorage2 -> HashRef -> (Maybe NCQEntry -> Maybe NCQEntry) -> STM () ncqAlterEntrySTM ncq h alterFn = do let shard = ncqGetShard ncq h - readTVar shard <&> HM.lookup h >>= \case - Just tve -> do - e <- readTVar tve - case alterFn (Just e) of - Nothing -> modifyTVar' shard (HM.delete h) - Just e' -> writeTVar tve e' - - Nothing -> case alterFn Nothing of - Nothing -> modifyTVar' shard (HM.delete h) - Just e -> do - tve <- newTVar e - modifyTVar' shard (HM.insert h tve) - + modifyTVar shard (HM.alter alterFn h) data RunSt = RunNew @@ -254,9 +301,9 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do maybe1 what none $ \(fk, fh) -> do closeFd fh - let fname = BS8.unpack (coerce fk) -- notice $ yellow "indexing" <+> pretty fname - idx <- ncqIndexFile ncq fname + idx <- ncqIndexFile ncq (DataFile fk) + ncqAddTrackedFile ncq (DataFile fk) nwayHashMMapReadOnly idx >>= \case Nothing -> err $ "can't open index" <+> pretty idx Just (bs,nway) -> do @@ -332,7 +379,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do Right chu -> do ws <- for chu $ \h -> do atomically (ncqLookupEntrySTM ncq h) >>= \case - Just (r@(NCQEntryNew ns bs),t) -> do + Just (NCQEntry bs) -> do lift (appendSection fh bs) _ -> pure 0 @@ -392,7 +439,6 @@ ncqFileFastCheck fp = do throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s)) - ncqStorageScanDataFile :: MonadIO m => NCQStorage2 -> FilePath @@ -421,20 +467,16 @@ ncqStorageScanDataFile ncq fp' action = do next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs) -ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> FilePath -> m FilePath -ncqIndexFile n@NCQStorage2{} fp'' = do +ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> DataFile FileKey -> m FilePath +ncqIndexFile n@NCQStorage2{} fk = do - let fp' = addExtension (ncqGetFileName n fp'') ".data" + let fp = toFileName fk & ncqGetFileName n + let dest = toFileName (IndexFile (coerce @_ @FileKey fk)) & ncqGetFileName n - let fp = ncqGetFileName n fp' - & takeBaseName - & (`addExtension` ".cq") - & ncqGetFileName n - - trace $ "INDEX" <+> pretty fp' <+> pretty fp + debug $ "INDEX" <+> pretty fp <+> pretty dest items <- S.toList_ do - ncqStorageScanDataFile n fp' $ \o w k v -> do + ncqStorageScanDataFile n fp $ \o w k _ -> do let rs = w - 32 & fromIntegral @_ @Word32 & N.bytestring32 let os = fromIntegral @_ @Word64 o & N.bytestring64 let record = os <> rs @@ -442,11 +484,109 @@ ncqIndexFile n@NCQStorage2{} fp'' = do S.yield (coerce k, record) let (dir,name) = splitFileName fp + let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" - result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir name items + result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items - mv result fp - - pure fp + mv result dest + pure dest +ncqAddTrackedFilesSTM :: NCQStorage2 -> [(FileKey, TimeSpec)] -> STM () +ncqAddTrackedFilesSTM NCQStorage2{..} keys = do + old <- readTVar ncqTrackedFiles + let new = flip fix (old, keys) \next -> \case + (s, []) -> s + (s, (k,ts):xs) -> next (HPSQ.insert k (FilePrio (Down ts)) Nothing s, xs) + writeTVar ncqTrackedFiles new + +ncqAddTrackedFile :: MonadIO m => NCQStorage2 -> DataFile FileKey -> m () +ncqAddTrackedFile ncq fkey = do + let fname = ncqGetFileName ncq (toFileName fkey) + stat <- liftIO $ PFS.getFileStatus fname + let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat + let fk = fromString (takeFileName fname) + atomically $ ncqAddTrackedFilesSTM ncq [(fk, ts)] + + +ncqAddTrackedFilesIO :: MonadIO m => NCQStorage2 -> [FilePath] -> m () +ncqAddTrackedFilesIO ncq fps = do + tsFiles <- catMaybes <$> forM fps \fp' -> liftIO $ do + catchIOError + (do + let fp = fromString fp' + let dataFile = ncqGetFileName ncq (toFileName (DataFile fp)) + stat <- getFileStatus dataFile + let ts = modificationTimeHiRes stat + pure $ Just (fp, posixToTimeSpec ts)) + (\e -> do + err $ "ncqAddTrackedFilesIO: failed to stat " <+> viaShow e + pure Nothing) + + atomically $ ncqAddTrackedFilesSTM ncq tsFiles + +evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM () +evictIfNeededSTM NCQStorage2{..} howMany = do + cur <- readTVar ncqCachedEntries + + let need = fromMaybe (cur `div` 2) howMany + excess = max 0 (cur + need - ncqMaxCached) + + when (excess > 0) do + files <- readTVar ncqTrackedFiles <&> HPSQ.toList + + oldest <- forM files \case + (k, prio, Just ce) -> do + ts <- readTVar (cachedTs ce) + pure (Just (ts, k, prio)) + _ -> pure Nothing + + let victims = + oldest + & catMaybes + & List.sortOn (\(ts,_,_) -> ts) + & List.take excess + + for_ victims $ \(_,k,prio) -> do + modifyTVar ncqTrackedFiles (HPSQ.insert k prio Nothing) + modifyTVar ncqCachedEntries (subtract 1) + + +{- HLINT ignore "Functor law" -} + +ncqListTrackedFiles :: MonadIO m => NCQStorage2 -> m [FilePath] +ncqListTrackedFiles ncq = do + let wd = ncqGetWorkDir ncq + dirFiles wd + >>= mapM (pure . takeBaseName) + <&> List.filter (List.isPrefixOf "fossil-") + <&> HS.toList . HS.fromList + +ncqRepair :: MonadIO m => NCQStorage2 -> m () +ncqRepair me@NCQStorage2{..} = do + fossils <- ncqListTrackedFiles me + debug "ncqRepair" + debug $ vcat (fmap pretty fossils) + + for_ fossils $ \fo -> liftIO $ flip fix 0 \next i -> do + let dataFile = ncqGetFileName me $ toFileName (DataFile fo) + try @_ @IOException (ncqFileFastCheck dataFile) >>= \case + Left e -> do + err (viaShow e) + mv fo (dropExtension fo `addExtension` ".broken") + + Right{} | i <= 1 -> do + let dataKey = DataFile (fromString fo) + idx <- doesFileExist (toFileName (IndexFile dataFile)) + + unless idx do + debug $ "indexing" <+> pretty (toFileName dataKey) + r <- ncqIndexFile me dataKey + debug $ "indexed" <+> pretty r + next (succ i) + + ncqAddTrackedFile me dataKey + + Right{} -> do + err $ "skip indexing" <+> pretty dataFile + diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 8864f304..0070cde1 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1220,6 +1220,7 @@ executable test-ncq , mmap , zstd , unix + , mwc-random diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 6cccdb28..34781d21 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -21,7 +21,7 @@ import HBS2.Storage.Operations.ByteString import HBS2.System.Logger.Simple.ANSI import HBS2.Storage.NCQ -import HBS2.Storage.NCQ2 +import HBS2.Storage.NCQ2 as N2 import HBS2.Data.Log.Structured.NCQ import HBS2.CLI.Run.Internal.Merkle @@ -68,6 +68,8 @@ import System.IO.MMap import System.IO qualified as IO import System.Exit (exitSuccess, exitFailure) import System.Random +import System.Random.MWC as MWC +import System.Random.Stateful import System.Random.Shuffle (shuffleM) import Safe import Lens.Micro.Platform @@ -76,6 +78,7 @@ import System.IO.Temp qualified as Temp import System.Mem import UnliftIO +import UnliftIO.Async import Test.Tasty.HUnit import Text.InterpolatedString.Perl6 (qc) @@ -582,6 +585,37 @@ testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do rm ncqDir +testNCQ2Simple1 :: MonadUnliftIO m + => TestEnv + -> m () + +testNCQ2Simple1 TestEnv{..} = do + debug "testNCQ2Simple1" + let tmp = testEnvDir + let ncqDir = tmp + q <- newTQueueIO + + g <- liftIO MWC.createSystemRandom + + bz <- replicateM 1000 $ liftIO do + n <- (`mod` (256*1024)) <$> uniformM @Int g + uniformByteStringM n g + + ncqWithStorage ncqDir $ \sto -> liftIO do + for bz $ \z -> do + h <- ncqPutBS sto (Just B) Nothing z + atomically $ writeTQueue q h + found <- ncqSearchBS sto h <&> maybe (-1) BS.length + assertBool (show $ "found-immediate" <+> pretty h) (found > 0) + + ncqWithStorage ncqDir $ \sto -> liftIO do + hashes <- atomically (STM.flushTQueue q) + for_ hashes $ \ha -> do + found <- ncqSearchBS sto ha <&> maybe (-1) BS.length + assertBool (show $ "found-immediate" <+> pretty ha) (found > 0) + -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) + + testNCQ2Concurrent1 :: MonadUnliftIO m => Bool -> Int @@ -793,6 +827,8 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do + runTest testNCQ2Simple1 entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case [ StringLike fn ] -> do