diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 76aec3f1..c4253189 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -94,7 +94,8 @@ data NCQStorage = , ncqRefsMem :: TVar (HashMap HashRef HashRef) , ncqRefsDirty :: TVar Int , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) - , ncqDeleteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) + , ncqDeleted :: TVar (HashSet HashRef) + , ncqDeleteQ :: TBQueue HashRef , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) , ncqTrackedFiles :: TVar (HashSet FileKey) , ncqCachedIndexes :: TVar (HashPSQ FileKey TimeSpec (ByteString,NWayHash)) @@ -103,6 +104,7 @@ data NCQStorage = , ncqLastWritten :: TVar TimeSpec , ncqCurrentHandleW :: TVar Fd , ncqCurrentHandleR :: TVar Fd + , ncqDeletedW :: TVar Fd , ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) , ncqFlushNow :: TVar [TQueue ()] @@ -170,6 +172,10 @@ ncqGetErrorLogName :: NCQStorage -> FilePath ncqGetErrorLogName ncq = do ncqGetFileName ncq "errors.log" +ncqGetDeletedFileName :: NCQStorage -> FilePath +ncqGetDeletedFileName ncq = do + ncqGetFileName ncq "deleted.data" + -- ncqCheckCurrentSize :: MonadIO m => NCQStorage -> m (Either Integer Integer) -- ncqCheckCurrentSize ncq = liftIO $ readCurrent `catch` (\(_ :: IOError) -> pure $ Left 0) -- where @@ -248,124 +254,188 @@ ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop ncq@NCQStorage{..} = do ncqStorageSync ncq atomically $ writeTVar ncqStopped True - atomically $ fix \next -> do - done <- readTVar ncqWriteQueue <&> HPSQ.null + atomically do + doneW <- readTVar ncqWriteQueue <&> HPSQ.null + doneD <- isEmptyTBQueue ncqDeleteQ + let done = doneW && doneD unless done STM.retry ncqStorageRun :: MonadUnliftIO m => NCQStorage -> m () ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do - let dumpTimeout = round 10e6 - let dumpData = 1024 ^ 2 - let syncData = fromIntegral ncqSyncSize - - let untilStopped m = fix \loop -> do - m >> readTVarIO ncqStopped >>= \case - False -> loop - _ -> debug "STOPPING THREAD" + indexQ <- newTQueueIO ContT $ bracket none $ const $ liftIO do -- writeJournal syncData readTVarIO ncqCurrentHandleW >>= closeFd + readTVarIO ncqDeletedW >>= closeFd debug "RUNNING STORAGE!" - -- cap <- (10*) <$> getNumCapabilities - cap <- getNumCapabilities + refsWriter <- makeRefsWriter + reader <- makeReader + indexer <- makeIndexer indexQ + writer <- makeWriter indexQ + delWriter <- makeDelWriter - refsWriter <- ContT $ withAsync do - myFlushQ <- newTQueueIO - atomically $ modifyTVar ncqFlushNow (myFlushQ:) - - untilStopped do - -- FIXME: timeout-hardcode - - void $ race (pause @'Seconds 1) $ atomically do - void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ - - dirty <- readTVarIO ncqRefsDirty - - when (dirty > 0) do - refs <- readTVarIO ncqRefsMem <&> HM.toList - withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do - for_ refs $ \(k,v) -> do - let ks = coerce @_ @ByteString k - let vs = coerce @_ @ByteString v - let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay - liftIO do - BS.hPutStr fh (N.bytestring32 (fromIntegral w)) - BS.hPutStr fh ks - BS.hPutStr fh vs - atomically $ writeTVar ncqRefsDirty 0 - - link refsWriter - - reader <- ContT $ withAsync $ untilStopped do - - reqs <- atomically do - xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap) - when (List.null xs) STM.retry - pure xs - - for_ reqs $ \(fd,off,l,answ) -> liftIO do - atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) - fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off) - bs <- Posix.fdRead fd (fromIntegral l) - atomically $ putTMVar answ bs - - link reader - - indexQ <- newTQueueIO - - indexer <- ContT $ withAsync $ untilStopped do - - what' <- race (pause @'Seconds 1) $ atomically do - peekTQueue indexQ >> STM.flushTQueue indexQ - - let what = fromRight mempty what' - - for_ what $ \(fd,fn) -> do - - key <- ncqIndexFile ncq fn <&> fromString @FileKey - - atomically do - ncqAddTrackedFilesSTM ncq [key] - modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) - - ncqLoadSomeIndexes ncq [key] - - link indexer - - writer <- ContT $ withAsync do - - myFlushQ <- newTQueueIO - atomically $ modifyTVar ncqFlushNow (myFlushQ:) - - untilStopped do - - flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do - void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ - pure True - - let flushNow = fromRight False flush - - now <- getTimeCoarse - lastW <- readTVarIO ncqLastWritten - bytes <- readTVarIO ncqNotWritten - - let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 - - stopped <- readTVarIO ncqStopped - - when (dumpByTime || bytes >= dumpData || flushNow || stopped) do - -- debug "NCQStorage: dump data!" - liftIO $ writeJournal indexQ syncData - - mapM_ waitCatch [writer,indexer,refsWriter] + mapM_ waitCatch [writer,indexer,refsWriter,delWriter] mapM_ cancel [reader] where + untilStopped m = fix \loop -> do + m >> readTVarIO ncqStopped >>= \case + False -> loop + _ -> debug "STOPPING THREAD" + + makeReader = do + cap <- getNumCapabilities + reader <- ContT $ withAsync $ untilStopped do + + reqs <- atomically do + xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap) + when (List.null xs) STM.retry + pure xs + + for_ reqs $ \(fd,off,l,answ) -> liftIO do + atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) + fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off) + bs <- Posix.fdRead fd (fromIntegral l) + atomically $ putTMVar answ bs + + link reader + pure reader + + + makeWriter indexQ = do + + let dumpTimeout = round 10e6 + let dumpData = 1024 ^ 2 + let syncData = fromIntegral ncqSyncSize + + writer <- ContT $ withAsync do + + myFlushQ <- newTQueueIO + atomically $ modifyTVar ncqFlushNow (myFlushQ:) + + fix \next -> do + + flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do + flush <- isEmptyTQueue myFlushQ <&> not + stop <- readTVar ncqStopped + if flush || stop then pure True else STM.retry + + void $ atomically (readTQueue myFlushQ >> STM.flushTQueue myFlushQ) + + let flushNow = fromRight False flush + + now <- getTimeCoarse + lastW <- readTVarIO ncqLastWritten + bytes <- readTVarIO ncqNotWritten + + let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 + + stopped <- readTVarIO ncqStopped + + when (dumpByTime || bytes >= dumpData || flushNow || stopped) do + -- debug "NCQStorage: dump data!" + liftIO $ writeJournal indexQ syncData + + done <- atomically $ readTVar ncqWriteQueue <&> HPSQ.null + + if done && stopped then none else next + + link writer + pure writer + + makeRefsWriter = do + refsWriter <- ContT $ withAsync do + myFlushQ <- newTQueueIO + atomically $ modifyTVar ncqFlushNow (myFlushQ:) + + untilStopped do + -- FIXME: timeout-hardcode + + void $ race (pause @'Seconds 1) $ atomically do + void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ + + dirty <- readTVarIO ncqRefsDirty + + when (dirty > 0) do + refs <- readTVarIO ncqRefsMem <&> HM.toList + withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do + for_ refs $ \(k,v) -> do + let ks = coerce @_ @ByteString k + let vs = coerce @_ @ByteString v + let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay + liftIO do + BS.hPutStr fh (N.bytestring32 (fromIntegral w)) + BS.hPutStr fh ks + BS.hPutStr fh vs + atomically $ writeTVar ncqRefsDirty 0 + + link refsWriter + pure refsWriter + + + makeIndexer indexQ = do + indexer <- ContT $ withAsync $ untilStopped do + + what' <- race (pause @'Seconds 1) $ atomically do + peekTQueue indexQ >> STM.flushTQueue indexQ + + let what = fromRight mempty what' + + for_ what $ \(fd,fn) -> do + + key <- ncqIndexFile ncq fn <&> fromString @FileKey + + atomically do + ncqAddTrackedFilesSTM ncq [key] + modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) + + ncqLoadSomeIndexes ncq [key] + + link indexer + pure indexer + + makeDelWriter = do + + let fsyncAt = 150 + + delWriter <- ContT $ withAsync do + + myFlushQ <- newTQueueIO + atomically $ modifyTVar ncqFlushNow (myFlushQ:) + + debug "delWriter running" + + fix \next -> do + + void $ race (pause @'Seconds 1) $ atomically do + stop <- readTVar ncqStopped + flush <- isEmptyTQueue myFlushQ <&> not + size <- lengthTBQueue ncqDeleteQ <&> (>= fsyncAt) + unless (flush || size || stop) STM.retry + + toWrite <- atomically $ STM.flushTBQueue ncqDeleteQ + + liftIO do + w <- readTVarIO ncqDeletedW + for_ toWrite $ \hx -> do + let k = coerce @_ @ByteString hx + _ <- Posix.fdWrite w (N.bytestring32 (fromIntegral $ BS.length k)) + Posix.fdWrite w k + debug $ "DELETED" <+> pretty hx + fileSynchronise w + + atomically (isEmptyTBQueue ncqDeleteQ) >>= \case + True -> none + False -> next + + link delWriter + pure delWriter + writeJournal indexQ syncData = liftIO do trace $ "writeJournal" <+> pretty syncData @@ -565,7 +635,9 @@ ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs))) ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) -ncqStorageHasBlock ncq h = ncqLocate ncq h <&> fmap ncqLocatedSize +ncqStorageHasBlock ncq@NCQStorage{..} h = runMaybeT do + readTVarIO ncqDeleted <&> not . HS.member h >>= guard + toMPlus =<< (ncqLocate ncq h <&> fmap ncqLocatedSize) ncqStorageScanDataFile :: MonadIO m => NCQStorage @@ -595,7 +667,11 @@ ncqStorageScanDataFile ncq fp' action = do next (4 + o + fromIntegral w, BS.drop (w+4) bs) ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) -ncqStorageGet ncq@NCQStorage{..} h = do +ncqStorageGet ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do + + deleted <- readTVarIO ncqDeleted <&> HS.member h + + when deleted $ exit Nothing ncqLocate ncq h >>= \case Nothing -> pure Nothing @@ -642,6 +718,7 @@ ncqStorageSetRef NCQStorage{..} ref val = atomically do ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m () ncqStorageDelRef NCQStorage{..} ref = atomically do modifyTVar ncqRefsMem (HM.delete ref) + modifyTVar ncqRefsDirty succ ncqStorageDel :: MonadUnliftIO m => NCQStorage -> HashRef -> m () ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do @@ -649,12 +726,11 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do True -> exit () _ -> none - atomically $ modifyTVar ncqWriteQueue (HPSQ.delete h) - - ncqLocate ncq h >>= \case - _ -> none - - error "not implemented yet" + atomically do + already <- readTVar ncqDeleted <&> HS.member h + unless already do + writeTBQueue ncqDeleteQ h + modifyTVar ncqDeleted (HS.insert h) ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do @@ -698,6 +774,7 @@ ncqStorageOpen fp = do ncqReadTrackedFiles ncq ncqFixIndexes ncq ncqLoadIndexes ncq + readDeleted ncq readCurrent ncq readRefs ncq atomically $ putTMVar ncqOpenDone True @@ -714,6 +791,30 @@ ncqStorageOpen fp = do S.yield (k,v) atomically $ writeTVar ncqRefsMem (HM.fromList kvs) + + readDeleted ncq@NCQStorage{..} = do + let fn = ncqGetDeletedFileName ncq + -- liftIO $ print $ pretty "FILE" <+> pretty fn + bs0 <- liftIO $ mmapFileByteString fn Nothing + + items <- HS.fromList <$> S.toList_ do + flip runContT pure $ callCC \exit -> do + flip fix bs0 $ \next bs -> do + when (BS.length bs < 4) $ exit () + let w = BS.take 4 bs & N.word32 & fromIntegral + let p = BS.take w (BS.drop 4 bs) + + when (BS.length p < w ) do + err $ "broken file" <+> pretty fn + exit () + + let k = BS.take 32 p & coerce . BS.copy + lift $ S.yield (k :: HashRef) + next (BS.drop (w+4) bs) + + debug $ "NCQStorage.deleted" <+> pretty (HS.size items) + atomically $ writeTVar ncqDeleted items + readCurrent ncq@NCQStorage{..} = do let fn = ncqGetCurrentName ncq -- liftIO $ print $ pretty "FILE" <+> pretty fn @@ -721,28 +822,28 @@ ncqStorageOpen fp = do now <- getTimeCoarse - flip runContT pure $ callCC \exit ->do - flip fix (0,bs0) $ \next (o,bs) -> do - when (BS.length bs < 4) $ exit () - let w = BS.take 4 bs & N.word32 & fromIntegral - let p = BS.take w (BS.drop 4 bs) + deleted <- readTVarIO ncqDeleted - when (BS.length p < w ) do - err $ "broken file" <+> pretty fn - exit () + items <- S.toList_ <$> + flip runContT pure $ callCC \exit ->do + flip fix (0,bs0) $ \next (o,bs) -> do + when (BS.length bs < 4) $ exit () + let w = BS.take 4 bs & N.word32 & fromIntegral + let p = BS.take w (BS.drop 4 bs) - let k = BS.take 32 p & coerce - let vs = w - 32 + when (BS.length p < w ) do + err $ "broken file" <+> pretty fn + exit () - -- trace $ "GOT RECORD" - -- <+> pretty w - -- <+> pretty k - -- <+> pretty o - -- <+> pretty vs + let k = BS.take 32 p & coerce . BS.copy + let vs = w - 32 - atomically $ modifyTVar ncqWaitIndex (HPSQ.insert k now (fromIntegral o, fromIntegral vs)) + unless (HS.member k deleted) do + lift $ S.yield (k,now, (fromIntegral o, fromIntegral vs)) - next (o+w+4, BS.drop (w+4) bs) + next (o+w+4, BS.drop (w+4) bs) + + atomically $ writeTVar ncqWaitIndex (HPSQ.fromList items) ncqStorageInit :: MonadUnliftIO m => FilePath -> m NCQStorage ncqStorageInit = ncqStorageInit_ True @@ -780,7 +881,8 @@ ncqStorageInit_ check path = do let ncqMaxCachedData = ncqMaxCachedIdx `div` 2 ncqWriteQueue <- newTVarIO HPSQ.empty - ncqDeleteQueue <- newTVarIO HPSQ.empty + ncqDeleted <- newTVarIO mempty + ncqDeleteQ <- newTBQueueIO 3000 ncqNotWritten <- newTVarIO 0 ncqLastWritten <- getTimeCoarse >>= newTVarIO @@ -804,6 +906,7 @@ ncqStorageInit_ check path = do when hereCurrent $ liftIO do let ncqCurrentHandleW = undefined let ncqCurrentHandleR = undefined + let ncqDeletedW = undefined let ncq0 = NCQStorage{..} lastSz <- try @_ @IOException (BS.readFile currentSize) @@ -833,9 +936,15 @@ ncqStorageInit_ check path = do debug $ "currentFileName" <+> pretty (ncqGetCurrentName_ path ncqGen) + ncqDeletedW <- newTVarIO undefined + let ncq = NCQStorage{..} touch (ncqGetRefsDataFileName ncq) + touch (ncqGetDeletedFileName ncq) + + liftIO (PosixBase.openFd (ncqGetDeletedFileName ncq) Posix.WriteOnly flags { append = True}) + >>= atomically . writeTVar ncqDeletedW pure ncq diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index b5fcc611..8d621c69 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -65,6 +65,7 @@ import Text.InterpolatedString.Perl6 (qc) import Streaming.Prelude qualified as S import System.TimeIt +import System.IO.Unsafe (unsafePerformIO) setupLogger :: MonadIO m => m () setupLogger = do @@ -85,6 +86,8 @@ silence = do setLoggingOff @NOTICE setLoggingOff @TRACE + + main :: IO () main = do @@ -503,6 +506,35 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:raw:del-some" $ nil_ \case + [StringLike fn] -> liftIO $ flip runContT pure do + + hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + ContT $ bracket none $ const do + none + + debug $ "TO DELETE" <+> pretty (length hashes) + + for_ hashes $ \h -> runMaybeT do + liftIO do + print $ "delete" <+> pretty h + ncqStorageDel ncq h + + liftIO $ ncqStorageStop ncq + + pause @'Seconds 5 + + wait writer + + e -> throwIO $ BadFormException @C (mkList e) + + setupLogger argz <- liftIO getArgs