diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 17fd29c7..6ff1096b 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -68,6 +68,7 @@ library HBS2.Storage.NCQ3.Internal.Run HBS2.Storage.NCQ3.Internal.Memtable HBS2.Storage.NCQ3.Internal.Index + HBS2.Storage.NCQ3.Internal.MMapCache HBS2.Storage.NCQ HBS2.Storage.NCQ2 HBS2.Storage.NCQ2.Internal diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs index 089b8b15..afa8292a 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs @@ -14,5 +14,6 @@ import HBS2.Storage.NCQ3.Internal.Prelude as Exported import HBS2.Storage.NCQ3.Internal import HBS2.Storage.NCQ3.Internal.Run import HBS2.Storage.NCQ3.Internal.State +import HBS2.Storage.NCQ3.Internal.Memtable diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 942c411d..d457dd42 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -5,6 +5,7 @@ import HBS2.Storage.NCQ3.Internal.Prelude import HBS2.Storage.NCQ3.Internal.Types import HBS2.Storage.NCQ3.Internal.State import HBS2.Storage.NCQ3.Internal.Run +import HBS2.Storage.NCQ3.Internal.Memtable import Control.Monad.Trans.Cont import Network.ByteOrder qualified as N @@ -38,7 +39,7 @@ ncqStorageOpen3 fp upd = do let ncqMinLog = 512 * megabytes let ncqMaxLog = 2 * ncqMinLog let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 - let ncqMaxCached = 128 + let ncqMaxCachedIndex = 16 let ncqIdleThrsh = 50.0 let ncqPostponeMerge = 300.0 let ncqPostponeSweep = 2 * ncqPostponeMerge @@ -49,22 +50,23 @@ ncqStorageOpen3 fp upd = do let shardNum = fromIntegral cap let wopNum = 2 - ncqWriteQ <- newTVarIO mempty - ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty) - ncqMMapCache <- newTVarIO PSQ.empty - ncqStateFiles <- newTVarIO mempty - ncqStateIndex <- newTVarIO mempty - ncqStateFileSeq <- newTVarIO 0 - ncqStateVersion <- newTVarIO 0 - ncqStateUsage <- newTVarIO mempty - ncqWrites <- newTVarIO 0 - ncqWriteEMA <- newTVarIO 0.0 - ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO - ncqAlive <- newTVarIO False - ncqStopReq <- newTVarIO False - ncqSyncReq <- newTVarIO False + ncqWriteQ <- newTVarIO mempty + ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty) + ncqMMapCachedIdx <- newTVarIO PSQ.empty + ncqStateFiles <- newTVarIO mempty + ncqStateIndex <- newTVarIO mempty + ncqStateFileSeq <- newTVarIO 0 + ncqStateVersion <- newTVarIO 0 + ncqStateUsage <- newTVarIO mempty + ncqWrites <- newTVarIO 0 + ncqWriteEMA <- newTVarIO 0.0 + ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO + ncqReadReq <- newTQueueIO + ncqAlive <- newTVarIO False + ncqStopReq <- newTVarIO False + ncqSyncReq <- newTVarIO False ncqOnRunWriteIdle <- newTVarIO none - ncqSyncNo <- newTVarIO 0 + ncqSyncNo <- newTVarIO 0 let ncq = NCQStorage3{..} & upd @@ -81,25 +83,7 @@ ncqWithStorage3 fp action = flip runContT pure do wait w pure r - -ncqShardIdx :: NCQStorage3 -> HashRef -> Int -ncqShardIdx NCQStorage3{..} h = - fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable -{-# INLINE ncqShardIdx #-} - -ncqGetShard :: NCQStorage3 -> HashRef -> Shard -ncqGetShard ncq@NCQStorage3{..} h = ncqMemTable ! ncqShardIdx ncq h -{-# INLINE ncqGetShard #-} - -ncqStorageSync3 :: forall m . MonadUnliftIO m => NCQStorage3 -> m () -ncqStorageSync3 NCQStorage3{..} = atomically $ writeTVar ncqSyncReq True - -ncqOperation :: MonadIO m => NCQStorage3 -> m a -> m a -> m a -ncqOperation ncq m0 m = do - alive <- readTVarIO (ncqAlive ncq) - if alive then m else m0 - - +-- FIXME: maybe-on-storage-closed ncqPutBS :: MonadUnliftIO m => NCQStorage3 -> Maybe NCQSectionType @@ -133,4 +117,13 @@ ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe atomically $ takeTMVar waiter +ncqLocate :: MonadUnliftIO m => NCQStorage3 -> HashRef -> m (Maybe Location) +ncqLocate me@NCQStorage3{..} href = ncqOperation me (pure Nothing) do + answ <- newEmptyTMVarIO + + atomically do + modifyTVar ncqWrites succ + writeTQueue ncqReadReq (href, answ) + + atomically $ takeTMVar answ diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs index 33d1bffd..0f778e65 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -3,16 +3,44 @@ module HBS2.Storage.NCQ3.Internal.Index where import HBS2.Storage.NCQ3.Internal.Prelude import HBS2.Storage.NCQ3.Internal.Types import HBS2.Storage.NCQ3.Internal.State +import HBS2.Storage.NCQ3.Internal.Memtable import System.Posix.Files qualified as PFS import Streaming.Prelude qualified as S import Network.ByteOrder qualified as N import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe import Data.ByteString qualified as BS import System.IO.MMap -ncqIndexFile :: MonadUnliftIO m => NCQStorage3 -> DataFile FileKey -> m FilePath -ncqIndexFile n@NCQStorage3{} fk = do + +data IndexEntry = IndexEntry {-# UNPACK #-} !FileKey !Word64 !Word32 + +unpackIndexEntry :: ByteString -> IndexEntry +unpackIndexEntry entryBs = do + let (fks,rest1) = BS.splitAt 4 entryBs + let (offs,rest2) = BS.splitAt 8 rest1 + let ss = BS.take 4 rest2 + let fk = FileKey (N.word32 fks) + let off = N.word64 offs + let size = N.word32 ss + IndexEntry fk off size +{-# INLINE unpackIndexEntry #-} + +emptyKey :: ByteString +emptyKey = BS.replicate 32 0 + +ncqLookupIndex :: MonadUnliftIO m + => HashRef + -> (ByteString, NWayHash) + -> m (Maybe IndexEntry ) +ncqLookupIndex hx (mmaped, nway) = do + fmap unpackIndexEntry <$> nwayHashLookup nway mmaped (coerce hx) +{-# INLINE ncqLookupIndex #-} + + +ncqIndexFile :: MonadUnliftIO m => NCQStorage3 -> DataFile FileKey -> m (Maybe FilePath) +ncqIndexFile n@NCQStorage3{..} fk = runMaybeT do let fp = toFileName fk & ncqGetFileName n fki <- ncqGetNewFileKey n IndexFile @@ -36,17 +64,31 @@ ncqIndexFile n@NCQStorage3{} fk = do let (dir,name) = splitFileName fp let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" - result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items + result <- lift $ nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items mv result dest stat <- liftIO $ PFS.getFileStatus dest let ts = PFS.modificationTimeHiRes stat + midx <- liftIO (nwayHashMMapReadOnly dest) + + unless (isJust midx) do + err $ "can't mmap index" <+> pretty dest + ncqStateUpdate n do ncqStateAddIndexFile ts fki ncqStateAddDataFile (coerce fk) + (bs,nw) <- toMPlus midx + + nwayHashScanAll nw bs $ \_ k _ -> do + unless (k == emptyKey) $ atomically $ void $ runMaybeT do + NCQEntry _ tfk <- MaybeT $ ncqLookupEntrySTM n (coerce k) + fk' <- MaybeT $ readTVar tfk + guard (coerce fk == fk') -- remove only own stuff + lift $ ncqAlterEntrySTM n (coerce k) (const Nothing) + pure dest diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs new file mode 100644 index 00000000..f967168f --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs @@ -0,0 +1,46 @@ +module HBS2.Storage.NCQ3.Internal.MMapCache where + +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.State + +import Data.HashPSQ as HPSQ + + +ncqGetCachedIndex :: forall m . MonadUnliftIO m + => NCQStorage3 + -> FileKey + -> m CachedIndex +ncqGetCachedIndex ncq@NCQStorage3{..} fk = do + now <- getTimeCoarse + + atomically (HPSQ.lookup fk <$> readTVar ncqMMapCachedIdx) >>= \case + Just (_, idx) -> do + atomically $ modifyTVar' ncqMMapCachedIdx (HPSQ.insert fk now idx) + pure idx + + Nothing -> do + let path = ncqGetFileName ncq (toFileName (IndexFile fk)) + nwayHashMMapReadOnly path >>= \case + Nothing -> throwIO $ NCQStorageCantMapFile path + Just (bs, nway) -> do + let new = CachedIndex bs nway + atomically do + cache <- readTVar ncqMMapCachedIdx + let cache' = + if HPSQ.size cache >= ncqMaxCachedIndex + then HPSQ.deleteMin cache + else cache + writeTVar ncqMMapCachedIdx (HPSQ.insert fk now new cache') + pure new + + +ncqDelCachedIndex :: forall m . MonadUnliftIO m + => NCQStorage3 + -> FileKey + -> m () + +ncqDelCachedIndex NCQStorage3{..} fk = + atomically (modifyTVar ncqMMapCachedIdx$ HPSQ.delete fk) + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs index 45193b6e..19fae103 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs @@ -27,3 +27,14 @@ ncqAlterEntrySTM :: NCQStorage3 ncqAlterEntrySTM ncq h alterFn = do let shard = ncqGetShard ncq h modifyTVar shard (HM.alter alterFn h) + +ncqStorageSync3 :: forall m . MonadUnliftIO m => NCQStorage3 -> m () +ncqStorageSync3 NCQStorage3{..} = atomically $ writeTVar ncqSyncReq True + +ncqOperation :: MonadIO m => NCQStorage3 -> m a -> m a -> m a +ncqOperation ncq m0 m = do + alive <- readTVarIO (ncqAlive ncq) + if alive then m else m0 + + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs index 44d40325..8cbe2323 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Prelude.hs @@ -18,6 +18,7 @@ module HBS2.Storage.NCQ3.Internal.Prelude , DataFile(..) , StateFile(..) , FilePrio(..) + , NCQStorageException(..) , ByteString , Vector, (!) , Seq(..), (|>),(<|) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 4e2a5fb5..e21e0658 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -7,6 +7,7 @@ import HBS2.Storage.NCQ3.Internal.Types import HBS2.Storage.NCQ3.Internal.State import HBS2.Storage.NCQ3.Internal.Memtable import HBS2.Storage.NCQ3.Internal.Index +import HBS2.Storage.NCQ3.Internal.MMapCache import Control.Monad.Trans.Cont @@ -64,6 +65,26 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do let q = ncqWriteOps ! i forever (liftIO $ join $ atomically (readTQueue q)) + + replicateM_ 2 $ spawnActivity $ fix \next -> do + (h, answ) <- atomically $ readTQueue ncqReadReq + let answer l = atomically (putTMVar answ l) + + atomically (ncqLookupEntrySTM ncq h) >>= \case + Nothing -> none + Just e -> answer (Just (InMemory (ncqEntryData e))) >> next + + tracked <- readTVarIO ncqStateIndex + + for_ tracked $ \(_, fk) -> do + CachedIndex bs nw <- ncqGetCachedIndex ncq fk + ncqLookupIndex h (bs, nw) >>= \case + Just (IndexEntry fk o s) -> undefined >> next + Nothing -> none + + answer Nothing >> next + + spawnActivity measureWPS flip fix RunNew $ \loop -> \case diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index 723ed76b..9ba5af95 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -4,12 +4,11 @@ import HBS2.Storage.NCQ3.Internal.Prelude import Text.Printf -data CachedMMap = - CachedData ByteString - | CachedIndex ByteString NWayHash +data CachedData = CachedData !ByteString +data CachedIndex = CachedIndex !ByteString !NWayHash -type CachePrio = Word64 +type CachePrio = TimeSpec type Shard = TVar (HashMap HashRef NCQEntry) @@ -36,6 +35,18 @@ data NCQEntry = , ncqDumped :: !(TVar (Maybe FileKey)) } +type NCQOffset = Word64 +type NCQSize = Word32 + +data Location = + InFossil {-# UNPACK #-} !FileKey !NCQOffset !NCQSize + | InMemory {-# UNPACK #-} !ByteString + +instance Pretty Location where + pretty = \case + InFossil k o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s + InMemory _ -> "in-memory" + data NCQStorage3 = NCQStorage3 { ncqRoot :: FilePath @@ -48,11 +59,11 @@ data NCQStorage3 = , ncqWriteBlock :: Int , ncqMinLog :: Int , ncqMaxLog :: Int - , ncqMaxCached :: Int + , ncqMaxCachedIndex :: Int , ncqIdleThrsh :: Double - , ncqMMapCache :: TVar (HashPSQ FileKey CachePrio CachedMMap) + , ncqMMapCachedIdx :: TVar (HashPSQ FileKey CachePrio CachedIndex) , ncqStateFiles :: TVar (HashSet FileKey) - , ncqStateIndex :: TVar [(Down POSIXTime, FileKey)] -- backward timestamp order + , ncqStateIndex :: TVar [(Down POSIXTime, FileKey)] -- backward timestamp orde , ncqStateFileSeq :: TVar FileKey , ncqStateVersion :: TVar StateVersion , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) @@ -61,6 +72,7 @@ data NCQStorage3 = , ncqWriteEMA :: TVar Double -- for writes-per-seconds , ncqWriteQ :: TVar (Seq HashRef) , ncqWriteOps :: Vector (TQueue (IO ())) + , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) , ncqAlive :: TVar Bool , ncqStopReq :: TVar Bool , ncqSyncReq :: TVar Bool