works on 10m per peer, explodes on 40m

This commit is contained in:
Dmitry Zuikov 2023-01-24 07:13:14 +03:00
parent 2d06149e25
commit 8b1e3fbbfd
4 changed files with 55 additions and 33 deletions

View File

@ -107,7 +107,7 @@ runChunkWriter2 w = do
if stop then do if stop then do
ks <- liftIO $ take 20 <$> Cache.keys cache ks <- liftIO $ take 20 <$> Cache.keys cache
for_ ks $ \k -> flush w k liftIO $ for_ ks $ \k -> flush w k
else do else do
ks <- liftIO $ Cache.keys cache ks <- liftIO $ Cache.keys cache

View File

@ -23,7 +23,7 @@ pieces :: Integral a => a
pieces = 8192 pieces = 8192
class SimpleStorageExtra a where class SimpleStorageExtra a where
putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString, Block ByteString ~ ByteString) => SimpleStorage h -> a -> IO MerkleHash putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash
readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m () readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m ()
readChunked handle size = fuu readChunked handle size = fuu

View File

@ -41,6 +41,7 @@ common common-deps
, uniplate , uniplate
, vector , vector
, data-default , data-default
, mwc-random
common shared-properties common shared-properties
ghc-options: ghc-options:

View File

@ -2,6 +2,7 @@
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language RankNTypes #-} {-# Language RankNTypes #-}
{-# Language AllowAmbiguousTypes #-} {-# Language AllowAmbiguousTypes #-}
{-# LANGUAGE MultiWayIf #-}
module Main where module Main where
import HBS2.Actors.ChunkWriter import HBS2.Actors.ChunkWriter
@ -49,6 +50,10 @@ import System.Exit
import System.FilePath.Posix import System.FilePath.Posix
import System.IO import System.IO
import System.Random.MWC
import System.Random.Stateful
import qualified Data.Vector.Unboxed as U
debug :: (MonadIO m) => Doc ann -> m () debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p debug p = liftIO $ hPrint stderr p
@ -193,11 +198,13 @@ blockDownloadLoop = do
stor <- getStorage stor <- getStorage
let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" let blks = []
, "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
, "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" -- let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
, "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg" -- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
] -- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
-- , "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg"
-- ]
blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ
for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b
@ -210,7 +217,7 @@ blockDownloadLoop = do
<+> pretty h <+> pretty h
<+> pretty (view biSize ann) <+> pretty (view biSize ann)
initDownload blq p h s -- FIXME: don't trust everybody initDownload False blq p h s -- FIXME: don't trust everybody
fix \next -> do fix \next -> do
@ -220,8 +227,8 @@ blockDownloadLoop = do
unless here $ do unless here $ do
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do
initDownload blq p h s initDownload True blq p hx s
peers <- getPeerLocator @e >>= knownPeers @e peers <- getPeerLocator @e >>= knownPeers @e
@ -233,28 +240,31 @@ blockDownloadLoop = do
where where
initDownload q p h s = do initDownload anyway q p h s = do
sto <- getStorage sto <- getStorage
here <- liftIO $ hasBlock sto h <&> isJust here <- liftIO $ hasBlock sto h <&> isJust
if not here then do if | not here -> do
coo <- genCookie (p,h) coo <- genCookie (p,h)
let key = DownloadSessionKey (p, coo) let key = DownloadSessionKey (p, coo)
let chusz = defChunkSize let chusz = defChunkSize
let new = set sBlockChunkSize chusz let new = set sBlockChunkSize chusz
. set sBlockSize (fromIntegral s) . set sBlockSize (fromIntegral s)
$ newBlockDownload h $ newBlockDownload h
update @e new key id update @e new key id
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
processBlock q p h processBlock q p h
request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
else do | anyway -> processBlock q p h
processBlock q p h
| otherwise -> do
debug $ "already got " <+> pretty h <+> " so relax"
pure ()
processBlock q _ h = do processBlock q _ h = do
@ -262,7 +272,6 @@ blockDownloadLoop = do
pip <- asks (view envDeferred) pip <- asks (view envDeferred)
-- debug "process block!" -- debug "process block!"
liftIO $ addJob pip $ withPeerM env $ do liftIO $ addJob pip $ withPeerM env $ do
-- void $ liftIO $ async $ withPeerM env $ do
sto <- getStorage sto <- getStorage
liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h
@ -278,8 +287,19 @@ blockDownloadLoop = do
Just (Merkle{}) -> liftIO do Just (Merkle{}) -> liftIO do
debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h
walkMerkle h (getBlock sto) $ \(hr :: [HashRef]) -> do walkMerkle h (getBlock sto) $ \(hr :: [HashRef]) -> do
for_ hr $ \h -> debug $ "for-block" <+> pretty h
for_ hr ( atomically . Q.writeTBQueue q . fromHashRef) for_ hr $ \(HashRef blk) -> do
here <- liftIO $ hasBlock sto blk <&> isJust
if here then do
debug $ "block" <+> pretty blk <+> "is already here"
pure () -- we don't need to recurse, cause walkMerkle will recurse
else do
-- if block is missed, then
-- block to download q
atomically $ Q.writeTBQueue q blk
Just (Blob{}) -> do Just (Blob{}) -> do
pure () pure ()
@ -371,7 +391,7 @@ main :: IO ()
main = do main = do
hSetBuffering stderr LineBuffering hSetBuffering stderr LineBuffering
void $ race (pause (300 :: Timeout 'Seconds)) $ do void $ race (pause (30 :: Timeout 'Seconds)) $ do
fake <- newFakeP2P True <&> Fabriq fake <- newFakeP2P True <&> Fabriq
@ -381,11 +401,12 @@ main = do
others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do
let findBlk = hasBlock s let findBlk = hasBlock s
let size = 1024*1024 let size = 1024*1024*10
g <- initialize $ U.fromList [fromIntegral p, fromIntegral size]
let blk = B8.concat [ fromString (take 1 $ show x) bytes <- replicateM size $ uniformM g :: IO [Char]
| x <- replicate size (fromIntegral p :: Int)
] let blk = B8.pack bytes
root <- putAsMerkle s blk root <- putAsMerkle s blk
@ -430,7 +451,7 @@ main = do
liftIO $ cancel as liftIO $ cancel as
pause ( 300 :: Timeout 'Seconds) pause ( 29.9 :: Timeout 'Seconds )
mapM_ cancel (our:others) mapM_ cancel (our:others)