wip, new block format

This commit is contained in:
Dmitry Zuykov 2025-05-14 12:39:20 +03:00
parent 38821dd138
commit 67acde04d6
2 changed files with 75 additions and 47 deletions

View File

@ -433,8 +433,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
Just (h,_,WQItem{..},rest) -> do Just (h,_,WQItem{..},rest) -> do
off <- fdSeek fh SeekFromEnd 0 off <- fdSeek fh SeekFromEnd 0
-- we really have to write tomb prefix here
let b = byteString (coerce @_ @ByteString h) let b = byteString (coerce @_ @ByteString h)
<> lazyByteString (fromMaybe mempty wqData) <> lazyByteString (fromMaybe (LBS.fromStrict ncqTombPrefix) wqData)
let wbs = toLazyByteString b let wbs = toLazyByteString b
let len = LBS.length wbs let len = LBS.length wbs
let ws = N.bytestring32 (fromIntegral len) let ws = N.bytestring32 (fromIntegral len)
@ -535,8 +538,14 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
debug $ "CLOSE FD" <+> pretty f debug $ "CLOSE FD" <+> pretty f
Posix.closeFd (fromIntegral f) Posix.closeFd (fromIntegral f)
ncqStoragePut_ :: MonadUnliftIO m => Bool -> NCQStorage -> LBS.ByteString -> m (Maybe HashRef) ncqStoragePut_ :: MonadUnliftIO m
ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do => Bool
-> NCQStorage
-> HashRef
-> LBS.ByteString
-> m (Maybe HashRef)
ncqStoragePut_ check ncq@NCQStorage{..} h lbs = flip runContT pure $ callCC \exit -> do
stoped <- readTVarIO ncqStopped stoped <- readTVarIO ncqStopped
@ -544,11 +553,10 @@ ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit
when (LBS.null lbs) $ exit Nothing when (LBS.null lbs) $ exit Nothing
let h = hashObject @HbSync lbs & coerce
when check do when check do
already <- lift (ncqStorageGet ncq h) already <- lift (ncqStorageGet ncq h)
when (isJust already) do let tomb = maybe False (not . ncqIsNotTomb) already
when (isJust already && not tomb) do
exit $ Just h exit $ Just h
now <- getTimeCoarse now <- getTimeCoarse
@ -558,8 +566,47 @@ ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit
modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs)) modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs))
pure (Just h) pure (Just h)
ncqStoragePut :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) ncqStoragePutBlock :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef)
ncqStoragePut = ncqStoragePut_ True ncqStoragePutBlock ncq lbs = ncqStoragePut_ True ncq h (LBS.fromStrict ncqBlockPrefix <> lbs)
where h = HashRef (hashObject lbs)
ncqIsNotTomb :: LBS.ByteString -> Bool
ncqIsNotTomb lbs = do
let (pre,_) = LBS.splitAt (fromIntegral ncqPrefixLen) lbs
pre /= LBS.fromStrict ncqTombPrefix
ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer)
ncqStorageHasBlock ncq h = runMaybeT do
location <- ncqLocate ncq h >>= toMPlus
let s = ncqLocatedSize location
if s > ncqPrefixLen then
pure (s - ncqPrefixLen)
else do
what <- lift (ncqStorageGet ncq h) >>= toMPlus
guard (ncqIsNotTomb what)
pure 0
ncqStorageGetBlock :: MonadUnliftIO m
=> NCQStorage
-> HashRef
-> m (Maybe LBS.ByteString)
ncqStorageGetBlock ncq h = runMaybeT do
lbs <- lift (ncqStorageGet ncq h) >>= toMPlus
guard (ncqIsNotTomb lbs)
pure $ LBS.drop (fromIntegral ncqPrefixLen) lbs
ncqPrefixLen :: Integer
ncqPrefixLen = 4
ncqRefPrefix :: ByteString
ncqRefPrefix = "R;;\x00"
ncqBlockPrefix :: ByteString
ncqBlockPrefix = "B;;\x00"
ncqTombPrefix :: ByteString
ncqTombPrefix = "T;;\x00"
ncqLocatedSize :: Location -> Integer ncqLocatedSize :: Location -> Integer
ncqLocatedSize = \case ncqLocatedSize = \case
@ -655,34 +702,6 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
, fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) ) , fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) )
ncqCheckDeleted :: Monad m
=> HashRef
-> Maybe Location
-> (Location -> m (Maybe a))
-> m (Maybe a)
ncqCheckDeleted _ Nothing _ = pure Nothing
ncqCheckDeleted h (Just loc) act = case loc of
InWriteQueue WQItem{ wqData = Nothing } -> pure Nothing
InWriteQueue WQItem{ wqData = Just _ } -> act loc
InFossil _ (_, l)
| l == 0 && h /= ncqEmptyDataHash -> pure Nothing
| otherwise -> act loc
InCurrent (_, l)
| l == 0 && h /= ncqEmptyDataHash -> pure Nothing
| otherwise -> act loc
ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer)
ncqStorageHasBlock ncq h = do
mloc <- ncqLocate ncq h
ncqCheckDeleted h mloc (pure . Just . ncqLocatedSize)
ncqStorageScanDataFile :: MonadIO m ncqStorageScanDataFile :: MonadIO m
=> NCQStorage => NCQStorage
-> FilePath -> FilePath
@ -714,11 +733,11 @@ ncqStorageScanDataFile ncq fp' action = do
ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString)
ncqStorageGet ncq@NCQStorage{..} h = do ncqStorageGet ncq@NCQStorage{..} h = do
location <- ncqLocate ncq h location <- ncqLocate ncq h
ncqCheckDeleted h location \case case location of
InWriteQueue WQItem{ wqData = Just lbs } -> do Just (InWriteQueue WQItem{ wqData = Just lbs }) -> do
pure $ Just lbs pure $ Just lbs
InCurrent (o,l) -> do Just (InCurrent (o,l)) -> do
r <- atomically do r <- atomically do
a <- newEmptyTMVar a <- newEmptyTMVar
fd <- readTVar ncqCurrentHandleR fd <- readTVar ncqCurrentHandleR
@ -728,7 +747,7 @@ ncqStorageGet ncq@NCQStorage{..} h = do
atomically (takeTMVar r) <&> Just . LBS.fromStrict atomically (takeTMVar r) <&> Just . LBS.fromStrict
InFossil ce (o,l) -> do Just (InFossil ce (o,l)) -> do
now <- getTimeCoarse now <- getTimeCoarse
atomically $ writeTVar (cachedTs ce) now atomically $ writeTVar (cachedTs ce) now
let chunk = BS.take (fromIntegral l) (BS.drop (fromIntegral o + 4 + 32) (cachedMmapedData ce)) let chunk = BS.take (fromIntegral l) (BS.drop (fromIntegral o + 4 + 32) (cachedMmapedData ce))
@ -754,7 +773,7 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
now <- getTimeCoarse now <- getTimeCoarse
let writeTombstone wq = do let writeTombstone wq = do
modifyTVar ncqWriteQueue (HPSQ.insert h now wq) modifyTVar ncqWriteQueue (HPSQ.insert h now wq)
modifyTVar ncqNotWritten (+ fromIntegral (4 + 32 + 0)) modifyTVar ncqNotWritten (+ fromIntegral (4 + 32 + ncqPrefixLen))
ncqLocate ncq h >>= atomically . \case ncqLocate ncq h >>= atomically . \case
Just (InFossil _ _) -> writeTombstone (WQItem False Nothing) Just (InFossil _ _) -> writeTombstone (WQItem False Nothing)

