diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index baa2a0f2..17fd29c7 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -54,11 +54,20 @@ common shared-properties , TypeApplications , TypeFamilies , TypeOperators + , RecordWildCards library import: shared-properties exposed-modules: + HBS2.Storage.NCQ3 + HBS2.Storage.NCQ3.Internal + HBS2.Storage.NCQ3.Internal.Types + HBS2.Storage.NCQ3.Internal.Prelude + HBS2.Storage.NCQ3.Internal.State + HBS2.Storage.NCQ3.Internal.Run + HBS2.Storage.NCQ3.Internal.Memtable + HBS2.Storage.NCQ3.Internal.Index HBS2.Storage.NCQ HBS2.Storage.NCQ2 HBS2.Storage.NCQ2.Internal diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index 5bc933d4..6f007aba 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -212,3 +212,10 @@ posixToTimeSpec pt = in TimeSpec (fromIntegral s) ns +megabytes :: forall a . Integral a => a +megabytes = 1024 ^ 2 + +gigabytes :: forall a . Integral a => a +gigabytes = 1024 ^ 3 + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 57eefdb5..f43f5a33 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -85,11 +85,12 @@ ncqStorageOpen2 fp upd = do let ncqFsync = 16 * megabytes let ncqWriteQLen = 1024 * 4 let ncqMinLog = 512 * megabytes - let ncqMaxLog = 16 * gigabytes -- ??? + -- let ncqMaxLog = 16 * gigabytes -- ??? + let ncqMaxLog = 2 * ncqMinLog -- * gigabytes -- ??? let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 let ncqMaxCached = 128 let ncqIdleThrsh = 50.00 - let ncqPostponeMerge = 30.00 + let ncqPostponeMerge = 300.00 let ncqPostponeSweep = 2 * ncqPostponeMerge let ncqLuckyNum = 2 @@ -493,7 +494,8 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do spawnActivity $ postponed 20 $ forever do ema <- readTVarIO ncqWriteEMA when (ema < 50 ) do - ncqKeyNumIntersectionProbe ncq + -- ncqKeyNumIntersectionProbe ncq + ncqTombCountProbe ncq pause @'Seconds 10 diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs index aaf24a15..6df1658d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Probes.hs @@ -23,29 +23,54 @@ import Lens.Micro.Platform import System.Random.MWC qualified as MWC import UnliftIO -ncqKeyNumIntersectionProbe :: MonadUnliftIO m => NCQStorage2 -> m () -ncqKeyNumIntersectionProbe me@NCQStorage2{..} = useVersion me $ const $ void $ runMaybeT do - -- Фильтруем pending - files0 <- lift (ncqListTrackedFiles me) +randomTrackedFile :: MonadUnliftIO m => NCQStorage2 -> m (Maybe FileKey) +randomTrackedFile ncq@NCQStorage2{..} = runMaybeT do + files0 <- lift (ncqListTrackedFiles ncq) let files = V.toList $ V.filter (isNotPending . view _2) files0 + guard (not (null files)) + i <- liftIO $ MWC.uniformRM (0, length files - 1) ncqRndGen + pure (view _1 (files !! i)) - when (length files < 2) mzero +randomTrackedFilePair :: MonadUnliftIO m => NCQStorage2 -> m (Maybe (FileKey, FileKey)) +randomTrackedFilePair ncq@NCQStorage2{..} = runMaybeT do + files0 <- lift (ncqListTrackedFiles ncq) + let files = V.toList $ V.filter (isNotPending . view _2) files0 + guard (length files >= 2) - (a,b) <- liftIO $ fix \next -> do + (a, b) <- liftIO $ fix \loop -> do i <- MWC.uniformRM (0, length files - 1) ncqRndGen j <- MWC.uniformRM (0, length files - 1) ncqRndGen - if i == j then next else pure (files !! min i j, files !! max i j) + if i == j then loop else pure (min i j, max i j) - let fka = view _1 a - let fkb = view _1 b + let fa = view _1 (files !! a) + let fb = view _1 (files !! b) + pure (fa, fb) + + +ncqTombCountProbeFor :: MonadUnliftIO m => NCQStorage2 -> FileKey -> m (Maybe Int) +ncqTombCountProbeFor ncq@NCQStorage2{..} fkey = runMaybeT do + let fIndex = ncqGetFileName ncq $ toFileName (IndexFile fkey) + + (bs, nh) <- liftIO (nwayHashMMapReadOnly fIndex) >>= toMPlus + + liftIO do + ref <- newTVarIO 0 + nwayHashScanAll nh bs $ \_ k v -> do + let NCQIdxEntry _ s = decodeEntry v + when (k /= ncqEmptyKey && s < 64) $ + atomically $ modifyTVar' ref (+1) + readTVarIO ref + +ncqKeyNumIntersectionProbeFor :: MonadUnliftIO m => NCQStorage2 -> (FileKey, FileKey) -> m (Maybe Int) +ncqKeyNumIntersectionProbeFor ncq@NCQStorage2{..} (fka, fkb) = runMaybeT do let key = FactKey $ coerce $ hashObject @HbSync $ serialise $ List.sort [fka, fkb] known <- lift (readTVarIO ncqFacts <&> HM.member key) - when known mzero + guard (not known) - let fIndexA = ncqGetFileName me (toFileName (IndexFile fka)) - let fIndexB = ncqGetFileName me (toFileName (IndexFile fkb)) + let fIndexA = ncqGetFileName ncq (toFileName (IndexFile fka)) + let fIndexB = ncqGetFileName ncq (toFileName (IndexFile fkb)) idxPair' <- liftIO $ try @_ @IOException do (,) <$> nwayHashMMapReadOnly fIndexA @@ -55,15 +80,23 @@ ncqKeyNumIntersectionProbe me@NCQStorage2{..} = useVersion me $ const $ void $ r Right (Just x, Just y) -> pure (x,y) _ -> warn ("can't load index pair" <+> pretty (fka, fkb)) >> mzero - n <- liftIO $ do + liftIO do ref <- newTVarIO 0 - nwayHashScanAll n1 bs1 $ \_ k _ -> when (k /= ncqEmptyKey ) do - here <- ncqLookupIndex (coerce k) (bs2,n2) - when (isJust here) $ atomically $ modifyTVar' ref (+1) - + nwayHashScanAll n1 bs1 $ \_ k _ -> when (k /= ncqEmptyKey) do + here <- ncqLookupIndex (coerce k) (bs2,n2) + when (isJust here) $ atomically $ modifyTVar' ref (+1) readTVarIO ref - debug $ yellow "ncqKeyNumIntersectionProbe" - <+> pretty fka <+> pretty fkb <+> pretty n +ncqTombCountProbe :: MonadUnliftIO m => NCQStorage2 -> m () +ncqTombCountProbe ncq = useVersion ncq $ const $ void $ runMaybeT do + fk <- MaybeT (randomTrackedFile ncq) + count <- MaybeT (ncqTombCountProbeFor ncq fk) + debug $ yellow "ncqTombCountProbe" <+> pretty fk <+> pretty count + +ncqKeyNumIntersectionProbe :: MonadUnliftIO m => NCQStorage2 -> m () +ncqKeyNumIntersectionProbe ncq = useVersion ncq $ const $ void $ runMaybeT do + (fa, fb) <- MaybeT (randomTrackedFilePair ncq) + n <- MaybeT (ncqKeyNumIntersectionProbeFor ncq (fa, fb)) + debug $ yellow "ncqKeyNumIntersectionProbe" <+> pretty fa <+> pretty fb <+> pretty n diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs index f7ced96d..4f45cff1 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2/Internal/Types.hs @@ -148,12 +148,6 @@ data NCQStorage2 = , ncqRndGen :: Gen RealWorld } -megabytes :: forall a . Integral a => a -megabytes = 1024 ^ 2 - -gigabytes :: forall a . Integral a => a -gigabytes = 1024 ^ 3 - ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs new file mode 100644 index 00000000..0228d427 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs @@ -0,0 +1,8 @@ +module HBS2.Storage.NCQ3 + ( module Exported ) + where + +import HBS2.Storage.NCQ3.Internal.Types as Exported + + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs new file mode 100644 index 00000000..942c411d --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -0,0 +1,136 @@ +{-# Language RecordWildCards #-} +module HBS2.Storage.NCQ3.Internal where + +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.State +import HBS2.Storage.NCQ3.Internal.Run + +import Control.Monad.Trans.Cont +import Network.ByteOrder qualified as N +import Data.HashPSQ qualified as PSQ +import Data.Vector qualified as V +import Data.HashMap.Strict qualified as HM +import Data.ByteString qualified as BS +import Data.Sequence qualified as Seq +import System.FilePath.Posix +import System.Posix.Files qualified as Posix +import System.Posix.IO as PosixBase +import System.Posix.Types as Posix +import System.Posix.Unistd +import System.Posix.IO.ByteString as Posix +import System.Posix.Files ( getFileStatus + , modificationTimeHiRes + , setFileTimesHiRes + , getFdStatus + , FileStatus(..) + , setFileMode + ) +import System.Posix.Files qualified as PFS +import System.IO.MMap as MMap + +ncqStorageOpen3 :: MonadIO m => FilePath -> (NCQStorage3 -> NCQStorage3) -> m NCQStorage3 +ncqStorageOpen3 fp upd = do + let ncqRoot = fp + let ncqGen = 0 + let ncqFsync = 16 * megabytes + let ncqWriteQLen = 1024 * 4 + let ncqMinLog = 512 * megabytes + let ncqMaxLog = 2 * ncqMinLog + let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 + let ncqMaxCached = 128 + let ncqIdleThrsh = 50.0 + let ncqPostponeMerge = 300.0 + let ncqPostponeSweep = 2 * ncqPostponeMerge + let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" + + cap <- getNumCapabilities + + let shardNum = fromIntegral cap + let wopNum = 2 + + ncqWriteQ <- newTVarIO mempty + ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty) + ncqMMapCache <- newTVarIO PSQ.empty + ncqStateFiles <- newTVarIO mempty + ncqStateIndex <- newTVarIO mempty + ncqStateFileSeq <- newTVarIO 0 + ncqStateVersion <- newTVarIO 0 + ncqStateUsage <- newTVarIO mempty + ncqWrites <- newTVarIO 0 + ncqWriteEMA <- newTVarIO 0.0 + ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO + ncqAlive <- newTVarIO False + ncqStopReq <- newTVarIO False + ncqSyncReq <- newTVarIO False + ncqOnRunWriteIdle <- newTVarIO none + ncqSyncNo <- newTVarIO 0 + + let ncq = NCQStorage3{..} & upd + + mkdir (ncqGetWorkDir ncq) + pure ncq + +ncqWithStorage3 :: MonadUnliftIO m => FilePath -> (NCQStorage3 -> m a) -> m a +ncqWithStorage3 fp action = flip runContT pure do + sto <- lift (ncqStorageOpen3 fp id) + w <- ContT $ withAsync (ncqStorageRun3 sto) -- TODO: implement run + link w + r <- lift (action sto) + lift (ncqStorageStop3 sto) + wait w + pure r + + +ncqShardIdx :: NCQStorage3 -> HashRef -> Int +ncqShardIdx NCQStorage3{..} h = + fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable +{-# INLINE ncqShardIdx #-} + +ncqGetShard :: NCQStorage3 -> HashRef -> Shard +ncqGetShard ncq@NCQStorage3{..} h = ncqMemTable ! ncqShardIdx ncq h +{-# INLINE ncqGetShard #-} + +ncqStorageSync3 :: forall m . MonadUnliftIO m => NCQStorage3 -> m () +ncqStorageSync3 NCQStorage3{..} = atomically $ writeTVar ncqSyncReq True + +ncqOperation :: MonadIO m => NCQStorage3 -> m a -> m a -> m a +ncqOperation ncq m0 m = do + alive <- readTVarIO (ncqAlive ncq) + if alive then m else m0 + + +ncqPutBS :: MonadUnliftIO m + => NCQStorage3 + -> Maybe NCQSectionType + -> Maybe HashRef + -> ByteString + -> m HashRef +ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe (HashRef (hashObject @HbSync bs')) mhref) do + waiter <- newEmptyTMVarIO + + let work = do + let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref + let bs = ncqMakeSectionBS mtp h bs' + let shard = ncqGetShard ncq h + zero <- newTVarIO Nothing + + atomically do + upd <- stateTVar shard $ flip HM.alterF h \case + Nothing -> (True, Just (NCQEntry bs zero)) + Just e | ncqEntryData e /= bs -> (True, Just (NCQEntry bs zero)) + | otherwise -> (False, Just e) + + when upd do + modifyTVar ncqWriteQ (|> h) + + putTMVar waiter h + + atomically do + nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) + modifyTVar ncqWrites succ + writeTQueue (ncqWriteOps ! nw) work + + atomically $ takeTMVar waiter + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs new file mode 100644 index 00000000..91ef6226 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -0,0 +1,69 @@ +module HBS2.Storage.NCQ3.Internal.Index where + +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.State + +import Streaming.Prelude qualified as S +import Network.ByteOrder qualified as N +import Control.Monad.Trans.Cont +import Data.ByteString qualified as BS +import System.IO.MMap + +ncqIndexFile :: MonadUnliftIO m => NCQStorage3 -> DataFile FileKey -> m FilePath +ncqIndexFile n@NCQStorage3{} fk = do + + let fp = toFileName fk & ncqGetFileName n + let dest = toFileName (IndexFile (coerce @_ @FileKey fk)) & ncqGetFileName n + + debug $ "INDEX" <+> pretty fp <+> pretty dest + + items <- S.toList_ do + ncqStorageScanDataFile n fp $ \o w k s -> case ncqIsMeta s of + Just M -> none + _ -> do + -- we need size in order to return block size faster + -- w/o search in fossil + let rs = (w + ncqSLen) & fromIntegral @_ @Word32 & N.bytestring32 + let os = fromIntegral @_ @Word64 o & N.bytestring64 + let record = os <> rs + S.yield (coerce k, record) + + let (dir,name) = splitFileName fp + let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" + + result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items + + mv result dest + + -- ncqStateUpdate n [F 0 (coerce fk)] + + pure dest + + +ncqStorageScanDataFile :: MonadIO m + => NCQStorage3 + -> FilePath + -> ( Integer -> Integer -> HashRef -> ByteString -> m () ) + -> m () +ncqStorageScanDataFile ncq fp' action = do + let fp = ncqGetFileName ncq fp' + mmaped <- liftIO (mmapFileByteString fp Nothing) + + flip runContT pure $ callCC \exit -> do + flip fix (0,mmaped) $ \next (o,bs) -> do + + when (BS.length bs < ncqSLen) $ exit () + + let w = BS.take ncqSLen bs & N.word32 & fromIntegral + + when (BS.length bs < ncqSLen + w) $ exit () + + let kv = BS.drop ncqSLen bs + + let k = BS.take ncqKeyLen kv & coerce @_ @HashRef + let v = BS.take (ncqFullDataLen (NCQFullRecordLen w)) $ BS.drop ncqKeyLen kv + + lift (action o (fromIntegral w) k v) + + next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs new file mode 100644 index 00000000..45193b6e --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs @@ -0,0 +1,29 @@ +module HBS2.Storage.NCQ3.Internal.Memtable where + +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.Prelude + +import Data.ByteString qualified as BS +import Data.HashMap.Strict qualified as HM +import Data.Vector qualified as V + +ncqShardIdx :: NCQStorage3 -> HashRef -> Int +ncqShardIdx NCQStorage3{..} h = + fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable +{-# INLINE ncqShardIdx #-} + +ncqGetShard :: NCQStorage3 -> HashRef -> Shard +ncqGetShard ncq@NCQStorage3{..} h = ncqMemTable ! ncqShardIdx ncq h +{-# INLINE ncqGetShard #-} + + +ncqLookupEntrySTM :: NCQStorage3 -> HashRef -> STM (Maybe NCQEntry) +ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h + +ncqAlterEntrySTM :: NCQStorage3 + -> HashRef + -> (Maybe NCQEntry -> Maybe NCQEntry) + -> STM () +ncqAlterEntrySTM ncq h alterFn = do + let shard = ncqGetShard ncq h + modifyTVar shard (HM.alter alterFn h) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs new file mode 100644 index 00000000..cdb05e70 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs @@ -0,0 +1,53 @@ +module HBS2.Storage.NCQ3.Internal.Prelude + ( module Exported + , NCQSectionType(..) + , megabytes + , gigabytes + , ncqMakeSectionBS + , ncqSLen + , ncqKeyLen + , ncqPrefixLen + , ncqRefPrefix + , ncqBlockPrefix + , ncqMetaPrefix + , ncqIsMeta + , ncqFullDataLen + , NCQFullRecordLen(..) + , ToFileName(..) + , IndexFile(..) + , DataFile(..) + , ByteString + , Vector, (!) + , Seq(..), (|>),(<|) + , HashSet + , HashMap + , HashPSQ + , IntMap + ) where + +import HBS2.Prelude as Exported + +import HBS2.Data.Log.Structured.NCQ as Exported +import HBS2.Data.Types.Refs as Exported +import HBS2.Hash as Exported +import HBS2.Misc.PrettyStuff as Exported +import HBS2.Storage.NCQ.Types +import HBS2.System.Dir as Exported +import HBS2.System.Logger.Simple.ANSI as Exported + +import Data.ByteString (ByteString) + +import Data.Maybe as Exported +import Data.Coerce as Exported +import Data.Word as Exported +import Data.Vector (Vector,(!)) +import Data.Sequence (Seq(..),(|>),(<|)) +import Data.HashSet (HashSet) +import Data.HashMap.Strict (HashMap) +import Data.HashPSQ (HashPSQ) +import Data.IntMap (IntMap) + + +import UnliftIO as Exported +import UnliftIO.Concurrent as Exported + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs new file mode 100644 index 00000000..e1e542e9 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -0,0 +1,228 @@ +{-# Language MultiWayIf #-} +module HBS2.Storage.NCQ3.Internal.Run where + +import HBS2.Storage.NCQ.Types hiding (FileKey) +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.State +import HBS2.Storage.NCQ3.Internal.Memtable + + +import Control.Monad.Trans.Cont +import Network.ByteOrder qualified as N +import Data.HashSet qualified as HS +import Data.HashPSQ qualified as PSQ +import Data.Vector qualified as V +import Data.HashMap.Strict qualified as HM +import Data.ByteString qualified as BS +import Data.Sequence qualified as Seq +import System.FilePath.Posix +import System.Posix.Files qualified as Posix +import System.Posix.IO as PosixBase +import System.Posix.Types as Posix +import System.Posix.Unistd +import System.Posix.IO.ByteString as Posix +import System.Posix.Files ( getFileStatus + , modificationTimeHiRes + , setFileTimesHiRes + , getFdStatus + , FileStatus(..) + , setFileMode + ) +import System.Posix.Files qualified as PFS +import System.IO.MMap as MMap +import Control.Concurrent.STM qualified as STM + +ncqStorageStop3 :: forall m . MonadUnliftIO m => NCQStorage3 -> m () +ncqStorageStop3 NCQStorage3{..} = atomically $ writeTVar ncqStopReq True + +ncqStorageRun3 :: forall m . MonadUnliftIO m + => NCQStorage3 + -> m () +ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do + ContT $ bracket setAlive (const unsetAlive) + + closeQ <- liftIO newTQueueIO + + closer <- spawnActivity $ liftIO $ fix \loop -> do + what <- atomically do + tryReadTQueue closeQ >>= \case + Just e -> pure $ Just e + Nothing -> do + stop <- readTVar ncqStopReq + if not stop then STM.retry else pure Nothing + + maybe1 what none $ \(fk :: FileKey, fh) -> do + debug $ red "CLOSE FILE" <+> pretty fk + closeFd fh + loop + + let shLast = V.length ncqWriteOps - 1 + spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do + let q = ncqWriteOps ! i + forever (liftIO $ join $ atomically (readTQueue q)) + + spawnActivity measureWPS + + flip fix RunNew $ \loop -> \case + RunFin -> do + debug "exit storage" + atomically $ pollSTM closer >>= maybe STM.retry (const none) + + RunNew -> do + alive <- readTVarIO ncqAlive + empty <- readTVarIO ncqWriteQ <&> Seq.null + if not alive && empty + then loop RunFin + else do + (fk, fhx) <- openNewDataFile + loop $ RunWrite (fk, fhx, 0, 0) + + + RunSync (fk, fh, w, total, continue) -> do + + stop <- readTVarIO ncqStopReq + sync <- readTVarIO ncqSyncReq + + let needClose = total >= ncqMinLog || stop + + rest <- if not (sync || needClose || w > ncqFsync) then + pure w + else do + appendTailSection fh >> liftIO (fileSynchronise fh) + atomically do + writeTVar ncqSyncReq False + modifyTVar ncqSyncNo succ + + pure 0 + + if | needClose && continue -> do + atomically $ writeTQueue closeQ (fk, fh) + loop RunNew + + | not continue -> loop RunFin + + | otherwise -> loop $ RunWrite (fk, fh, rest, total) + + + RunWrite (fk, fh, w, total') -> do + + let timeoutMicro = 10_000_000 + + chunk <- liftIO $ timeout timeoutMicro $ atomically do + stop <- readTVar ncqStopReq + sy <- readTVar ncqSyncReq + chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) + + if | Seq.null chunk && stop -> pure $ Left () + | Seq.null chunk && not (stop || sy) -> STM.retry + | otherwise -> pure $ Right chunk + + case chunk of + Nothing -> do + liftIO $ join $ readTVarIO ncqOnRunWriteIdle + if w == 0 then do + loop $ RunWrite (fk,fh,w,total') + else do + atomically $ writeTVar ncqSyncReq True + loop $ RunSync (fk, fh, w, total', True) -- exit () + + Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit () + + Just (Right chu) -> do + ws <- for chu $ \h -> do + atomically (ncqLookupEntrySTM ncq h) >>= \case + Just (NCQEntry bs w) -> do + atomically (writeTVar w (Just fk)) + lift (appendSection fh bs) + + _ -> pure 0 + + let written = sum ws + loop $ RunSync (fk, fh, w + written, total' + written, True) + + + pure () + + where + setAlive = atomically $ writeTVar ncqAlive True + unsetAlive = atomically $ writeTVar ncqAlive False + + openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) + openNewDataFile = do + fname <- toFileName . DataFile <$> ncqGetNewFileKey ncq + touch fname + let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } + (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) + + spawnActivity m = do + a <- ContT $ withAsync m + link a + pure a + + + measureWPS = void $ flip fix Nothing \loop -> \case + Nothing -> do + w <- readTVarIO ncqWrites + t <- getTimeCoarse + pause @'Seconds step >> loop (Just (w,t)) + + Just (w0,t0) -> do + w1 <- readTVarIO ncqWrites + t1 <- getTimeCoarse + let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9 + dw = fromIntegral (w1 - w0) + atomically $ modifyTVar ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema + pause @'Seconds step >> loop (Just (w1,t1)) + + where + alpha = 0.1 + step = 1.00 + +data RunSt = + RunNew + | RunWrite (FileKey, Fd, Int, Int) + | RunSync (FileKey, Fd, Int, Int, Bool) + | RunFin + + +zeroSyncEntry :: ByteString +zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload + where zeroPayload = N.bytestring64 0 + zeroHash = HashRef (hashObject zeroPayload) +{-# INLINE zeroSyncEntry #-} + +zeroSyncEntrySize :: Word64 +zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) +{-# INLINE zeroSyncEntrySize #-} + +-- 1. It's M-record +-- 2. It's last w64be == fileSize +-- 3. It's hash == hash (bytestring64be fileSize) +-- 4. recovery-strategy: start-to-end, end-to-start +fileTailRecord :: Integral a => a -> ByteString +fileTailRecord w = do + -- on open: last w64be == fileSize + let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) + let h = hashObject @HbSync paylo & coerce + ncqMakeSectionBS (Just M) h paylo +{-# INLINE fileTailRecord #-} + +appendSection :: forall m . MonadUnliftIO m + => Fd + -> ByteString + -> m Int -- (FOff, Int) + +appendSection fh sect = do + -- off <- liftIO $ fdSeek fh SeekFromEnd 0 + -- pure (fromIntegral off, fromIntegral len) + liftIO (Posix.fdWrite fh sect) <&> fromIntegral +{-# INLINE appendSection #-} + +appendTailSection :: MonadIO m => Fd -> m () +appendTailSection fh = liftIO do + s <- Posix.fileSize <$> Posix.getFdStatus fh + void (appendSection fh (fileTailRecord s)) +{-# INLINE appendTailSection #-} + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs new file mode 100644 index 00000000..c9cae517 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs @@ -0,0 +1,26 @@ +module HBS2.Storage.NCQ3.Internal.State where + +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types + +import Data.ByteString.Char8 qualified as BS8 +import Text.Printf + +ncqGetFileName :: NCQStorage3 -> FilePath -> FilePath +ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp + +ncqGetWorkDir :: NCQStorage3 -> FilePath +ncqGetWorkDir NCQStorage3{..} = ncqRoot show ncqGen + +ncqGetLockFileName :: NCQStorage3 -> FilePath +ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" + +ncqGetNewFileKey :: forall m . MonadIO m + => NCQStorage3 + -> m FileKey +ncqGetNewFileKey me@NCQStorage3{..} = fix \next -> do + n <- atomically $ stateTVar ncqStateFileSeq (\x -> (x, succ x)) + let fname = ncqMakeFossilName n + here <- doesFileExist (ncqGetFileName me fname) + if here then next else pure n + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs new file mode 100644 index 00000000..f4774461 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -0,0 +1,71 @@ +module HBS2.Storage.NCQ3.Internal.Types where + +import HBS2.Storage.NCQ3.Internal.Prelude + +import Text.Printf + +data CachedMMap = + CachedData ByteString + | CachedIndex ByteString NWayHash + + +type CachePrio = Word64 + +type Shard = TVar (HashMap HashRef NCQEntry) + +type StateVersion = Word64 + +newtype FileKey = FileKey Word32 + deriving newtype (Eq,Ord,Show,Num,Enum,Pretty,Hashable) + +instance IsString FileKey where + fromString = FileKey . read + +instance ToFileName (DataFile FileKey) where + toFileName (DataFile fk) = ncqMakeFossilName fk + +instance ToFileName (IndexFile FileKey) where + toFileName (IndexFile fk) = printf "i-%08x.cq" (coerce @_ @Word32 fk) + +data NCQEntry = + NCQEntry + { ncqEntryData :: !ByteString + , ncqDumped :: !(TVar (Maybe FileKey)) + } + +data NCQStorage3 = + NCQStorage3 + { ncqRoot :: FilePath + , ncqGen :: Int + , ncqSalt :: HashRef + , ncqPostponeMerge :: Timeout 'Seconds + , ncqPostponeSweep :: Timeout 'Seconds + , ncqFsync :: Int + , ncqWriteQLen :: Int + , ncqWriteBlock :: Int + , ncqMinLog :: Int + , ncqMaxLog :: Int + , ncqMaxCached :: Int + , ncqIdleThrsh :: Double + , ncqMMapCache :: TVar (HashPSQ FileKey CachePrio CachedMMap) + , ncqStateFiles :: TVar (HashSet FileKey) + , ncqStateIndex :: TVar (HashSet FileKey) + , ncqStateFileSeq :: TVar FileKey + , ncqStateVersion :: TVar StateVersion + , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) + , ncqMemTable :: Vector Shard + , ncqWrites :: TVar Int + , ncqWriteEMA :: TVar Double -- for writes-per-seconds + , ncqWriteQ :: TVar (Seq HashRef) + , ncqWriteOps :: Vector (TQueue (IO ())) + , ncqAlive :: TVar Bool + , ncqStopReq :: TVar Bool + , ncqSyncReq :: TVar Bool + , ncqOnRunWriteIdle :: TVar (IO ()) + , ncqSyncNo :: TVar Int + } + + +ncqMakeFossilName :: FileKey -> FilePath +ncqMakeFossilName = printf "f-%08x.data" . coerce @_ @Word32 + diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 906f73ed..6faa6860 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1223,5 +1223,3 @@ executable test-ncq , unix , mwc-random - - diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index a116f46f..735539dc 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -1788,7 +1788,7 @@ main = do g <- liftIO MWC.createSystemRandom let dir = testEnvDir let n = 100000 - let p = 0.45 + let p = 0.25 sizes <- replicateM n (uniformRM (4096, 256*1024) g) @@ -1798,7 +1798,7 @@ main = do notice $ "write" <+> pretty (List.length sizes) <+> pretty "random blocks" ContT $ withAsync $ forever do - pause @'Seconds 0.10 + pause @'Seconds 0.01 p1 <- uniformRM (0,1) g when (p1 <= p) do hss <- readTVarIO hashes