This commit is contained in:
Dmitry Zuykov 2025-05-13 10:41:03 +03:00
parent 7196fcc54c
commit aa3d32387d
5 changed files with 295 additions and 10 deletions

View File

@ -27,6 +27,7 @@ BINS := \
hbs2-git3 \
git-remote-hbs23 \
hbs2-ncq \
tcq \
RT_DIR := tests/RT

View File

@ -45,6 +45,7 @@ import Lens.Micro.Platform
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.HashMap.Strict qualified as HM
import System.Directory (makeAbsolute)
import System.FilePath.Posix
import System.Posix.Fcntl
import System.Posix.Files qualified as Posix
@ -119,6 +120,7 @@ data NCQStorage =
, ncqRefsDirty :: TVar Int
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem)
, ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64))
, ncqIndexNow :: TVar Int
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
, ncqCachedEntries :: TVar Int
, ncqNotWritten :: TVar Word64
@ -308,7 +310,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
indexer <- makeIndexer indexQ
writer <- makeWriter indexQ
mapM_ waitCatch [writer,indexer,refsWriter]
mapM_ waitCatch [writer,refsWriter]
-- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter]
mapM_ cancel [reader]
where
@ -381,7 +384,12 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
-- FIXME: timeout-hardcode
void $ race (pause @'Seconds 1) $ atomically do
void $ readTQueue myFlushQ >> STM.flushTQueue myFlushQ
q <- tryPeekTQueue myFlushQ
s <- readTVar ncqStopped
if not (isJust q || s) then
STM.retry
else do
STM.flushTQueue myFlushQ
dirty <- readTVarIO ncqRefsDirty
@ -405,20 +413,28 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
makeIndexer indexQ = do
indexer <- ContT $ withAsync $ untilStopped do
debug $ "STARTED INDEXER"
what' <- race (pause @'Seconds 1) $ atomically do
peekTQueue indexQ >> STM.flushTQueue indexQ
stop <- readTVar ncqStopped
q <- tryPeekTQueue indexQ
if not (stop || isJust q) then
STM.retry
else do
STM.flushTQueue indexQ
let what = fromRight mempty what'
for_ what $ \(fd,fn) -> do
(key, added) <- ncqIndexFile ncq fn <&> over _2 HS.fromList
debug $ "FUCKING WRITE INDEX" <+> pretty fn
atomically do
r <- readTVar ncqWaitIndex <&> HPSQ.toList
let new = [(k,p,v) | (k,p,v) <- r, not (k `HS.member` added)]
writeTVar ncqWaitIndex (HPSQ.fromList new)
(key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList
-- atomically do
-- r <- readTVar ncqWaitIndex <&> HPSQ.toList
-- let new = [(k,p,v) | (k,p,v) <- r, not (k `HS.member` added)]
-- writeTVar ncqWaitIndex (HPSQ.fromList new)
ncqAddTrackedFilesIO ncq [key]
atomically do
@ -489,7 +505,9 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size))
when (fromIntegral size >= ncqMinLog) do
indexNow <- readTVarIO ncqIndexNow
when (fromIntegral size >= ncqMinLog || indexNow > 0) do
(n,u) <- atomically do
r <- readTVar ncqCurrentHandleR <&> fromIntegral
@ -505,6 +523,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
mv current fossilized
atomically do
writeTVar ncqIndexNow 0
r <- readTVar ncqCurrentHandleR
-- NOTE: extra-use
-- добавляем лишний 1 для индексации.
@ -831,7 +850,8 @@ ncqFixIndexes ncq@NCQStorage{..} = do
ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage
ncqStorageOpen fp = do
ncqStorageOpen fp' = do
fp <- liftIO $ makeAbsolute fp'
ncq@NCQStorage{..} <- ncqStorageInit_ False fp
ncqReadTrackedFiles ncq
ncqFixIndexes ncq
@ -926,6 +946,7 @@ ncqStorageInit_ check path = do
ncqStopped <- newTVarIO False
ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqCachedEntries <- newTVarIO 0
ncqIndexNow <- newTVarIO 0
let currentName = ncqGetCurrentName_ path ncqGen
@ -973,6 +994,9 @@ ncqStorageInit_ check path = do
pure ncq
ncqIndexRightNow :: MonadUnliftIO m => NCQStorage -> m ()
ncqIndexRightNow NCQStorage{..} = atomically $ modifyTVar ncqIndexNow succ
withNCQ :: forall m a . MonadUnliftIO m
=> (NCQStorage -> NCQStorage)
-> FilePath

View File

@ -1200,3 +1200,24 @@ executable test-cq-storage
, unix
executable tcq
import: shared-properties
import: common-deps
default-language: Haskell2010
ghc-options:
hs-source-dirs: test
main-is: TCQ.hs
build-depends:
base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq
, network
, string-conversions
, db-pipe
, suckless-conf
, network-byte-order
, text
, time
, mmap
, zstd
, unix

219
hbs2-tests/test/TCQ.hs Normal file
View File

