diff --git a/.gitignore b/.gitignore
index 0479bb49..48ee1944 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@
 cabal.project.local
 .backup/
 .hbs2-git/
+bin/
diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs
index 05f4adb2..769f5340 100644
--- a/hbs2-peer/app/RefChan.hs
+++ b/hbs2-peer/app/RefChan.hs
@@ -656,8 +656,8 @@ refChanWorker env brains = do
 
         let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ]
 
-        -- FIXME: process-in-parallel
-        forM_ (HashMap.toList byChan) $ \(c,new) -> do
+        -- FIXME: thread-num-hardcode-to-remove
+        pooledForConcurrentlyN_ 4 (HashMap.toList byChan) $ \(c,new) -> do
           mbLog <- liftIO $ getRef sto c
 
           hashes <- maybe1 mbLog (pure mempty) $ readLog (getBlock sto) . HashRef
diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal
index 6c6c7722..e6ba7a3e 100644
--- a/hbs2-tests/hbs2-tests.cabal
+++ b/hbs2-tests/hbs2-tests.cabal
@@ -995,3 +995,40 @@ executable test-pipe-mess
 
 
 
+executable test-merge-limits
+  import:           shared-properties
+  default-language: Haskell2010
+
+  -- other-extensions:
+
+  hs-source-dirs:   test
+  main-is:          TestMergeLimits.hs
+  build-depends:
+        base, hbs2-core, hbs2-storage-simple
+      , async
+      , bytestring
+      , cache
+      , containers
+      , hashable
+      , microlens-platform
+      , mtl
+      , prettyprinter
+      , QuickCheck
+      , quickcheck-instances
+      , random
+      , safe
+      , serialise
+      , stm
+      , streaming
+      , tasty
+      , tasty-quickcheck
+      , tasty-hunit
+      , transformers
+      , uniplate
+      , vector
+      , filepath
+      , temporary
+      , unliftio
+      , unordered-containers
+      , unix
+      , timeit
diff --git a/hbs2-tests/test/TestMergeLimits.hs b/hbs2-tests/test/TestMergeLimits.hs
new file mode 100644
index 00000000..4b8eae60
--- /dev/null
+++ b/hbs2-tests/test/TestMergeLimits.hs
@@ -0,0 +1,70 @@
+{-# Language NumericUnderscores #-}
+module Main where
+
+import HBS2.Prelude.Plated
+
+import HBS2.Hash
+import HBS2.Data.Types.Refs
+
+import Data.ByteString.Lazy qualified as LBS
+import Data.Function
+import Streaming.Prelude qualified as S
+import System.TimeIt
+import Data.HashSet qualified as HS
+import Data.HashSet (HashSet)
+import Data.List qualified as List
+import UnliftIO
+import System.Random
+import Data.Word
+import Control.Monad
+
+-- a random HashRef: the hash of four random Word64s
+rndHash :: IO HashRef
+rndHash = do
+  w1 <- replicateM 4 $ randomIO @Word64
+  pure $ HashRef $ hashObject @HbSync (serialise w1)
+
+main :: IO ()
+main = do
+  rnd <- openFile "/dev/random" ReadMode
+
+  -- read a large block of random bytes; hGetNonBlocking may return fewer than requested
+  lbs <- LBS.hGetNonBlocking rnd $ 32_000_000 * 10
+
+  -- slice the bytes into 32-byte hash values
+  hashList <- S.toList_ do
+    flip fix lbs $ \next rest -> do
+      let (a,rest') = LBS.splitAt 32 rest
+      S.yield $ HashRef $! HbSyncHash (LBS.toStrict a)
+      unless (LBS.null rest') $ next rest'
+
+  -- split the hashes into chunks of one million entries
+  chunks <- S.toList_ do
+    flip fix hashList $ \next rest -> do
+      let (c, rest') = List.splitAt 1_000_000 rest
+      S.yield c
+      unless (List.null rest') $ next rest'
+
+  -- build one HashSet per chunk
+  pieces <- forConcurrently chunks (pure . HS.fromList)
+
+  hs <- timeItNamed "rebuild index" do
+    let hashSet = HS.unions pieces
+    print $ length hashSet
+    pure hashSet
+
+  void $ timeItNamed "calculate hash" do
+    let bs = serialise hs
+    let hx = hashObject @HbSync bs
+    print $ pretty hx
+
+  putStrLn "now we have a partially sorted index"
+
+  hashes <- replicateM 100 rndHash
+
+  timeItNamed "add new items" do
+    let hs2 = HS.union hs (HS.fromList hashes)
+    -- let hx = hashObject @HbSync (serialise hs2)
+    print $ pretty (HS.size hs2) -- <+> pretty hx
+
+  pure ()
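
Note on the RefChan.hs change: pooledForConcurrentlyN_ comes from unliftio's
UnliftIO.Async and runs the per-channel actions on a bounded worker pool, here
hardcoded to 4 threads (hence the new FIXME). A minimal, self-contained sketch
of the pattern; the list of "channels" and the per-item work are hypothetical
stand-ins, not code from this repository:

    module Main where

    import UnliftIO.Async (pooledForConcurrentlyN_)
    import Control.Concurrent (threadDelay)

    main :: IO ()
    main = do
      -- at most 4 actions run at once; the pool picks up the next item as a
      -- worker frees up, mirroring the forM_ -> pooledForConcurrentlyN_ change
      pooledForConcurrentlyN_ 4 [1 :: Int .. 16] $ \chan -> do
        threadDelay 100000 -- stand-in for per-channel log processing
        print chan

Unlike mapConcurrently_, which forks one thread per element, the pooled variant
caps concurrency, so a channel map with thousands of entries cannot fork an
unbounded number of threads at once.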