This commit is contained in:
voidlizard 2025-03-18 06:40:38 +03:00 committed by Dmitry Zuykov
parent d7dadfed41
commit b8b2ed4d14
2 changed files with 45 additions and 21 deletions

View File

@ -40,6 +40,7 @@ import Data.Either
import Data.Maybe import Data.Maybe
import Data.Text qualified as Text import Data.Text qualified as Text
import Data.Text.IO qualified as Text import Data.Text.IO qualified as Text
import Data.Int
import Lens.Micro.Platform import Lens.Micro.Platform
import Data.HashSet (HashSet) import Data.HashSet (HashSet)
import Data.HashSet qualified as HS import Data.HashSet qualified as HS
@ -94,8 +95,8 @@ data NCQStorage =
, ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqRefsMem :: TVar (HashMap HashRef HashRef)
, ncqRefsDirty :: TVar Int , ncqRefsDirty :: TVar Int
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString)
, ncqDeleted :: TVar (HashSet HashRef) , ncqDeleted :: TVar (HashMap HashRef Int16)
, ncqDeleteQ :: TBQueue HashRef , ncqDeleteQ :: TBQueue (HashRef, Int16)
, ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64))
, ncqTrackedFiles :: TVar (HashSet FileKey) , ncqTrackedFiles :: TVar (HashSet FileKey)
, ncqCachedIndexes :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash)) , ncqCachedIndexes :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash))
@ -252,6 +253,7 @@ ncqIndexFile n@NCQStorage{} fp' = do
ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageStop ncq@NCQStorage{..} = do ncqStorageStop ncq@NCQStorage{..} = do
debug "ncqStorageStop"
ncqStorageSync ncq ncqStorageSync ncq
atomically $ writeTVar ncqStopped True atomically $ writeTVar ncqStopped True
atomically do atomically do
@ -412,20 +414,28 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
fix \next -> do fix \next -> do
debug "BEBEBEBE"
void $ race (pause @'Seconds 1) $ atomically do void $ race (pause @'Seconds 1) $ atomically do
stop <- readTVar ncqStopped stop <- readTVar ncqStopped
flush <- isEmptyTQueue myFlushQ <&> not -- flush <- isEmptyTQueue myFlushQ <&> not
size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt) size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt)
unless (flush || size || stop) STM.retry -- unless (flush || size || stop) STM.retry
unless (size || stop) STM.retry
toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ
liftIO do liftIO do
w <- readTVarIO ncqDeletedW w <- readTVarIO ncqDeletedW
for_ toWrite $ \hx -> do for_ toWrite $ \(hx,delta) -> do
let sdelta = N.bytestring16 (fromIntegral delta)
let k = coerce @_ @ByteString hx let k = coerce @_ @ByteString hx
_ <- Posix.fdWrite w (N.bytestring32 (fromIntegral $ BS.length k)) let size = BS.length k + BS.length sdelta
Posix.fdWrite w k let deleted = mconcat [ N.bytestring32 (fromIntegral size)
, k
, sdelta
]
void $ Posix.fdWrite w deleted
debug $ "DELETED" <+> pretty hx debug $ "DELETED" <+> pretty hx
fileSynchronise w fileSynchronise w
@ -553,8 +563,14 @@ ncqStoragePut ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do
let h = hashObject @HbSync lbs & coerce let h = hashObject @HbSync lbs & coerce
ncqLocate ncq h >>= \case ncqLocate ncq h >>= \case
Just{} -> exit (Just h) Nothing -> none
_ -> none Just{} -> do
d <- readTVarIO ncqDeleted <&> fromMaybe 0 . HM.lookup h
if d < 1 then
exit (Just h)
else do
let delta = negate d - 1
atomically $ writeTBQueue ncqDeleteQ (h, delta)
now <- getTimeCoarse now <- getTimeCoarse
atomically do atomically do
@ -636,7 +652,7 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer)
ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do
readTVarIO ncqDeleted <&> not . HS.member h >>= guard ncqStorageIsDeleted ncq h >>= guard . not
toMPlus =<< (ncqLocate ncq h <&> fmap ncqLocatedSize) toMPlus =<< (ncqLocate ncq h <&> fmap ncqLocatedSize)
ncqStorageScanDataFile :: MonadIO m ncqStorageScanDataFile :: MonadIO m
@ -666,10 +682,15 @@ ncqStorageScanDataFile ncq fp' action = do
next (4 + o + fromIntegral w, BS.drop (w+4) bs) next (4 + o + fromIntegral w, BS.drop (w+4) bs)
ncqStorageIsDeleted :: MonadIO m => NCQStorage -> HashRef -> m Bool
ncqStorageIsDeleted NCQStorage{..} what = do
readTVarIO ncqDeleted <&> (>0) . fromMaybe 0 . HM.lookup what
ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString)
ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
deleted <- readTVarIO ncqDeleted <&> HS.member h deleted <- ncqStorageIsDeleted ncq h
when deleted $ exit Nothing when deleted $ exit Nothing
@ -727,10 +748,11 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
_ -> none _ -> none
atomically do atomically do
already <- readTVar ncqDeleted <&> HS.member h what <- readTVar ncqDeleted <&> fromMaybe 0 . HM.lookup h
unless already do when (what < 1) do
writeTBQueue ncqDeleteQ h let delta = negate what + 1
modifyTVar ncqDeleted (HS.insert h) writeTBQueue ncqDeleteQ (h, delta)
modifyTVar ncqDeleted (HM.insertWith (+) h delta)
ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageSync NCQStorage{..} = do ncqStorageSync NCQStorage{..} = do
@ -797,7 +819,7 @@ ncqStorageOpen fp = do
-- liftIO $ print $ pretty "FILE" <+> pretty fn -- liftIO $ print $ pretty "FILE" <+> pretty fn
bs0 <- liftIO $ mmapFileByteString fn Nothing bs0 <- liftIO $ mmapFileByteString fn Nothing
items <- HS.fromList <$> S.toList_ do items <- HM.fromListWith (+) <$> S.toList_ do
flip runContT pure $ callCC \exit -> do flip runContT pure $ callCC \exit -> do
flip fix bs0 $ \next bs -> do flip fix bs0 $ \next bs -> do
when (BS.length bs < 4) $ exit () when (BS.length bs < 4) $ exit ()
@ -809,10 +831,12 @@ ncqStorageOpen fp = do
exit () exit ()
let k = BS.take 32 p & coerce . BS.copy let k = BS.take 32 p & coerce . BS.copy
lift $ S.yield (k :: HashRef) let v = BS.take 2 (BS.drop 32 p) & N.word16 & fromIntegral @_ @Int16
lift $ S.yield (k,v)
next (BS.drop (w+4) bs) next (BS.drop (w+4) bs)
debug $ "NCQStorage.deleted" <+> pretty (HS.size items) debug $ "NCQStorage.deleted" <+> pretty (HM.size items)
atomically $ writeTVar ncqDeleted items atomically $ writeTVar ncqDeleted items
readCurrent ncq@NCQStorage{..} = do readCurrent ncq@NCQStorage{..} = do
@ -838,7 +862,7 @@ ncqStorageOpen fp = do
let k = BS.take 32 p & coerce . BS.copy let k = BS.take 32 p & coerce . BS.copy
let vs = w - 32 let vs = w - 32
unless (HS.member k deleted) do unless (fromMaybe 0 (HM.lookup k deleted) > 0) do
lift $ S.yield (k,now, (fromIntegral o, fromIntegral vs)) lift $ S.yield (k,now, (fromIntegral o, fromIntegral vs))
next (o+w+4, BS.drop (w+4) bs) next (o+w+4, BS.drop (w+4) bs)

View File

@ -493,6 +493,8 @@ main = do
none none
for_ hashes $ \h -> runMaybeT do for_ hashes $ \h -> runMaybeT do
already <- liftIO (ncqStorageHasBlock ncq h <&> isJust)
guard (not already)
blk <- getBlock sto (coerce h) >>= toMPlus blk <- getBlock sto (coerce h) >>= toMPlus
liftIO do liftIO do
let l = LBS.length blk let l = LBS.length blk
@ -528,8 +530,6 @@ main = do
liftIO $ ncqStorageStop ncq liftIO $ ncqStorageStop ncq
pause @'Seconds 5
wait writer wait writer
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)