hbs2/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs

137 lines
4.4 KiB
Haskell

{-# Language RecordWildCards #-}
module HBS2.Storage.NCQ3.Internal where
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Run
import Control.Monad.Trans.Cont
import Network.ByteOrder qualified as N
import Data.HashPSQ qualified as PSQ
import Data.Vector qualified as V
import Data.HashMap.Strict qualified as HM
import Data.ByteString qualified as BS
import Data.Sequence qualified as Seq
import System.FilePath.Posix
import System.Posix.Files qualified as Posix
import System.Posix.IO as PosixBase
import System.Posix.Types as Posix
import System.Posix.Unistd
import System.Posix.IO.ByteString as Posix
import System.Posix.Files ( getFileStatus
, modificationTimeHiRes
, setFileTimesHiRes
, getFdStatus
, FileStatus(..)
, setFileMode
)
import System.Posix.Files qualified as PFS
import System.IO.MMap as MMap
ncqStorageOpen3 :: MonadIO m => FilePath -> (NCQStorage3 -> NCQStorage3) -> m NCQStorage3
ncqStorageOpen3 fp upd = do
let ncqRoot = fp
let ncqGen = 0
let ncqFsync = 16 * megabytes
let ncqWriteQLen = 1024 * 4
let ncqMinLog = 512 * megabytes
let ncqMaxLog = 2 * ncqMinLog
let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2
let ncqMaxCached = 128
let ncqIdleThrsh = 50.0
let ncqPostponeMerge = 300.0
let ncqPostponeSweep = 2 * ncqPostponeMerge
let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk"
cap <- getNumCapabilities
let shardNum = fromIntegral cap
let wopNum = 2
ncqWriteQ <- newTVarIO mempty
ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty)
ncqMMapCache <- newTVarIO PSQ.empty
ncqStateFiles <- newTVarIO mempty
ncqStateIndex <- newTVarIO mempty
ncqStateFileSeq <- newTVarIO 0
ncqStateVersion <- newTVarIO 0
ncqStateUsage <- newTVarIO mempty
ncqWrites <- newTVarIO 0
ncqWriteEMA <- newTVarIO 0.0
ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO
ncqAlive <- newTVarIO False
ncqStopReq <- newTVarIO False
ncqSyncReq <- newTVarIO False
ncqOnRunWriteIdle <- newTVarIO none
ncqSyncNo <- newTVarIO 0
let ncq = NCQStorage3{..} & upd
mkdir (ncqGetWorkDir ncq)
pure ncq
ncqWithStorage3 :: MonadUnliftIO m => FilePath -> (NCQStorage3 -> m a) -> m a
ncqWithStorage3 fp action = flip runContT pure do
sto <- lift (ncqStorageOpen3 fp id)
w <- ContT $ withAsync (ncqStorageRun3 sto) -- TODO: implement run
link w
r <- lift (action sto)
lift (ncqStorageStop3 sto)
wait w
pure r
ncqShardIdx :: NCQStorage3 -> HashRef -> Int
ncqShardIdx NCQStorage3{..} h =
fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable
{-# INLINE ncqShardIdx #-}
ncqGetShard :: NCQStorage3 -> HashRef -> Shard
ncqGetShard ncq@NCQStorage3{..} h = ncqMemTable ! ncqShardIdx ncq h
{-# INLINE ncqGetShard #-}
ncqStorageSync3 :: forall m . MonadUnliftIO m => NCQStorage3 -> m ()
ncqStorageSync3 NCQStorage3{..} = atomically $ writeTVar ncqSyncReq True
ncqOperation :: MonadIO m => NCQStorage3 -> m a -> m a -> m a
ncqOperation ncq m0 m = do
alive <- readTVarIO (ncqAlive ncq)
if alive then m else m0
ncqPutBS :: MonadUnliftIO m
=> NCQStorage3
-> Maybe NCQSectionType
-> Maybe HashRef
-> ByteString
-> m HashRef
ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe (HashRef (hashObject @HbSync bs')) mhref) do
waiter <- newEmptyTMVarIO
let work = do
let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref
let bs = ncqMakeSectionBS mtp h bs'
let shard = ncqGetShard ncq h
zero <- newTVarIO Nothing
atomically do
upd <- stateTVar shard $ flip HM.alterF h \case
Nothing -> (True, Just (NCQEntry bs zero))
Just e | ncqEntryData e /= bs -> (True, Just (NCQEntry bs zero))
| otherwise -> (False, Just e)
when upd do
modifyTVar ncqWriteQ (|> h)
putTMVar waiter h
atomically do
nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps)
modifyTVar ncqWrites succ
writeTQueue (ncqWriteOps ! nw) work
atomically $ takeTMVar waiter