diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs index afa8292a..d96417d7 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs @@ -6,6 +6,7 @@ module HBS2.Storage.NCQ3 , ncqStorageOpen3 , ncqStorageRun3 , ncqPutBS + , ncqLocate ) where diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 383b7f4c..2361666d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -16,6 +16,7 @@ import Data.Vector qualified as V import Data.HashMap.Strict qualified as HM import Data.List qualified as List import Data.Set qualified as Set +import Lens.Micro.Platform import Data.ByteString qualified as BS import Data.Sequence qualified as Seq import System.FilePath.Posix @@ -95,7 +96,7 @@ ncqPutBS :: MonadUnliftIO m -> Maybe HashRef -> ByteString -> m HashRef -ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe (HashRef (hashObject @HbSync bs')) mhref) do +ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe hash0 mhref) do waiter <- newEmptyTMVarIO let work = do @@ -122,6 +123,8 @@ ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe atomically $ takeTMVar waiter + where hash0 = HashRef (hashObject @HbSync bs') + ncqLocate :: MonadUnliftIO m => NCQStorage3 -> HashRef -> m (Maybe Location) ncqLocate me@NCQStorage3{..} href = ncqOperation me (pure Nothing) do answ <- newEmptyTMVarIO @@ -132,13 +135,11 @@ ncqLocate me@NCQStorage3{..} href = ncqOperation me (pure Nothing) do atomically $ takeTMVar answ - - ncqTryLoadState :: forall m. MonadUnliftIO m => NCQStorage3 -> m () -ncqTryLoadState me = do +ncqTryLoadState me@NCQStorage3{..} = do stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" ) @@ -155,7 +156,9 @@ ncqTryLoadState me = do else next (s : l, s0, ss) - let (bad, NCQState{..}, rest) = r + let (bad, new@NCQState{..}, rest) = r + + atomically $ modifyTVar ncqState (<> new) for_ [ (d,s) | P (PData d s) <- Set.toList ncqStateFacts ] $ \(dataFile,s) -> do let path = ncqGetFileName me dataFile @@ -172,9 +175,10 @@ ncqTryLoadState me = do ncqIndexFile me dataFile - for_ (bad <> drop 3 (fmap snd rest)) $ \f -> do - rm (ncqGetFileName me (StateFile f)) - + for_ (bad <> drop 3 (fmap snd rest)) $ \f -> do + let old = ncqGetFileName me (StateFile f) + debug $ "rm old state" <+> pretty old + rm old where diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs index a2d34bcb..8e92ef9e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -16,6 +16,7 @@ import System.IO.MMap data IndexEntry = IndexEntry {-# UNPACK #-} !FileKey !Word64 !Word32 + deriving stock (Eq,Show) unpackIndexEntry :: ByteString -> IndexEntry unpackIndexEntry entryBs = do @@ -60,12 +61,13 @@ ncqIndexFile n@NCQStorage3{..} fk = runMaybeT do let rs = (w + ncqSLen) & fromIntegral @_ @Word32 & N.bytestring32 let os = fromIntegral @_ @Word64 offset & N.bytestring64 let record = fks <> os <> rs + -- debug $ "WRITE INDEX ENTRY" <+> pretty (BS.length record) S.yield (coerce key, record) let (dir,name) = splitFileName fp let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" - result <- lift $ nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items + result <- lift $ nwayWriteBatch (nwayAllocDef 1.10 32 8 16) dir idxTemp items mv result dest diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs index 19fae103..a1cef933 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Memtable.hs @@ -1,3 +1,4 @@ +{-# Language MultiWayIf #-} module HBS2.Storage.NCQ3.Internal.Memtable where import HBS2.Storage.NCQ3.Internal.Types @@ -6,6 +7,7 @@ import HBS2.Storage.NCQ3.Internal.Prelude import Data.ByteString qualified as BS import Data.HashMap.Strict qualified as HM import Data.Vector qualified as V +import Control.Concurrent.STM qualified as STM ncqShardIdx :: NCQStorage3 -> HashRef -> Int ncqShardIdx NCQStorage3{..} h = @@ -32,9 +34,16 @@ ncqStorageSync3 :: forall m . MonadUnliftIO m => NCQStorage3 -> m () ncqStorageSync3 NCQStorage3{..} = atomically $ writeTVar ncqSyncReq True ncqOperation :: MonadIO m => NCQStorage3 -> m a -> m a -> m a -ncqOperation ncq m0 m = do - alive <- readTVarIO (ncqAlive ncq) - if alive then m else m0 +ncqOperation NCQStorage3{..} m0 m = do + what <- atomically do + alive <- readTVar ncqAlive + stop <- readTVar ncqStopReq + + if | not alive && not stop -> STM.retry + | not alive && stop -> pure False + | otherwise -> pure True + + if what then m else m0 diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index c3367679..27d0b55e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -67,10 +67,13 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do forever (liftIO $ join $ atomically (readTQueue q)) - replicateM_ 2 $ spawnActivity $ fix \next -> do + replicateM_ 1 $ spawnActivity $ fix \next -> do + (h, answ) <- atomically $ readTQueue ncqReadReq let answer l = atomically (putTMVar answ l) + -- debug $ "REQ" <+> pretty h + atomically (ncqLookupEntrySTM ncq h) >>= \case Nothing -> none Just e -> answer (Just (InMemory (ncqEntryData e))) >> next @@ -83,6 +86,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do Just (IndexEntry fk o s) -> answer (Just (InFossil fk o s)) >> next Nothing -> none + -- debug $ "NOT FOUND SHIT" <+> pretty h answer Nothing >> next spawnActivity measureWPS diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs index 810ab427..13bfefbd 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs @@ -8,6 +8,7 @@ import HBS2.Storage.NCQ3.Internal.Files import Data.Config.Suckless.Script import Data.Generics.Product +import Data.Generics.Labels import Data.List qualified as List import Control.Monad.Reader import Control.Monad.Trans.Maybe @@ -48,19 +49,19 @@ ncqStateAddDataFile :: FileKey -> StateOP () ncqStateAddDataFile fk = do NCQStorage3{..} <- ask StateOP $ lift do - modifyTVar ncqState (over (field @"ncqStateFiles") (HS.insert fk)) + modifyTVar ncqState (over #ncqStateFiles (HS.insert fk)) ncqStateAddFact :: Fact -> StateOP () ncqStateAddFact fact = do NCQStorage3{..} <- ask StateOP $ lift do - modifyTVar ncqState (over (field @"ncqStateFacts") (Set.insert fact)) + modifyTVar ncqState (over #ncqStateFacts (Set.insert fact)) ncqStateDelFact :: Fact -> StateOP () ncqStateDelFact fact = do NCQStorage3{..} <- ask StateOP $ lift do - modifyTVar ncqState (over (field @"ncqStateFacts") (Set.delete fact)) + modifyTVar ncqState (over #ncqStateFacts (Set.delete fact)) ncqStateAddIndexFile :: POSIXTime -> FileKey @@ -68,10 +69,10 @@ ncqStateAddIndexFile :: POSIXTime ncqStateAddIndexFile ts fk = do NCQStorage3{..} <- ask - StateOP $ lift $ modifyTVar' ncqState sortIndexes + StateOP $ lift $ modifyTVar' ncqState (sortIndexes . over #ncqStateIndex ((Down ts, fk) :)) sortIndexes :: NCQState -> NCQState -sortIndexes = over (field @"ncqStateIndex") (List.sortOn fst) +sortIndexes = over #ncqStateIndex (List.sortOn fst) ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m () ncqFileFastCheck fp = do diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 2944a06c..9be2692d 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -29,10 +29,12 @@ import Data.Config.Suckless.System import NCQTestCommon +import Test.Tasty.HUnit import Data.ByteString qualified as BS import Data.Ord import Data.Set qualified as Set import System.Random.MWC as MWC +import Control.Concurrent.STM qualified as STM import UnliftIO @@ -79,3 +81,29 @@ ncq3Tests = do pause @'Seconds 2 notice $ "done" + + entry $ bindMatch "test:ncq3:write:simple" $ nil_ $ \e ->do + let (opts,args) = splitOpts [] e + let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ] + g <- liftIO MWC.createSystemRandom + runTest $ \TestEnv{..} -> do + hq <- newTQueueIO + ncqWithStorage3 testEnvDir $ \sto -> do + notice $ "write/lookup" <+> pretty num + replicateM_ num do + n <- liftIO $ uniformRM (1024, 256*1024) g + bs <- liftIO $ genRandomBS g n + h <- ncqPutBS sto (Just B) Nothing bs + found <- ncqLocate sto h <&> isJust + liftIO $ assertBool (show $ "found" <+> pretty h) found + atomically $ writeTQueue hq h + + ncqWithStorage3 testEnvDir $ \sto -> do + notice $ "reopen/lookup" <+> pretty num + hh <- atomically $ STM.flushTQueue hq + for_ hh $ \h -> do + found <- ncqLocate sto h <&> isJust + liftIO $ assertBool (show $ "found2" <+> pretty h) found + + notice $ "done" +