From 617ce7d4db6a6b6ad1a66eefdc5926622d76beda Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 22 Jun 2025 09:29:05 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/hbs2-storage-ncq.cabal | 4 +- hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 221 ++++++++++++++++++++++ hbs2-tests/test/TestNCQ.hs | 66 +++++++ 3 files changed, 290 insertions(+), 1 deletion(-) create mode 100644 hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 163d16b4..78dff6be 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -57,7 +57,9 @@ common shared-properties library import: shared-properties - exposed-modules: HBS2.Storage.NCQ + exposed-modules: + HBS2.Storage.NCQ + HBS2.Storage.NCQ2 -- other-modules: -- other-extensions: build-depends: base, hbs2-core, hbs2-log-structured, suckless-conf diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs new file mode 100644 index 00000000..94b7461d --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -0,0 +1,221 @@ +{-# Language MultiWayIf #-} +{-# Language RecordWildCards #-} +module HBS2.Storage.NCQ2 where + +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.OrDie +import HBS2.Data.Types.Refs +import HBS2.Base58 +import HBS2.Net.Auth.Credentials +import HBS2.Storage +import HBS2.Misc.PrettyStuff +import HBS2.System.Logger.Simple.ANSI + +import HBS2.Data.Log.Structured.NCQ +import HBS2.Data.Log.Structured.SD + +import Data.Config.Suckless.System +import Data.Config.Suckless.Script hiding (void) + +import Codec.Compression.Zstd qualified as Zstd +import Codec.Compression.Zstd.Lazy as ZstdL +import Codec.Compression.Zstd.Streaming qualified as ZstdS +import Codec.Compression.Zstd.Streaming (Result(..)) + +import Control.Applicative +import Data.ByteString.Builder +import Network.ByteOrder qualified as N +import Data.HashMap.Strict (HashMap) +import Control.Monad.Except +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Data.Ord (Down(..),comparing) +import Control.Concurrent.STM qualified as STM +import Data.HashPSQ qualified as HPSQ +import Data.HashPSQ (HashPSQ) +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.IntSet qualified as IntSet +import Data.IntSet (IntSet) +import Data.Sequence qualified as Seq +import Data.Sequence (Seq(..), (|>)) +import Data.List qualified as List +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString.Lazy.Char8 qualified as LBS8 +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BS8 +import Data.Char (isDigit) +import Data.Fixed +import Data.Coerce +import Data.Word +import Data.Either +import Data.Maybe +import Data.Text qualified as Text +import Data.Text.IO qualified as Text +import Data.Int +import Data.Vector qualified as V +import Data.Vector (Vector, (!)) +import Lens.Micro.Platform +import Data.HashSet (HashSet) +import Data.HashSet qualified as HS +import Data.HashMap.Strict qualified as HM +import System.Directory (makeAbsolute) +import System.FilePath.Posix +import System.Posix.Fcntl +import System.Posix.Files qualified as Posix +import System.Posix.IO as PosixBase +import System.Posix.Types as Posix +import System.Posix.IO.ByteString as Posix +import System.Posix.Unistd +import System.Posix.Files ( getFileStatus + , modificationTimeHiRes + , setFileTimesHiRes + , getFdStatus + , FileStatus(..) + , setFileMode + ) +import System.Posix.Files qualified as PFS +import System.IO.Error (catchIOError) +import System.IO.MMap as MMap +import System.IO.Temp (emptyTempFile) +import System.Mem +-- import Foreign.Ptr +-- import Foreign di +import qualified Data.ByteString.Internal as BSI +import Streaming.Prelude qualified as S + +import UnliftIO +import UnliftIO.Concurrent(getNumCapabilities) +import UnliftIO.IO.File + +import System.FileLock as FL + + +data NCQEntry = + NCQEntryNew ByteString + | NCQEntryJustWritten Handle Word64 + +type Shard = TVar (HashMap HashRef (TVar NCQEntry)) + +data NCQStorage2 = + NCQStorage2 + { ncqFsync :: Int + , ncqMemTable :: Vector Shard + , ncqWriteQ :: TBQueue HashRef + , ncqStorageStopReq :: TVar Bool + } deriving (Generic) + +ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 +ncqStorageOpen2 fp upd = do + let ncqFsync = 16 * 1024^2 + cap <- getNumCapabilities <&> fromIntegral + ncqWriteQ <- newTBQueueIO 32768 + ncqMemTable <- V.fromList <$> replicateM cap (newTVarIO mempty) + ncqStorageStopReq <- newTVarIO False + pure $ NCQStorage2{..} & upd + +ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () +ncqStorageStop2 NCQStorage2{..} = do + atomically $ writeTVar ncqStorageStopReq True + +ncqShardIdx :: NCQStorage2 -> HashRef -> Int +ncqShardIdx NCQStorage2{..} h = + fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable + +ncqGetShard :: NCQStorage2 -> HashRef -> Shard +ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h + +ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe (NCQEntry, TVar NCQEntry)) +ncqLookupEntrySTM ncq h = do + readTVar (ncqGetShard ncq h) + <&> HM.lookup h + >>= \case + Nothing -> pure Nothing + Just tv -> do + v <- readTVar tv + pure $ Just (v, tv) + +ncqPutBS :: MonadUnliftIO m => NCQStorage2 -> ByteString -> m HashRef +ncqPutBS ncq@NCQStorage2{..} bs = do + let h = HashRef (hashObject @HbSync bs) + atomically do + ncqAlterEntrySTM ncq h $ \case + Just e -> Just e + Nothing -> do + Just (NCQEntryNew bs) + writeTBQueue ncqWriteQ h + + pure h + +ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry) +ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) <&> fmap fst + +ncqAlterEntrySTM :: NCQStorage2 -> HashRef -> (Maybe NCQEntry -> Maybe NCQEntry) -> STM () +ncqAlterEntrySTM ncq h alterFn = do + let shard = ncqGetShard ncq h + readTVar shard <&> HM.lookup h >>= \case + Just tve -> do + e <- readTVar tve + case alterFn (Just e) of + Nothing -> modifyTVar' shard (HM.delete h) + Just e' -> writeTVar tve e' + + Nothing -> case alterFn Nothing of + Nothing -> modifyTVar' shard (HM.delete h) + Just e -> do + tve <- newTVar e + modifyTVar' shard (HM.insert h tve) + +ncqStorageRun2 :: forall m . MonadUnliftIO m => NCQStorage2 -> m () +ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do + + fname <- liftIO $ emptyTempFile "." "datafile-.data" + + let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } + fh0 <- liftIO (PosixBase.openFd fname Posix.ReadWrite flags) + + ContT $ bracket none $ const do + liftIO $ closeFd fh0 + + flip fix (fh0,0) \loop (fh,w) -> do + + what <- atomically do + h <- tryReadTBQueue ncqWriteQ + stop <- readTVar ncqStorageStopReq + + case (stop,h) of + (False, Nothing) -> STM.retry + (True, Nothing) -> pure $ Left () + (_, Just h) -> ncqLookupEntrySTM ncq h >>= \case + Nothing -> pure $ Right Nothing + Just (r,t) -> pure $ Right (Just (h,r,t)) + + case what of + Left _ -> exit + Right Nothing -> loop (fh,w) + Right (Just (h,r,t)) -> do + n <- lift (appendEntry fh h r) + + w' <- if (w + n) < ncqFsync then do + pure (w + n) + else do + liftIO $ fileSynchronise fh + pure 0 + + loop (fh, w') + + where + exit = none + + appendEntry :: Fd -> HashRef -> NCQEntry -> m Int + + appendEntry fh h (NCQEntryNew bs) = do + let ss = N.bytestring32 (32 + fromIntegral (BS.length bs)) + let section = ss <> coerce h <> bs + liftIO (Posix.fdWrite fh section) <&> fromIntegral + + appendEntry fh h _ = do + pure 0 + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 31d193c3..1a8d162c 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -21,6 +21,7 @@ import HBS2.Storage.Operations.ByteString import HBS2.System.Logger.Simple.ANSI import HBS2.Storage.NCQ +import HBS2.Storage.NCQ2 import HBS2.Data.Log.Structured.NCQ import HBS2.CLI.Run.Internal.Merkle @@ -578,6 +579,64 @@ testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do rm ncqDir + +testNCQ2Concurrent1 :: MonadUnliftIO m + => Bool + -> Int + -> Int + -> TestEnv + -> m () + +testNCQ2Concurrent1 noRead tn n TestEnv{..} = flip runContT pure do + + let tmp = testEnvDir + let inputDir = tmp "input" + let ncqDir = tmp "ncq-test-data" + + debug "preparing" + + mkdir inputDir + + debug $ pretty inputDir + + filez <- liftIO $ pooledReplicateConcurrentlyN 8 n $ do + size <- randomRIO (64*1024, 256*1024) + w <- liftIO (randomIO :: IO Word8) + let tbs = BS.replicate size w -- replicateM size w <&> BS.pack + let ha = hashObject @HbSync tbs -- & show . pretty + let fn = inputDir show (pretty ha) + liftIO $ BS.writeFile fn tbs + pure (fn, ha, BS.length tbs) + + debug "done" + + let fnv = V.fromList filez + let ssz = sum [ s | (_,_,s) <- filez ] & realToFrac + + notice "NO SHIT" + + setLoggingOff @DEBUG + + for_ [1 .. tn] $ \tnn -> do + + ncq1 <- ncqStorageOpen2 ncqDir (\x -> x { ncqFsync = 64^(1024^2) } ) + w <- ContT $ withAsync (ncqStorageRun2 ncq1) + + (t,_) <- timeItT $ liftIO do + + pooledForConcurrentlyN_ tnn fnv $ \(n,ha,_) -> do + co <- BS.readFile n + ncqPutBS ncq1 co + + let tt = realToFrac @_ @(Fixed E2) t + let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2) + notice $ pretty tnn <+> pretty tt <+> pretty speed + + rm ncqDir + + lift $ ncqStorageStop2 ncq1 + wait w + main :: IO () main = do @@ -680,6 +739,13 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq2:concurrent1" $ nil_ $ \case + [ LitIntVal tn, LitIntVal n ] -> do + debug $ "ncq2:concurrent1" <+> pretty tn <+> pretty n + runTest $ testNCQ2Concurrent1 False ( fromIntegral tn) (fromIntegral n) + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:concurrent1:wo" $ nil_ $ \case [ LitIntVal tn, LitIntVal n ] -> do debug $ "ncq:concurrent1" <+> pretty tn <+> pretty n