diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 542ddfbd..1485dffb 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -127,6 +127,7 @@ data NCQStorage2 = , ncqMemTable :: Vector Shard , ncqWriteSem :: TSem , ncqWriteQ :: TVar (Seq HashRef) + , ncqStorageTasks :: TVar Int , ncqStorageStopReq :: TVar Bool , ncqStorageSyncReq :: TVar Bool , ncqSyncNo :: TVar Int @@ -156,6 +157,7 @@ ncqStorageOpen2 fp upd = do ncqSyncNo <- newTVarIO 0 ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 + ncqStorageTasks <- newTVarIO 0 let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" @@ -188,6 +190,10 @@ ncqGetWorkDir NCQStorage2{..} = ncqRoot show ncqGen ncqGetLockFileName :: NCQStorage2 -> FilePath ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" +ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath +ncqGetNewFossilName ncq = do + liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data" + ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageStop2 NCQStorage2{..} = do atomically $ writeTVar ncqStorageStopReq True @@ -331,7 +337,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do maybe1 what none $ \(fk, fh) -> do closeFd fh -- notice $ yellow "indexing" <+> pretty fname - idx <- ncqIndexFile ncq (DataFile fk) + idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk)) ncqAddTrackedFile ncq (DataFile fk) nwayHashMMapReadOnly idx >>= \case Nothing -> err $ "can't open index" <+> pretty idx @@ -376,9 +382,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do rest <- if not (sync || needClose || w > ncqFsync) then pure w else liftIO do - s <- Posix.fileSize <$> Posix.getFdStatus fh - void (appendSection fh (fileTailRecord s)) - fileSynchronise fh + appendTailSection fh >> fileSynchronise fh atomically do writeTVar ncqStorageSyncReq False modifyTVar' ncqSyncNo succ @@ -420,40 +424,10 @@ 
ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do emptyKey = BS.replicate ncqKeyLen 0 - zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload - where zeroPayload = N.bytestring64 0 - zeroHash = HashRef (hashObject zeroPayload) - {-# INLINE zeroSyncEntry #-} - - zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) - {-# INLINE zeroSyncEntrySize #-} - - -- 1. It's B-record - -- 2. It's last w64be == fileSize - -- 3. It's hash == hash (bytestring64be fileSize) - -- 4. recovery-strategy: start-to-end, end-to-start - fileTailRecord w = do - -- on open: last w64be == fileSize - let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) - let h = hashObject @HbSync paylo & coerce - ncqMakeSectionBS (Just M) h paylo - {-# INLINE fileTailRecord #-} - - appendSection :: forall m . MonadUnliftIO m - => Fd - -> ByteString - -> m Int -- (FOff, Int) - - appendSection fh section = do - -- off <- liftIO $ fdSeek fh SeekFromEnd 0 - -- pure (fromIntegral off, fromIntegral len) - liftIO (Posix.fdWrite fh section) <&> fromIntegral - - {-# INLINE appendSection #-} openNewDataFile :: forall mx . 
MonadIO mx => mx (FileKey, Fd) openNewDataFile = do - fname <- liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data" + fname <- ncqGetNewFossilName ncq touch fname let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) @@ -535,9 +509,14 @@ ncqAddTrackedFile ncq@NCQStorage2{..} fkey = flip runContT pure $ callCC \exit - -- FIXME: maybe-creation-time-actually let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat let fk = fromString (takeFileName fname) - atomically do + + atomically $ ncqAddTrackedFileSTM ncq fk ts + pure True + +ncqAddTrackedFileSTM :: NCQStorage2 -> FileKey -> TimeSpec -> STM () +ncqAddTrackedFileSTM NCQStorage2{..} fk ts = do modifyTVar' ncqTrackedFiles (HPSQ.insert fk (FilePrio (Down ts)) Nothing) - pure True +{-# INLINE ncqAddTrackedFileSTM #-} evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM () evictIfNeededSTM NCQStorage2{..} howMany = do @@ -640,7 +619,202 @@ ncqRepair me@NCQStorage2{} = do Right{} -> do err $ "skip indexing" <+> pretty dataFile - ncqRefHash :: NCQStorage2 -> HashRef -> HashRef ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt)) +ncqRunTaskNoMatterWhat :: MonadUnliftIO m => NCQStorage2 -> m a -> m a +ncqRunTaskNoMatterWhat NCQStorage2{..} task = do + atomically (modifyTVar' ncqStorageTasks succ) + task `finally` atomically (modifyTVar' ncqStorageTasks pred) + +ncqRunTask :: MonadUnliftIO m => NCQStorage2 -> a -> m a -> m a +ncqRunTask ncq@NCQStorage2{..} def task = readTVarIO ncqStorageStopReq >>= \case + True -> pure def + False -> ncqRunTaskNoMatterWhat ncq task + +ncqWaitTasks :: MonadUnliftIO m => NCQStorage2 -> m () +ncqWaitTasks NCQStorage2{..} = atomically do + tno <- readTVar ncqStorageTasks + when (tno > 0) STM.retry + +ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool +ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT pure do + 
+ liftIO do + + tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList + files <- for tracked $ \(f,p,_) -> do + let fn = ncqGetFileName ncq (toFileName $ DataFile f) + let idx = ncqGetFileName ncq (toFileName $ IndexFile f) + sz <- liftIO (fileSize fn) + idxHere <- doesFileExist idx + pure (f, sz, p, idxHere) + + let bothIdx a b = view _4 a && view _4 b + + let found = flip fix (files, Nothing, Nothing) $ \next -> \case + ([], _, r) -> r + + (a:b:rest, Nothing, _) | bothIdx a b -> do + next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) + + (a:b:rest, Just s, _ ) | bothIdx a b && view _2 a + view _2 b < s -> do + next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) + + (_:rest, s, r) -> do + next (rest, s, r) + + case found of + Just (a,b) -> mergeStep a b >> pure True + _ -> pure False + + where + + ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath + ncqGetNewMergeName n@NCQStorage2{} = do + let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data") + liftIO $ emptyTempFile p tpl + + mergeStep (a,_,p1,_) (b,_,p2,_) = do + debug $ "merge" <+> pretty a <+> pretty b + + let fDataNameA = ncqGetFileName ncq $ toFileName (DataFile a) + let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a) + + let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b) + let fIndexNameB = ncqGetFileName ncq $ toFileName (IndexFile b) + + debug $ "file A" <+> pretty (timeSpecFromFilePrio p1) <+> pretty fDataNameA <+> pretty fIndexNameA + debug $ "file B" <+> pretty (timeSpecFromFilePrio p2) <+> pretty fDataNameB <+> pretty fIndexNameB + + doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) + doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) + doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA) + + flip runContT pure $ callCC \exit -> do + + mfile <- ncqGetNewMergeName ncq + + ContT $ bracket none $ const do + rm mfile + + liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do + + debug $ "merge: 
okay, good to go" <+> pretty (takeFileName mfile) + + (mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA + >>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA)) + + debug $ "SCAN FILE A" <+> pretty fDataNameA + + writeFiltered ncq fDataNameA fwh $ \_ _ _ _ -> do + pure True + + debug $ "SCAN FILE B" <+> pretty fDataNameB + + writeFiltered ncq fDataNameB fwh $ \_ _ k _ -> do + foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust + let skip = foundInA + pure $ not skip + + appendTailSection =<< handleToFd fwh + + liftIO do + + result <- fileSize mfile + + idx <- if result == 0 then + pure Nothing + else do + fossil <- ncqGetNewFossilName ncq + mv mfile fossil + statA <- getFileStatus fDataNameA + let ts = modificationTimeHiRes statA + setFileTimesHiRes fossil ts ts + let fk = DataFile (fromString fossil) + void $ ncqIndexFile ncq fk + pure $ Just (ts,fk) + + atomically do + modifyTVar' ncqTrackedFiles (HPSQ.delete a) + modifyTVar' ncqTrackedFiles (HPSQ.delete b) + for_ idx $ \(ts,fk) -> do + ncqAddTrackedFileSTM ncq (coerce fk) (posixToTimeSpec ts) + + mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA] + + + orFail what e = do + r <- what + unless r (throwIO (NCQMergeInvariantFailed (show e))) + + +writeFiltered :: forall m . 
MonadIO m + => NCQStorage2 + -> FilePath + -> Handle + -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) + -> m () + +writeFiltered ncq fn out filt = do + ncqStorageScanDataFile ncq fn $ \o s k v -> do + skip <- filt o s k v <&> not + + when skip do + debug $ pretty k <+> pretty "skipped" + + unless skip $ liftIO do + BS.hPut out (LBS.toStrict (makeEntryLBS k v)) + + where + + makeEntryLBS h bs = do + let b = byteString (coerce @_ @ByteString h) + <> byteString bs + + let wbs = toLazyByteString b + let len = LBS.length wbs + let ws = byteString (N.bytestring32 (fromIntegral len)) + + toLazyByteString (ws <> b) + + +zeroSyncEntry :: ByteString +zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload + where zeroPayload = N.bytestring64 0 + zeroHash = HashRef (hashObject zeroPayload) +{-# INLINE zeroSyncEntry #-} + +zeroSyncEntrySize :: Word64 +zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) +{-# INLINE zeroSyncEntrySize #-} + +-- 1. It's B-record +-- 2. It's last w64be == fileSize +-- 3. It's hash == hash (bytestring64be fileSize) +-- 4. recovery-strategy: start-to-end, end-to-start +fileTailRecord :: Integral a => a -> ByteString +fileTailRecord w = do + -- on open: last w64be == fileSize + let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) + let h = hashObject @HbSync paylo & coerce + ncqMakeSectionBS (Just M) h paylo +{-# INLINE fileTailRecord #-} + +appendSection :: forall m . 
MonadUnliftIO m + => Fd + -> ByteString + -> m Int -- (FOff, Int) + +appendSection fh sect = do + -- off <- liftIO $ fdSeek fh SeekFromEnd 0 + -- pure (fromIntegral off, fromIntegral len) + liftIO (Posix.fdWrite fh sect) <&> fromIntegral +{-# INLINE appendSection #-} + +appendTailSection :: MonadIO m => Fd -> m () +appendTailSection fh = liftIO do + s <- Posix.fileSize <$> Posix.getFdStatus fh + void (appendSection fh (fileTailRecord s)) +{-# INLINE appendTailSection #-} + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 980e8097..4e4c8cc6 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -621,6 +621,99 @@ testNCQ2Simple1 TestEnv{..} = do -- debug $ fill 44 (pretty ha) <+> fill 8 (pretty found) +genRandomBS :: forall g m . (Monad m, StatefulGen g m) => g -> Int -> m ByteString +genRandomBS g n = do + len <- (`mod` (max 1 n)) <$> uniformM @Int g + uniformByteStringM len g + +sec6 :: RealFrac a => a -> Fixed E6 +sec6 = realToFrac + +sec2 :: RealFrac a => a -> Fixed E2 +sec2 = realToFrac + +sec3 :: RealFrac a => a -> Fixed E3 +sec3 = realToFrac + +testNCQ2Merge1 :: MonadUnliftIO m + => Int + -> TestEnv + -> m () + +testNCQ2Merge1 n TestEnv{..} = do + let tmp = testEnvDir + let ncqDir = tmp + + g <- liftIO MWC.createSystemRandom + + let fake = n `div` 3 + + ncqWithStorage ncqDir $ \sto -> liftIO do + + notice $ "write" <+> pretty n <+> "random blocks" + + ws <- flip fix (mempty :: HashSet HashRef) $ \loop -> \case + hs | HS.size hs >= n -> pure hs + | otherwise -> do + + s <- liftIO $ genRandomBS g (256 * 1024) + h <- ncqPutBS sto (Just B) Nothing s + loop (HS.insert h hs) + + notice $ "written" <+> pretty (HS.size ws) + + assertBool "all written" (HS.size ws == n) + + nHashes <- HS.fromList . filter (not . flip HS.member ws) <$> replicateM fake do + liftIO (genRandomBS g (64*1024)) <&> HashRef . 
hashObject + + notice $ "gen" <+> pretty (HS.size nHashes) <+> pretty "missed hashes" + + + (t1,n1) <- over _2 sum <$> timeItT do + for (HS.toList ws) $ \h -> do + r <- ncqLocate2 sto h + + unless (isJust r) do + err $ "not found" <+> pretty h + + pure $ maybe 0 (const 1) r + + notice $ pretty (sec3 t1) <+> pretty n1 <+> pretty (n1 == HS.size ws) + + assertBool "all written" (n1 == HS.size ws) + + ncqWaitTasks sto + + let hashes = HS.toList ws <> HS.toList nHashes + + (t2,_) <- timeItT do + for hashes $ \h -> do + r <- ncqLocate2 sto h + pure $ maybe 0 (const 1) r + + notice $ "before-merge" <+> pretty (sec3 t2) <+> pretty (List.length hashes) + + notice $ "merge whatever possible" + + n <- flip fix 0 \next i -> do + N2.ncqStorageMergeStep sto >>= \case + False -> pure i + True -> next (succ i) + + notice $ "merged" <+> pretty n + + (t3,r) <- timeItT do + for hashes $ \h -> do + ncqLocate2 sto h >>= \case + Nothing -> pure $ Left h + Just{} -> pure $ Right h + + let w1 = HS.fromList (rights r) + let n2 = HS.fromList (lefts r) + + notice $ "after-merge" <+> pretty (sec3 t3) <+> pretty (HS.size w1) <+> pretty (HS.size n2) + testFilterEmulate1 :: MonadUnliftIO m => Int -> TestEnv @@ -945,30 +1038,6 @@ main = do e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq2:concurrent1" $ nil_ $ \case - [ LitIntVal tn, LitIntVal n ] -> do - debug $ "ncq2:concurrent1" <+> pretty tn <+> pretty n - runTest $ testNCQ2Concurrent1 False ( fromIntegral tn) (fromIntegral n) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do - runTest testNCQ2Simple1 - - entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do - runTest testNCQ2Repair1 - - entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case - [ StringLike fn ] -> do - ncqFileFastCheck fn - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case - [ LitIntVal tn, LitIntVal n ] -> do 
- runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n) - - e -> throwIO $ BadFormException @C (mkList e) entry $ bindMatch "test:ncq:concurrent1:wo" $ nil_ $ \case [ LitIntVal tn, LitIntVal n ] -> do @@ -1005,6 +1074,37 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq2:merge1" $ nil_ $ \case + [ LitIntVal n ] -> do + runTest $ testNCQ2Merge1 (fromIntegral n) + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq2:concurrent1" $ nil_ $ \case + [ LitIntVal tn, LitIntVal n ] -> do + debug $ "ncq2:concurrent1" <+> pretty tn <+> pretty n + runTest $ testNCQ2Concurrent1 False ( fromIntegral tn) (fromIntegral n) + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq2:simple1" $ nil_ $ const $ do + runTest testNCQ2Simple1 + + entry $ bindMatch "test:ncq2:repair1" $ nil_ $ const $ do + runTest testNCQ2Repair1 + + entry $ bindMatch "test:ncq2:filefastcheck" $ nil_ $ \case + [ StringLike fn ] -> do + ncqFileFastCheck fn + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq2:concurrent:write:simple1" $ nil_ $ \case + [ LitIntVal tn, LitIntVal n ] -> do + runTest $ testNCQ2ConcurrentWriteSimple1 ( fromIntegral tn) (fromIntegral n) + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:filter:emulate-1" $ nil_ $ \case [ LitIntVal n ] -> runTest $ testFilterEmulate1 (fromIntegral n)