@ -0,0 +1,219 @@
{-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-}
{-# Language MultiWayIf #-}
{-# Language RecordWildCards #-}
{-# Language ViewPatterns #-}
module Main where
import HBS2.Prelude.Plated
import HBS2.OrDie
import HBS2.Hash
import HBS2.Data.Types.Refs
import HBS2.Clock
import HBS2.Merkle
import HBS2.Storage
import HBS2.Storage.Simple
import HBS2.Storage.Operations.ByteString
import HBS2.System.Logger.Simple.ANSI
import HBS2.Storage.NCQ
import HBS2.Data.Log.Structured.NCQ
import Data.Config.Suckless.Syntax
import Data.Config.Suckless.Script
import Data.Config.Suckless.System
import DBPipe.SQLite hiding (field)
import Data.Bits
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as LBS
import Data.Text.Encoding qualified as TE
import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Builder
import Data.Maybe
import Data.Word
import Data.List qualified as List
import Data.Vector qualified as V
import Data.Vector ((!))
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Network.ByteOrder qualified as N
import Data.Coerce
import Data.HashPSQ qualified as HPSQ
import Data.HashSet qualified as HS
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
import Data.Fixed
import System.Environment
import System.Directory
import System.Posix.Fcntl
import System.Posix.IO
import System.IO.MMap
import System.IO qualified as IO
import System.Exit (exitSuccess, exitFailure)
import System.Random
import Safe
import Lens.Micro.Platform
import Control.Concurrent.STM qualified as STM
import UnliftIO
import Text.InterpolatedString.Perl6 (qc)
import Streaming.Prelude qualified as S
import System.TimeIt
import System.IO.Unsafe (unsafePerformIO)
{- HLINT ignore "Functor law" -}
setupLogger :: MonadIO m => m ()
setupLogger = do
-- setLogging @DEBUG $ toStderr . logPrefix "[debug] "
setLogging @ERROR $ toStderr . logPrefix "[error] "
setLogging @WARN $ toStderr . logPrefix "[warn] "
setLogging @NOTICE $ toStdout . logPrefix ""
flushLoggers :: MonadIO m => m ()
flushLoggers = do
silence
silence :: MonadIO m => m ()
silence = do
setLoggingOff @DEBUG
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
setLoggingOff @TRACE
data TCQError =
TCQAlreadyOpen FilePath
| TCQGone FilePath
deriving stock (Show,Typeable)
instance Exception TCQError
newtype TCQ =
TCQ FilePath
deriving newtype (Eq,Ord,Show,Typeable)
main :: IO ()
main = do
instances <- newTVarIO (mempty :: HashMap FilePath (NCQStorage, Async ()))
tvd <- newTVarIO mempty
let finalizeStorages = do
debug "finalize ncq"
r <- readTVarIO instances <&> HM.toList
mapM_ ncqStorageStop (fmap (fst.snd) r)
mapM_ wait (fmap (snd.snd) r)
let getNCQ (TCQ p) = do
readTVarIO instances
<&> HM.lookup p
<&> fmap fst
>>= orThrow (TCQGone p)
let dict = makeDict @C do
entry $ bindMatch "--help" $ nil_ \case
HelpEntryBound what -> helpEntry what
[StringLike s] -> helpList False (Just s)
_ -> helpList False Nothing
internalEntries
entry $ bindMatch "--run" $ \case
[ StringLike what ] -> liftIO do
liftIO (readFile what)
<&> parseTop
>>= either (error.show) pure
>>= runEval tvd
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "debug" $ nil_ \case
[ LitBoolVal False ] -> do
setLoggingOff @DEBUG
[ StringLike "off" ] -> do
setLoggingOff @DEBUG
_ ->
setLogging @DEBUG $ toStderr . logPrefix "[debug] "
entry $ bindMatch "ncq:open" $ \case
[ StringLike path ] -> do
debug $ "ncq:open" <+> pretty path
ncq <- ncqStorageOpen path
r <- async (ncqStorageRun ncq)
e <- atomically do
already <- readTVar instances <&> HM.member path
if already then
pure $ Left $ TCQAlreadyOpen path
else do
modifyTVar instances (HM.insert path (ncq,r))
pure $ Right ()
either throwIO pure e
mkOpaque (TCQ path)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:poke" $ \case
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
ncq <- getNCQ tcq
pure $ mkSym "okay"
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:get" $ \case
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
ncq <- getNCQ tcq
ncqStorageGet ncq hash >>= maybe (pure nil) mkOpaque
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:put" $ \syn -> do
(tcq,bs) <- case syn of
[ isOpaqueOf @TCQ -> Just tcq, isOpaqueOf @ByteString -> Just bs ] -> lift do
pure (tcq, LBS.fromStrict bs)
[ isOpaqueOf @TCQ -> Just tcq, TextLike s ] -> lift do
pure (tcq, LBS.fromStrict (TE.encodeUtf8 s))
e -> throwIO $ BadFormException @C (mkList e)
lift do
ncq <- getNCQ tcq
r <- ncqStoragePut ncq bs
pure $ maybe nil (mkSym . show . pretty) r
setupLogger
argz <- liftIO getArgs
forms <- parseTop (unlines $ unwords <$> splitForms argz)
& either (error.show) pure
atomically $ writeTVar tvd dict
(runEval tvd forms >>= eatNil display)
`finally` (finalizeStorages >> flushLoggers)

View File

@ -103,6 +103,15 @@ main = do
internalEntries
entry $ bindMatch "run" $ \case
[ StringLike what ] -> do
liftIO (readFile what)
<&> parseTop
>>= either (error.show) pure
>>= evalTop
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:sqlite" $ nil_ $ \case
[StringLike fn] -> liftIO do
hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . lines
@ -514,6 +523,17 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:index:now" $ nil_ \case
[StringLike p] -> lift do
withNCQ id p $ \ncq -> do
display_ $ "test:ncq:index:now" <+> pretty p
ncqIndexRightNow ncq
pause @'Seconds 10
display_ $ "test:ncq:index:now" <+> pretty p <+> "done"
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:run" $ nil_ \case
[StringLike p] -> lift do
withNCQ id p $ \_ -> do