diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index d57cb224..ef9ab28e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -100,7 +100,12 @@ data CachedEntry = } instance Show CachedEntry where - show _ = "" + show _ = "CachedEntry{...}" + +data WQItem = + WQItem { wqNew :: Bool + , wqData :: Maybe LBS.ByteString + } data NCQStorage = NCQStorage @@ -112,7 +117,7 @@ data NCQStorage = , ncqMaxCached :: Int , ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqRefsDirty :: TVar Int - , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) + , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) , ncqCachedEntries :: TVar Int @@ -192,6 +197,9 @@ ncqGetDeletedFileName ncq = do ncqGetFileName ncq "deleted.data" +ncqEmptyDataHash :: HashRef +ncqEmptyDataHash = HashRef $ hashObject @HbSync (mempty :: ByteString) + ncqAddCachedSTM :: TimeSpec -- ^ now -> Int -- ^ limit -> TVar (HashPSQ FileKey TimeSpec a) -- ^ entry @@ -251,7 +259,7 @@ ncqWriteError ncq txt = liftIO do let msg = Text.pack $ show $ "error" <+> fill 12 (pretty p) <+> pretty txt <> line Text.appendFile (ncqGetErrorLogName ncq) msg -ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m FilePath +ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m (FilePath, [HashRef]) ncqIndexFile n@NCQStorage{} fp' = do let fp = ncqGetFileName n fp' @@ -273,7 +281,7 @@ ncqIndexFile n@NCQStorage{} fp' = do mv result fp - pure fp + pure (fp, fmap (coerce @_ @HashRef . fst) items) ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop ncq@NCQStorage{..} = do @@ -401,7 +409,13 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do for_ what $ \(fd,fn) -> do - key <- ncqIndexFile ncq fn + (key, added) <- ncqIndexFile ncq fn <&> over _2 HS.fromList + + atomically do + r <- readTVar ncqWaitIndex <&> HPSQ.toList + let new = [(k,p,v) | (k,p,v) <- r, not (k `HS.member` added)] + writeTVar ncqWaitIndex (HPSQ.fromList new) + ncqAddTrackedFilesIO ncq [key] atomically do @@ -420,23 +434,24 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do fdSeek fh SeekFromEnd 0 - init <- readTVarIO ncqWriteQueue + initQ <- readTVarIO ncqWriteQueue - wResult <- flip fix (0,init) \next (written,q) -> case HPSQ.minView q of + wResult <- flip fix (0,initQ) \next (written,q) -> case HPSQ.minView q of Nothing -> pure mempty - Just (h,_,bs,rest) -> do + Just (h,_,WQItem{..},rest) -> do off <- fdSeek fh SeekFromEnd 0 - let b = byteString (coerce @_ @ByteString h) <> lazyByteString bs + let b = byteString (coerce @_ @ByteString h) + <> lazyByteString (fromMaybe mempty wqData) let wbs = toLazyByteString b let len = LBS.length wbs let ws = N.bytestring32 (fromIntegral len) let w = 4 + len - liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) - - let kks = LBS.take 32 (toLazyByteString b) & coerce @_ @HashRef . LBS.toStrict - -- debug $ "WRITE SHIT!" <+> pretty len <+> pretty kks <+> pretty (LBS.length bs) + if isNothing wqData && wqNew then + pure () + else void do + liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) written' <- if written < syncData then do pure (written + w) @@ -519,8 +534,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do debug $ "CLOSE FD" <+> pretty f Posix.closeFd (fromIntegral f) -ncqStoragePut :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) -ncqStoragePut ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do +ncqStoragePut_ :: MonadUnliftIO m => Bool -> NCQStorage -> LBS.ByteString -> m (Maybe HashRef) +ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do stoped <- readTVarIO ncqStopped @@ -530,15 +545,24 @@ ncqStoragePut ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do let h = hashObject @HbSync lbs & coerce + when check do + already <- lift (ncqStorageGet ncq h) + when (isJust already) do + exit $ Just h + now <- getTimeCoarse atomically do - ql <- readTVar ncqWriteQueue <&> HPSQ.size - -- FIXME: hardcode - -- when (ql > 8192) STM.retry - modifyTVar ncqWriteQueue (HPSQ.insert h now lbs) + let wqi = WQItem True (Just lbs) + modifyTVar ncqWriteQueue (HPSQ.insert h now wqi) modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs)) pure (Just h) +ncqStoragePut :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) +ncqStoragePut = ncqStoragePut_ True + +ncqStoragePutFaster :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) +ncqStoragePutFaster = ncqStoragePut_ False + ncqLocatedSize :: Location -> Integer ncqLocatedSize = \case InWriteQueue lbs -> fromIntegral $ LBS.length lbs @@ -556,7 +580,6 @@ evictIfNeededSTM NCQStorage{..} howMany = do when (excess > 0) do files <- readTVar ncqTrackedFiles <&> HPSQ.toList - -- собрать [(ts, k, prio)] с чтением TVar oldest <- forM files \case (k, prio, Just ce) -> do ts <- readTVar (cachedTs ce) @@ -574,12 +597,16 @@ evictIfNeededSTM NCQStorage{..} howMany = do modifyTVar ncqCachedEntries (subtract 1) - ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location) ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do - -- сначала проверяем очередь и current l1 <- atomically do - inQ <- readTVar ncqWriteQueue <&> (fmap snd . HPSQ.lookup h) <&> fmap InWriteQueue + + inQ <- readTVar ncqWriteQueue + <&> (fmap snd . HPSQ.lookup h) + <&> \case + Just (WQItem{ wqData = Just bs}) -> Just (InWriteQueue bs) + _ -> Nothing + inC <- readTVar ncqWaitIndex <&> (fmap snd . HPSQ.lookup h) <&> fmap InCurrent pure (inQ <|> inC) @@ -633,10 +660,29 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do +ncqCheckDeleted :: Monad m + => HashRef + -> Maybe Location + -> (Location -> m (Maybe a)) + -> m (Maybe a) + +ncqCheckDeleted _ Nothing _ = pure Nothing + +ncqCheckDeleted h (Just loc) act = case loc of + InWriteQueue bs + | LBS.null bs && h /= ncqEmptyDataHash -> pure Nothing + | otherwise -> act loc + + InFossil _ (_, l) + | l == 0 && h /= ncqEmptyDataHash -> pure Nothing + | otherwise -> act loc + + _ -> act loc + ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) -ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do - ncqStorageIsDeleted ncq h >>= guard . not - toMPlus =<< (ncqLocate ncq h <&> fmap ncqLocatedSize) +ncqStorageHasBlock ncq h = do + mloc <- ncqLocate ncq h + ncqCheckDeleted h mloc (pure . Just . ncqLocatedSize) ncqStorageScanDataFile :: MonadIO m => NCQStorage @@ -666,36 +712,25 @@ ncqStorageScanDataFile ncq fp' action = do next (4 + o + fromIntegral w, BS.drop (w+4) bs) -ncqStorageIsDeleted :: MonadIO m => NCQStorage -> HashRef -> m Bool -ncqStorageIsDeleted NCQStorage{..} what = do - pure False - - ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) -ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do +ncqStorageGet ncq@NCQStorage{..} h = do + mloc <- ncqLocate ncq h + ncqCheckDeleted h mloc \case - deleted <- ncqStorageIsDeleted ncq h - when deleted $ exit Nothing - - ncqLocate ncq h >>= \case - Nothing -> pure Nothing - - Just (InWriteQueue lbs) -> + InWriteQueue lbs -> pure $ Just lbs - Just (InCurrent (o,l)) -> do - answ <- atomically do - a <- newEmptyTMVar - fd <- readTVar ncqCurrentHandleR - modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) - modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) - pure a - atomically $ takeTMVar answ <&> Just . LBS.fromStrict + InCurrent (o,l) -> atomically do + a <- newEmptyTMVar + fd <- readTVar ncqCurrentHandleR + modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) + modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) + Just . LBS.fromStrict <$> takeTMVar a - Just (InFossil CachedEntry{..} (o,l)) -> do + InFossil ce (o,l) -> do now <- getTimeCoarse - atomically $ writeTVar cachedTs now - let chunk = BS.take (fromIntegral l) (BS.drop (fromIntegral o + 4 + 32) cachedMmapedData) + atomically $ writeTVar (cachedTs ce) now + let chunk = BS.take (fromIntegral l) (BS.drop (fromIntegral o + 4 + 32) (cachedMmapedData ce)) pure $ Just $ LBS.fromStrict chunk @@ -720,8 +755,16 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do True -> exit () _ -> none - error "ncqStorageDel not implemented" + now <- getTimeCoarse + let writeTombstone wq = do + modifyTVar ncqWriteQueue (HPSQ.insert h now wq) + modifyTVar ncqNotWritten (+ fromIntegral (4 + 32 + 0)) + ncqLocate ncq h >>= atomically . \case + Just (InFossil _ _) -> writeTombstone (WQItem False Nothing) + Just (InCurrent _) -> writeTombstone (WQItem False Nothing) + Just (InWriteQueue _) -> writeTombstone (WQItem True Nothing) + _ -> pure () ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do @@ -780,7 +823,7 @@ ncqFixIndexes ncq@NCQStorage{..} = do unless here do warn $ "missed-index" <+> pretty k let dataName = ncqGetDataFileName ncq k - newKey <- ncqIndexFile ncq dataName + (newKey,_) <- ncqIndexFile ncq dataName ncqAddTrackedFilesIO ncq [newKey] @@ -880,6 +923,7 @@ ncqStorageInit_ check path = do ncqStopped <- newTVarIO False ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 + ncqSeqNo <- newTVarIO 1 let currentName = ncqGetCurrentName_ path ncqGen diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 022f0a6e..6b480a01 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -240,7 +240,7 @@ main = do writer <- ContT $ withAsync $ ncqStorageRun ncq link writer - fres <- lift $ ncqIndexFile ncq fsrc + (fres,_) <- lift $ ncqIndexFile ncq fsrc pure $ mkSym fres