diff --git a/Makefile b/Makefile index cec88f99..d790e350 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,7 @@ BINS := \ hbs2-git3 \ git-remote-hbs23 \ hbs2-ncq \ + tcq \ RT_DIR := tests/RT diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 51100dc1..70f62cd1 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -45,6 +45,7 @@ import Lens.Micro.Platform import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.HashMap.Strict qualified as HM +import System.Directory (makeAbsolute) import System.FilePath.Posix import System.Posix.Fcntl import System.Posix.Files qualified as Posix @@ -119,6 +120,7 @@ data NCQStorage = , ncqRefsDirty :: TVar Int , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) + , ncqIndexNow :: TVar Int , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) , ncqCachedEntries :: TVar Int , ncqNotWritten :: TVar Word64 @@ -308,7 +310,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do indexer <- makeIndexer indexQ writer <- makeWriter indexQ - mapM_ waitCatch [writer,indexer,refsWriter] + mapM_ waitCatch [writer,refsWriter] + -- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter] mapM_ cancel [reader] where @@ -381,7 +384,12 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do -- FIXME: timeout-hardcode void $ race (pause @'Seconds 1) $ atomically do - void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ + q <- tryPeekTQueue myFlushQ + s <- readTVar ncqStopped + if not (isJust q || s) then + STM.retry + else do + STM.flushTQueue myFlushQ dirty <- readTVarIO ncqRefsDirty @@ -405,20 +413,28 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do makeIndexer indexQ = do indexer <- ContT $ withAsync $ untilStopped do + debug $ "STARTED INDEXER" + what' <- race (pause @'Seconds 1) $ atomically do - peekTQueue indexQ >> STM.flushTQueue indexQ + stop <- readTVar ncqStopped + q <- tryPeekTQueue indexQ + if not (stop || isJust q) then + STM.retry + else do + STM.flushTQueue indexQ let what = fromRight mempty what' for_ what $ \(fd,fn) -> do - (key, added) <- ncqIndexFile ncq fn <&> over _2 HS.fromList + debug $ "FUCKING WRITE INDEX" <+> pretty fn - atomically do - r <- readTVar ncqWaitIndex <&> HPSQ.toList - let new = [(k,p,v) | (k,p,v) <- r, not (k `HS.member` added)] - writeTVar ncqWaitIndex (HPSQ.fromList new) + (key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList + -- atomically do + -- r <- readTVar ncqWaitIndex <&> HPSQ.toList + -- let new = [(k,p,v) | (k,p,v) <- r, not (k `HS.member` added)] + -- writeTVar ncqWaitIndex (HPSQ.fromList new) ncqAddTrackedFilesIO ncq [key] atomically do @@ -489,7 +505,9 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size)) - when (fromIntegral size >= ncqMinLog) do + indexNow <- readTVarIO ncqIndexNow + + when (fromIntegral size >= ncqMinLog || indexNow > 0) do (n,u) <- atomically do r <- readTVar ncqCurrentHandleR <&> fromIntegral @@ -505,6 +523,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do mv current fossilized atomically do + writeTVar ncqIndexNow 0 r <- readTVar ncqCurrentHandleR -- NOTE: extra-use -- добавляем лишний 1 для индексации. @@ -831,7 +850,8 @@ ncqFixIndexes ncq@NCQStorage{..} = do ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage -ncqStorageOpen fp = do +ncqStorageOpen fp' = do + fp <- liftIO $ makeAbsolute fp' ncq@NCQStorage{..} <- ncqStorageInit_ False fp ncqReadTrackedFiles ncq ncqFixIndexes ncq @@ -926,6 +946,7 @@ ncqStorageInit_ check path = do ncqStopped <- newTVarIO False ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 + ncqIndexNow <- newTVarIO 0 let currentName = ncqGetCurrentName_ path ncqGen @@ -973,6 +994,9 @@ ncqStorageInit_ check path = do pure ncq +ncqIndexRightNow :: MonadUnliftIO m => NCQStorage -> m () +ncqIndexRightNow NCQStorage{..} = atomically $ modifyTVar ncqIndexNow succ + withNCQ :: forall m a . MonadUnliftIO m => (NCQStorage -> NCQStorage) -> FilePath diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 77ab447b..1162e01c 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1200,3 +1200,24 @@ executable test-cq-storage , unix +executable tcq + import: shared-properties + import: common-deps + default-language: Haskell2010 + ghc-options: + hs-source-dirs: test + main-is: TCQ.hs + build-depends: + base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq + , network + , string-conversions + , db-pipe + , suckless-conf + , network-byte-order + , text + , time + , mmap + , zstd + , unix + + diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs new file mode 100644 index 00000000..838f9ea1 --- /dev/null +++ b/hbs2-tests/test/TCQ.hs @@ -0,0 +1,219 @@ +{-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} +{-# Language MultiWayIf #-} +{-# Language RecordWildCards #-} +{-# Language ViewPatterns #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.OrDie +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Clock +import HBS2.Merkle + +import HBS2.Storage +import HBS2.Storage.Simple +import HBS2.Storage.Operations.ByteString + +import HBS2.System.Logger.Simple.ANSI + +import HBS2.Storage.NCQ +import HBS2.Data.Log.Structured.NCQ + + +import Data.Config.Suckless.Syntax +import Data.Config.Suckless.Script +import Data.Config.Suckless.System + +import DBPipe.SQLite hiding (field) + +import Data.Bits +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.Text.Encoding qualified as TE +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Builder +import Data.Maybe +import Data.Word +import Data.List qualified as List +import Data.Vector qualified as V +import Data.Vector ((!)) +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Network.ByteOrder qualified as N +import Data.Coerce +import Data.HashPSQ qualified as HPSQ +import Data.HashSet qualified as HS +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HM +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.Fixed +import System.Environment +import System.Directory +import System.Posix.Fcntl +import System.Posix.IO +import System.IO.MMap +import System.IO qualified as IO +import System.Exit (exitSuccess, exitFailure) +import System.Random +import Safe +import Lens.Micro.Platform +import Control.Concurrent.STM qualified as STM + +import UnliftIO + +import Text.InterpolatedString.Perl6 (qc) + +import Streaming.Prelude qualified as S +import System.TimeIt + +import System.IO.Unsafe (unsafePerformIO) + +{- HLINT ignore "Functor law" -} + +setupLogger :: MonadIO m => m () +setupLogger = do + -- setLogging @DEBUG $ toStderr . logPrefix "[debug] " + setLogging @ERROR $ toStderr . logPrefix "[error] " + setLogging @WARN $ toStderr . logPrefix "[warn] " + setLogging @NOTICE $ toStdout . logPrefix "" + +flushLoggers :: MonadIO m => m () +flushLoggers = do + silence + +silence :: MonadIO m => m () +silence = do + setLoggingOff @DEBUG + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + + +data TCQError = + TCQAlreadyOpen FilePath + | TCQGone FilePath + deriving stock (Show,Typeable) + +instance Exception TCQError + +newtype TCQ = + TCQ FilePath + deriving newtype (Eq,Ord,Show,Typeable) + +main :: IO () +main = do + + instances <- newTVarIO (mempty :: HashMap FilePath (NCQStorage, Async ())) + + tvd <- newTVarIO mempty + + let finalizeStorages = do + debug "finalize ncq" + r <- readTVarIO instances <&> HM.toList + mapM_ ncqStorageStop (fmap (fst.snd) r) + mapM_ wait (fmap (snd.snd) r) + + let getNCQ (TCQ p) = do + readTVarIO instances + <&> HM.lookup p + <&> fmap fst + >>= orThrow (TCQGone p) + + let dict = makeDict @C do + + entry $ bindMatch "--help" $ nil_ \case + HelpEntryBound what -> helpEntry what + [StringLike s] -> helpList False (Just s) + _ -> helpList False Nothing + + internalEntries + + entry $ bindMatch "--run" $ \case + [ StringLike what ] -> liftIO do + liftIO (readFile what) + <&> parseTop + >>= either (error.show) pure + >>= runEval tvd + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "debug" $ nil_ \case + + [ LitBoolVal False ] -> do + setLoggingOff @DEBUG + + [ StringLike "off" ] -> do + setLoggingOff @DEBUG + + _ -> + setLogging @DEBUG $ toStderr . logPrefix "[debug] " + + entry $ bindMatch "ncq:open" $ \case + [ StringLike path ] -> do + debug $ "ncq:open" <+> pretty path + ncq <- ncqStorageOpen path + r <- async (ncqStorageRun ncq) + + e <- atomically do + already <- readTVar instances <&> HM.member path + if already then + pure $ Left $ TCQAlreadyOpen path + else do + modifyTVar instances (HM.insert path (ncq,r)) + pure $ Right () + + either throwIO pure e + + mkOpaque (TCQ path) + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "ncq:poke" $ \case + [ isOpaqueOf @TCQ -> Just tcq ] -> lift do + ncq <- getNCQ tcq + pure $ mkSym "okay" + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "ncq:get" $ \case + [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do + ncq <- getNCQ tcq + ncqStorageGet ncq hash >>= maybe (pure nil) mkOpaque + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "ncq:put" $ \syn -> do + (tcq,bs) <- case syn of + [ isOpaqueOf @TCQ -> Just tcq, isOpaqueOf @ByteString -> Just bs ] -> lift do + pure (tcq, LBS.fromStrict bs) + + [ isOpaqueOf @TCQ -> Just tcq, TextLike s ] -> lift do + pure (tcq, LBS.fromStrict (TE.encodeUtf8 s)) + + e -> throwIO $ BadFormException @C (mkList e) + + lift do + ncq <- getNCQ tcq + r <- ncqStoragePut ncq bs + pure $ maybe nil (mkSym . show . pretty) r + + + setupLogger + + argz <- liftIO getArgs + + forms <- parseTop (unlines $ unwords <$> splitForms argz) + & either (error.show) pure + + atomically $ writeTVar tvd dict + + (runEval tvd forms >>= eatNil display) + `finally` (finalizeStorages >> flushLoggers) + + + diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 7c04828a..90f16880 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -103,6 +103,15 @@ main = do internalEntries + entry $ bindMatch "run" $ \case + [ StringLike what ] -> do + liftIO (readFile what) + <&> parseTop + >>= either (error.show) pure + >>= evalTop + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:sqlite" $ nil_ $ \case [StringLike fn] -> liftIO do hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . lines @@ -514,6 +523,17 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:index:now" $ nil_ \case + [StringLike p] -> lift do + withNCQ id p $ \ncq -> do + display_ $ "test:ncq:index:now" <+> pretty p + ncqIndexRightNow ncq + pause @'Seconds 10 + display_ $ "test:ncq:index:now" <+> pretty p <+> "done" + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:run" $ nil_ \case [StringLike p] -> lift do withNCQ id p $ \_ -> do