From 8b1e3fbbfd1b9db79172f3563864e8a714b2c723 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Tue, 24 Jan 2023 07:13:14 +0300 Subject: [PATCH] works on 10m per peer, explodes on 40m --- hbs2-core/lib/HBS2/Actors/ChunkWriter.hs | 2 +- .../lib/HBS2/Storage/Simple/Extra.hs | 2 +- hbs2-tests/hbs2-tests.cabal | 1 + hbs2-tests/test/Peer2Main.hs | 83 ++++++++++++------- 4 files changed, 55 insertions(+), 33 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index 0c56a282..3ddf1488 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -107,7 +107,7 @@ runChunkWriter2 w = do if stop then do ks <- liftIO $ take 20 <$> Cache.keys cache - for_ ks $ \k -> flush w k + liftIO $ for_ ks $ \k -> flush w k else do ks <- liftIO $ Cache.keys cache diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index f35a023d..3225a82a 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -23,7 +23,7 @@ pieces :: Integral a => a pieces = 8192 class SimpleStorageExtra a where - putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString, Block ByteString ~ ByteString) => SimpleStorage h -> a -> IO MerkleHash + putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m () readChunked handle size = fuu diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 4542828f..73170c33 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -41,6 +41,7 @@ common common-deps , uniplate , vector , data-default + , mwc-random common shared-properties ghc-options: diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 13a4b243..07be97d1 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -2,6 +2,7 @@ {-# Language UndecidableInstances #-} {-# Language RankNTypes #-} {-# Language AllowAmbiguousTypes #-} +{-# LANGUAGE MultiWayIf #-} module Main where import HBS2.Actors.ChunkWriter @@ -49,6 +50,10 @@ import System.Exit import System.FilePath.Posix import System.IO +import System.Random.MWC +import System.Random.Stateful +import qualified Data.Vector.Unboxed as U + debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p @@ -193,11 +198,13 @@ blockDownloadLoop = do stor <- getStorage - let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" - , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" - , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" - , "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg" - ] + let blks = [] + + -- let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" + -- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + -- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + -- , "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg" + -- ] blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b @@ -210,7 +217,7 @@ blockDownloadLoop = do <+> pretty h <+> pretty (view biSize ann) - initDownload blq p h s -- FIXME: don't trust everybody + initDownload False blq p h s -- FIXME: don't trust everybody fix \next -> do @@ -220,8 +227,8 @@ blockDownloadLoop = do unless here $ do - subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do - initDownload blq p h s + subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do + initDownload True blq p hx s peers <- getPeerLocator @e >>= knownPeers @e @@ -233,28 +240,31 @@ blockDownloadLoop = do where - initDownload q p h s = do + initDownload anyway q p h s = do sto <- getStorage here <- liftIO $ hasBlock sto h <&> isJust - if not here then do + if | not here -> do - coo <- genCookie (p,h) - let key = DownloadSessionKey (p, coo) - let chusz = defChunkSize - let new = set sBlockChunkSize chusz - . set sBlockSize (fromIntegral s) - $ newBlockDownload h + coo <- genCookie (p,h) + let key = DownloadSessionKey (p, coo) + let chusz = defChunkSize + let new = set sBlockChunkSize chusz + . set sBlockSize (fromIntegral s) + $ newBlockDownload h - update @e new key id + update @e new key id - subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do - processBlock q p h + subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do + processBlock q p h - request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction + request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction - else do - processBlock q p h + | anyway -> processBlock q p h + + | otherwise -> do + debug $ "already got " <+> pretty h <+> " so relax" + pure () processBlock q _ h = do @@ -262,7 +272,6 @@ blockDownloadLoop = do pip <- asks (view envDeferred) -- debug "process block!" liftIO $ addJob pip $ withPeerM env $ do - -- void $ liftIO $ async $ withPeerM env $ do sto <- getStorage liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h @@ -278,8 +287,19 @@ blockDownloadLoop = do Just (Merkle{}) -> liftIO do debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h walkMerkle h (getBlock sto) $ \(hr :: [HashRef]) -> do - for_ hr $ \h -> debug $ "for-block" <+> pretty h - for_ hr ( atomically . Q.writeTBQueue q . fromHashRef) + + for_ hr $ \(HashRef blk) -> do + + here <- liftIO $ hasBlock sto blk <&> isJust + + if here then do + debug $ "block" <+> pretty blk <+> "is already here" + pure () -- we don't need to recurse, cause walkMerkle will recurse + + else do + -- if block is missed, then + -- block to download q + atomically $ Q.writeTBQueue q blk Just (Blob{}) -> do pure () @@ -371,7 +391,7 @@ main :: IO () main = do hSetBuffering stderr LineBuffering - void $ race (pause (300 :: Timeout 'Seconds)) $ do + void $ race (pause (30 :: Timeout 'Seconds)) $ do fake <- newFakeP2P True <&> Fabriq @@ -381,11 +401,12 @@ main = do others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do let findBlk = hasBlock s - let size = 1024*1024 + let size = 1024*1024*10 + g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] - let blk = B8.concat [ fromString (take 1 $ show x) - | x <- replicate size (fromIntegral p :: Int) - ] + bytes <- replicateM size $ uniformM g :: IO [Char] + + let blk = B8.pack bytes root <- putAsMerkle s blk @@ -430,7 +451,7 @@ main = do liftIO $ cancel as - pause ( 300 :: Timeout 'Seconds) + pause ( 29.9 :: Timeout 'Seconds ) mapM_ cancel (our:others)