From 67acde04d6b79082045fbd0aeac91b0421065a8d Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Wed, 14 May 2025 12:39:20 +0300 Subject: [PATCH] wip, new block format --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 101 ++++++++++++++--------- hbs2-tests/test/TCQ.hs | 21 +++-- 2 files changed, 75 insertions(+), 47 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 6deed021..533b4f61 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -433,8 +433,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do Just (h,_,WQItem{..},rest) -> do off <- fdSeek fh SeekFromEnd 0 + + -- we really have to write tomb prefix here let b = byteString (coerce @_ @ByteString h) - <> lazyByteString (fromMaybe mempty wqData) + <> lazyByteString (fromMaybe (LBS.fromStrict ncqTombPrefix) wqData) + let wbs = toLazyByteString b let len = LBS.length wbs let ws = N.bytestring32 (fromIntegral len) @@ -535,8 +538,14 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do debug $ "CLOSE FD" <+> pretty f Posix.closeFd (fromIntegral f) -ncqStoragePut_ :: MonadUnliftIO m => Bool -> NCQStorage -> LBS.ByteString -> m (Maybe HashRef) -ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do +ncqStoragePut_ :: MonadUnliftIO m + => Bool + -> NCQStorage + -> HashRef + -> LBS.ByteString + -> m (Maybe HashRef) + +ncqStoragePut_ check ncq@NCQStorage{..} h lbs = flip runContT pure $ callCC \exit -> do stoped <- readTVarIO ncqStopped @@ -544,11 +553,10 @@ ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit when (LBS.null lbs) $ exit Nothing - let h = hashObject @HbSync lbs & coerce - when check do already <- lift (ncqStorageGet ncq h) - when (isJust already) do + let tomb = maybe False (not . ncqIsNotTomb) already + when (isJust already && not tomb) do exit $ Just h now <- getTimeCoarse @@ -558,8 +566,47 @@ ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs)) pure (Just h) -ncqStoragePut :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) -ncqStoragePut = ncqStoragePut_ True +ncqStoragePutBlock :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) +ncqStoragePutBlock ncq lbs = ncqStoragePut_ True ncq h (LBS.fromStrict ncqBlockPrefix <> lbs) + where h = HashRef (hashObject lbs) + +ncqIsNotTomb :: LBS.ByteString -> Bool +ncqIsNotTomb lbs = do + let (pre,_) = LBS.splitAt (fromIntegral ncqPrefixLen) lbs + pre /= LBS.fromStrict ncqTombPrefix + +ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) +ncqStorageHasBlock ncq h = runMaybeT do + location <- ncqLocate ncq h >>= toMPlus + let s = ncqLocatedSize location + if s > ncqPrefixLen then + pure (s - ncqPrefixLen) + else do + what <- lift (ncqStorageGet ncq h) >>= toMPlus + guard (ncqIsNotTomb what) + pure 0 + +ncqStorageGetBlock :: MonadUnliftIO m + => NCQStorage + -> HashRef + -> m (Maybe LBS.ByteString) + +ncqStorageGetBlock ncq h = runMaybeT do + lbs <- lift (ncqStorageGet ncq h) >>= toMPlus + guard (ncqIsNotTomb lbs) + pure $ LBS.drop (fromIntegral ncqPrefixLen) lbs + +ncqPrefixLen :: Integer +ncqPrefixLen = 4 + +ncqRefPrefix :: ByteString +ncqRefPrefix = "R;;\x00" + +ncqBlockPrefix :: ByteString +ncqBlockPrefix = "B;;\x00" + +ncqTombPrefix :: ByteString +ncqTombPrefix = "T;;\x00" ncqLocatedSize :: Location -> Integer ncqLocatedSize = \case @@ -655,34 +702,6 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do , fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) ) - - -ncqCheckDeleted :: Monad m - => HashRef - -> Maybe Location - -> (Location -> m (Maybe a)) - -> m (Maybe a) - -ncqCheckDeleted _ Nothing _ = pure Nothing - -ncqCheckDeleted h (Just loc) act = case loc of - InWriteQueue WQItem{ wqData = Nothing } -> pure Nothing - - InWriteQueue WQItem{ wqData = Just _ } -> act loc - - InFossil _ (_, l) - | l == 0 && h /= ncqEmptyDataHash -> pure Nothing - | otherwise -> act loc - - InCurrent (_, l) - | l == 0 && h /= ncqEmptyDataHash -> pure Nothing - | otherwise -> act loc - -ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) -ncqStorageHasBlock ncq h = do - mloc <- ncqLocate ncq h - ncqCheckDeleted h mloc (pure . Just . ncqLocatedSize) - ncqStorageScanDataFile :: MonadIO m => NCQStorage -> FilePath @@ -714,11 +733,11 @@ ncqStorageScanDataFile ncq fp' action = do ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet ncq@NCQStorage{..} h = do location <- ncqLocate ncq h - ncqCheckDeleted h location \case - InWriteQueue WQItem{ wqData = Just lbs } -> do + case location of + Just (InWriteQueue WQItem{ wqData = Just lbs }) -> do pure $ Just lbs - InCurrent (o,l) -> do + Just (InCurrent (o,l)) -> do r <- atomically do a <- newEmptyTMVar fd <- readTVar ncqCurrentHandleR @@ -728,7 +747,7 @@ ncqStorageGet ncq@NCQStorage{..} h = do atomically (takeTMVar r) <&> Just . LBS.fromStrict - InFossil ce (o,l) -> do + Just (InFossil ce (o,l)) -> do now <- getTimeCoarse atomically $ writeTVar (cachedTs ce) now let chunk = BS.take (fromIntegral l) (BS.drop (fromIntegral o + 4 + 32) (cachedMmapedData ce)) @@ -754,7 +773,7 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do now <- getTimeCoarse let writeTombstone wq = do modifyTVar ncqWriteQueue (HPSQ.insert h now wq) - modifyTVar ncqNotWritten (+ fromIntegral (4 + 32 + 0)) + modifyTVar ncqNotWritten (+ fromIntegral (4 + 32 + ncqPrefixLen)) ncqLocate ncq h >>= atomically . \case Just (InFossil _ _) -> writeTombstone (WQItem False Nothing) diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs index 26b13723..a8d4f433 100644 --- a/hbs2-tests/test/TCQ.hs +++ b/hbs2-tests/test/TCQ.hs @@ -109,8 +109,8 @@ newtype TCQ = deriving newtype (Eq,Ord,Show,Typeable) instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where - putBlock ncq lbs = fmap coerce <$> ncqStoragePut ncq lbs - enqueueBlock ncq lbs = fmap coerce <$> ncqStoragePut ncq lbs + putBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs + enqueueBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs getBlock ncq h = ncqStorageGet ncq (coerce h) getChunk _ _ _ = error "getChunk not defined" hasBlock ncq = hasBlock ncq . coerce @@ -257,7 +257,7 @@ main = do entry $ bindMatch "ncq:get" $ \case [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do ncq <- getNCQ tcq - ncqStorageGet ncq hash >>= maybe (pure nil) mkOpaque + ncqStorageGetBlock ncq hash >>= maybe (pure nil) mkOpaque e -> throwIO $ BadFormException @C (mkList e) @@ -275,6 +275,15 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "ncq:hash" $ \case + [ isOpaqueOf @ByteString -> Just bs ] -> lift do + pure $ mkSym ( show $ pretty $ hashObject @HbSync bs ) + + [ StringLike s ] -> lift do + pure $ mkSym ( show $ pretty $ hashObject @HbSync (BS8.pack s) ) + + e -> pure nil + entry $ bindMatch "ncq:put" $ \syn -> do (tcq,bs) <- case syn of [ isOpaqueOf @TCQ -> Just tcq, isOpaqueOf @ByteString -> Just bs ] -> lift do @@ -287,7 +296,7 @@ main = do lift do ncq <- getNCQ tcq - r <- ncqStoragePut ncq bs + r <- ncqStoragePutBlock ncq bs pure $ maybe nil (mkSym . show . pretty) r entry $ bindMatch "ncq:merkle:write" $ \syn -> do @@ -304,13 +313,13 @@ main = do chu <- S.toList_ (readChunkedBS lbs (256*1024)) hashes <- forConcurrently chu $ \chunk -> do - ncqStoragePut ncq chunk >>= orThrowUser "can't save" + ncqStoragePutBlock ncq chunk >>= orThrowUser "can't save" -- FIXME: handle-hardcode let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do - void $ ncqStoragePut ncq bss >>= orThrowUser "can't save" + void $ ncqStoragePutBlock ncq bss >>= orThrowUser "can't save" pure $ mkSym (show $ pretty m)