View File

@ -109,8 +109,8 @@ newtype TCQ =
deriving newtype (Eq,Ord,Show,Typeable) deriving newtype (Eq,Ord,Show,Typeable)
instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where
putBlock ncq lbs = fmap coerce <$> ncqStoragePut ncq lbs putBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs
enqueueBlock ncq lbs = fmap coerce <$> ncqStoragePut ncq lbs enqueueBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs
getBlock ncq h = ncqStorageGet ncq (coerce h) getBlock ncq h = ncqStorageGet ncq (coerce h)
getChunk _ _ _ = error "getChunk not defined" getChunk _ _ _ = error "getChunk not defined"
hasBlock ncq = hasBlock ncq . coerce hasBlock ncq = hasBlock ncq . coerce
@ -257,7 +257,7 @@ main = do
entry $ bindMatch "ncq:get" $ \case entry $ bindMatch "ncq:get" $ \case
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
ncq <- getNCQ tcq ncq <- getNCQ tcq
ncqStorageGet ncq hash >>= maybe (pure nil) mkOpaque ncqStorageGetBlock ncq hash >>= maybe (pure nil) mkOpaque
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
@ -275,6 +275,15 @@ main = do
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:hash" $ \case
[ isOpaqueOf @ByteString -> Just bs ] -> lift do
pure $ mkSym ( show $ pretty $ hashObject @HbSync bs )
[ StringLike s ] -> lift do
pure $ mkSym ( show $ pretty $ hashObject @HbSync (BS8.pack s) )
e -> pure nil
entry $ bindMatch "ncq:put" $ \syn -> do entry $ bindMatch "ncq:put" $ \syn -> do
(tcq,bs) <- case syn of (tcq,bs) <- case syn of
[ isOpaqueOf @TCQ -> Just tcq, isOpaqueOf @ByteString -> Just bs ] -> lift do [ isOpaqueOf @TCQ -> Just tcq, isOpaqueOf @ByteString -> Just bs ] -> lift do
@ -287,7 +296,7 @@ main = do
lift do lift do
ncq <- getNCQ tcq ncq <- getNCQ tcq
r <- ncqStoragePut ncq bs r <- ncqStoragePutBlock ncq bs
pure $ maybe nil (mkSym . show . pretty) r pure $ maybe nil (mkSym . show . pretty) r
entry $ bindMatch "ncq:merkle:write" $ \syn -> do entry $ bindMatch "ncq:merkle:write" $ \syn -> do
@ -304,13 +313,13 @@ main = do
chu <- S.toList_ (readChunkedBS lbs (256*1024)) chu <- S.toList_ (readChunkedBS lbs (256*1024))
hashes <- forConcurrently chu $ \chunk -> do hashes <- forConcurrently chu $ \chunk -> do
ncqStoragePut ncq chunk >>= orThrowUser "can't save" ncqStoragePutBlock ncq chunk >>= orThrowUser "can't save"
-- FIXME: handle-hardcode -- FIXME: handle-hardcode
let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings
m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do
void $ ncqStoragePut ncq bss >>= orThrowUser "can't save" void $ ncqStoragePutBlock ncq bss >>= orThrowUser "can't save"
pure $ mkSym (show $ pretty m) pure $ mkSym (show $ pretty m)