diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 477292ba..1fdc47b2 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -68,6 +68,7 @@ library , binary , bytestring , bytestring-mmap + , bitvec , containers , directory , filepath @@ -125,6 +126,7 @@ executable hbs2-ncq , base58-bytestring , binary , bytestring + , bitvec , cborg , clock , containers diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index d3916b3d..324cf66e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -124,7 +124,7 @@ ncqFullTombLen = ncqSLen + ncqKeyLen + ncqPrefixLen + 0 {-# INLINE ncqFullTombLen #-} -data NCQSectionType = B | R | T +data NCQSectionType = B | R | T | M deriving stock (Eq,Ord,Show) instance Pretty NCQSectionType where @@ -132,6 +132,7 @@ instance Pretty NCQSectionType where B -> "B" T -> "T" R -> "R" + M -> "M" ncqPrefixLen :: Integral a => a ncqPrefixLen = 4 @@ -146,6 +147,9 @@ ncqBlockPrefix = "B;;\x00" ncqTombPrefix :: ByteString ncqTombPrefix = "T;;\x00" +ncqMetaPrefix :: ByteString +ncqMetaPrefix = "M;;\x00" + ncqMakeSectionBS :: Maybe NCQSectionType -> HashRef -> ByteString @@ -163,6 +167,7 @@ ncqMakeSectionBS t h bs = do Just B -> (ncqPrefixLen, ncqBlockPrefix) Just T -> (ncqPrefixLen, ncqTombPrefix) Just R -> (ncqPrefixLen, ncqRefPrefix) + Just M -> (ncqPrefixLen, ncqMetaPrefix) {-# INLINE ncqMakeSectionBS #-} diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index ccaa9d1d..94057922 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -32,6 +32,7 @@ import Codec.Compression.Zstd.Streaming (Result(..)) import Control.Applicative import Data.ByteString.Builder import Network.ByteOrder qualified as N +import Data.Bit.ThreadSafe qualified as BV import Data.HashMap.Strict (HashMap) import Control.Monad.Except import Control.Monad.Trans.Cont @@ -110,6 +111,7 @@ data NCQStorage2 = NCQStorage2 { ncqRoot :: FilePath , ncqGen :: Int + , ncqSalt :: HashRef , ncqFsync :: Int , ncqWriteQLen :: Int , ncqWriteBlock :: Int @@ -147,14 +149,19 @@ ncqStorageOpen2 fp upd = do ncqSyncNo <- newTVarIO 0 ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 + + let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" + let ncq = NCQStorage2{..} & upd mkdir (ncqGetWorkDir ncq) ncqRepair ncq + ncqLoadIndexes ncq pure ncq + ncqWithStorage :: MonadUnliftIO m => FilePath -> ( NCQStorage2 -> m a ) -> m a ncqWithStorage fp action = flip runContT pure do sto <- lift (ncqStorageOpen2 fp id) @@ -277,6 +284,14 @@ ncqAlterEntrySTM ncq h alterFn = do let shard = ncqGetShard ncq h modifyTVar shard (HM.alter alterFn h) + +ncqStorageDel :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m () +ncqStorageDel ncq@NCQStorage2{..} h = flip runContT pure $ callCC \exit -> do + -- 1. absent + -- 1. in memtable only + -- 2. in disk + none + data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) @@ -407,7 +422,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do -- on open: last w64be == fileSize let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) let h = hashObject @HbSync paylo & coerce - ncqMakeSectionBS (Just B) h paylo + ncqMakeSectionBS (Just M) h paylo {-# INLINE fileTailRecord #-} appendSection :: forall m . MonadUnliftIO m @@ -491,39 +506,24 @@ ncqIndexFile n@NCQStorage2{} fk = do mv result dest pure dest - -ncqAddTrackedFilesSTM :: NCQStorage2 -> [(FileKey, TimeSpec)] -> STM () -ncqAddTrackedFilesSTM NCQStorage2{..} keys = do - old <- readTVar ncqTrackedFiles - let new = flip fix (old, keys) \next -> \case - (s, []) -> s - (s, (k,ts):xs) -> next (HPSQ.insert k (FilePrio (Down ts)) Nothing s, xs) - writeTVar ncqTrackedFiles new - -ncqAddTrackedFile :: MonadIO m => NCQStorage2 -> DataFile FileKey -> m () -ncqAddTrackedFile ncq fkey = do +ncqAddTrackedFile :: MonadIO m => NCQStorage2 -> DataFile FileKey -> m Bool +ncqAddTrackedFile ncq@NCQStorage2{..} fkey = flip runContT pure $ callCC \exit -> do let fname = ncqGetFileName ncq (toFileName fkey) + let idxName = ncqGetFileName ncq (toFileName (IndexFile (coerce @_ @FileKey fkey))) + + idxHere <- doesFileExist idxName + + unless idxHere do + err $ "Index does not exist" <+> pretty (takeFileName idxName) + exit False + stat <- liftIO $ PFS.getFileStatus fname + -- FIXME: maybe-creation-time-actually let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat let fk = fromString (takeFileName fname) - atomically $ ncqAddTrackedFilesSTM ncq [(fk, ts)] - - -ncqAddTrackedFilesIO :: MonadIO m => NCQStorage2 -> [FilePath] -> m () -ncqAddTrackedFilesIO ncq fps = do - tsFiles <- catMaybes <$> forM fps \fp' -> liftIO $ do - catchIOError - (do - let fp = fromString fp' - let dataFile = ncqGetFileName ncq (toFileName (DataFile fp)) - stat <- getFileStatus dataFile - let ts = modificationTimeHiRes stat - pure $ Just (fp, posixToTimeSpec ts)) - (\e -> do - err $ "ncqAddTrackedFilesIO: failed to stat " <+> viaShow e - pure Nothing) - - atomically $ ncqAddTrackedFilesSTM ncq tsFiles + atomically do + modifyTVar' ncqTrackedFiles (HPSQ.insert fk (FilePrio (Down ts)) Nothing) + pure True evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM () evictIfNeededSTM NCQStorage2{..} howMany = do @@ -562,18 +562,54 @@ ncqListTrackedFiles ncq = do <&> List.filter (List.isPrefixOf "fossil-") <&> HS.toList . HS.fromList + +ncqLoadSomeIndexes :: MonadIO m => NCQStorage2 -> [FileKey] -> m () +ncqLoadSomeIndexes ncq@NCQStorage2{..} keys = do + now <- getTimeCoarse + + mapM_ (ncqAddTrackedFile ncq) (fmap DataFile keys) + + loaded <- catMaybes <$> forM keys \key -> runMaybeT do + mEntry <- liftIO $ readTVarIO ncqTrackedFiles <&> HPSQ.lookup key + guard (maybe True (\(_, m) -> isNothing m) mEntry) + + let idxFile = ncqGetFileName ncq (toFileName $ IndexFile key) + let datFile = ncqGetFileName ncq (toFileName $ DataFile key) + + (mmIdx, nway) <- MaybeT $ liftIO $ nwayHashMMapReadOnly idxFile + mmData <- liftIO $ mmapFileByteString datFile Nothing + tnow <- newTVarIO now + pure (key, CachedEntry mmIdx mmData nway tnow) + + atomically do + evictIfNeededSTM ncq (Just (List.length loaded)) + + for_ loaded \(k, ce) -> do + files <- readTVar ncqTrackedFiles + case HPSQ.lookup k files of + Just (p, Nothing) -> do + modifyTVar ncqTrackedFiles (HPSQ.insert k p (Just ce)) + modifyTVar ncqCachedEntries (+1) + _ -> pure () + +ncqLoadIndexes :: MonadIO m => NCQStorage2 -> m () +ncqLoadIndexes ncq@NCQStorage2{..} = do + w <- readTVarIO ncqTrackedFiles + <&> List.take (ncqMaxCached `div` 2) . HPSQ.keys + ncqLoadSomeIndexes ncq w + ncqRepair :: MonadIO m => NCQStorage2 -> m () -ncqRepair me@NCQStorage2{..} = do +ncqRepair me@NCQStorage2{} = do fossils <- ncqListTrackedFiles me - debug "ncqRepair" - debug $ vcat (fmap pretty fossils) for_ fossils $ \fo -> liftIO $ flip fix 0 \next i -> do let dataFile = ncqGetFileName me $ toFileName (DataFile fo) - try @_ @IOException (ncqFileFastCheck dataFile) >>= \case + try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case Left e -> do err (viaShow e) - mv fo (dropExtension fo `addExtension` ".broken") + -- TODO: try-fix-later + mv dataFile (dropExtension dataFile `addExtension` ".broken") + rm (ncqGetFileName me (toFileName (IndexFile fo))) Right{} | i <= 1 -> do let dataKey = DataFile (fromString fo) @@ -585,8 +621,12 @@ ncqRepair me@NCQStorage2{..} = do debug $ "indexed" <+> pretty r next (succ i) - ncqAddTrackedFile me dataKey + void $ ncqAddTrackedFile me dataKey Right{} -> do err $ "skip indexing" <+> pretty dataFile + +ncqRefHash :: NCQStorage2 -> HashRef -> HashRef +ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt)) + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 34781d21..6487d627 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -597,7 +597,7 @@ testNCQ2Simple1 TestEnv{..} = do g <- liftIO MWC.createSystemRandom - bz <- replicateM 1000 $ liftIO do + bz <- replicateM 30000 $ liftIO do n <- (`mod` (256*1024)) <$> uniformM @Int g uniformByteStringM n g @@ -616,6 +616,50 @@ testNCQ2Simple1 TestEnv{..} = do -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) +testNCQ2Repair1:: MonadUnliftIO m + => TestEnv + -> m () + +testNCQ2Repair1 TestEnv{..} = do + debug "testNCQ2Repair1" + let tmp = testEnvDir + let ncqDir = tmp + q <- newTQueueIO + + g <- liftIO MWC.createSystemRandom + + bz <- replicateM 3000 $ liftIO do + n <- (`mod` (256*1024)) <$> uniformM @Int g + uniformByteStringM n g + + ncqWithStorage ncqDir $ \sto -> liftIO do + for_ bz $ \z -> do + h <- ncqPutBS sto (Just B) Nothing z + atomically $ writeTQueue q h + found <- ncqSearchBS sto h <&> maybe (-1) BS.length + assertBool (show $ "found-immediate" <+> pretty h) (found > 0) + written <- N2.ncqListTrackedFiles sto + debug $ "TRACKED" <+> vcat (fmap pretty written) + toDestroy <- pure (headMay written) `orDie` "no file written" + + debug $ "adding garbage to" <+> pretty toDestroy + + k <- (`mod` 4096) <$> uniformM @Int g + shit <- uniformByteStringM k g + let df = toFileName (DataFile toDestroy) + let f = N2.ncqGetFileName sto df + let cq = N2.ncqGetFileName sto (toFileName (IndexFile toDestroy)) + rm cq + BS.appendFile f shit + + ncqWithStorage ncqDir $ \sto -> liftIO do + hashes <- atomically (STM.flushTQueue q) + for_ hashes $ \ha -> do + found <- ncqSearchBS sto ha <&> maybe (-1) BS.length + none + -- assertBool (show $ "found-immediate" <+> pretty ha) (found > 0) + -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) + testNCQ2Concurrent1 :: MonadUnliftIO m => Bool -> Int @@ -830,6 +874,9 @@ main = do entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do runTest testNCQ2Simple1 + entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do + runTest testNCQ2Repair1 + entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case [ StringLike fn ] -> do ncqFileFastCheck fn