From 7c51ab4e8591310c69623e6e7adc195121e49c17 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sun, 22 Jan 2023 22:32:08 +0300 Subject: [PATCH] SLOW --- hbs2-core/lib/HBS2/Actors/Peer.hs | 4 +- hbs2-core/lib/HBS2/Data/Types/Refs.hs | 2 +- hbs2-core/lib/HBS2/Defaults.hs | 8 +-- hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs | 2 +- hbs2-tests/hbs2-tests.cabal | 42 +++++++-------- hbs2-tests/test/Peer2Main.hs | 58 +++++++++++++++------ 6 files changed, 69 insertions(+), 47 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 693f1d7a..56f3edc0 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -301,7 +301,7 @@ runPeerM s bus p f = do <*> liftIO (newTVarIO mempty) let de = view envDeferred env - as <- liftIO $ async $ runPipeline de + as <- liftIO $ replicateM 1 $ async $ runPipeline de sw <- liftIO $ async $ forever $ withPeerM env $ do pause defSweepTimeout @@ -311,7 +311,7 @@ runPeerM s bus p f = do void $ runReaderT (fromPeerM f) env void $ liftIO $ stopPipeline de - liftIO $ cancel as + liftIO $ mapM_ cancel as withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () withPeerM env action = void $ runReaderT (fromPeerM action) env diff --git a/hbs2-core/lib/HBS2/Data/Types/Refs.hs b/hbs2-core/lib/HBS2/Data/Types/Refs.hs index 9b1435d6..1defcb45 100644 --- a/hbs2-core/lib/HBS2/Data/Types/Refs.hs +++ b/hbs2-core/lib/HBS2/Data/Types/Refs.hs @@ -11,7 +11,7 @@ import Data.String(IsString) import GHC.Generics import Prettyprinter -newtype HashRef = HashRef (Hash HbSync) +newtype HashRef = HashRef { fromHashRef :: Hash HbSync } deriving newtype (Eq,Ord,IsString,Pretty) deriving stock (Data,Generic,Show) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 4bd09e5f..67a8b8ab 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -14,20 +14,20 @@ defStorePath :: IsString a => a defStorePath = "hbs2" defPipelineSize :: Int -defPipelineSize = 100 +defPipelineSize = 1000 defChunkWriterQ :: Integral a => a -defChunkWriterQ = 100 +defChunkWriterQ = 1000 defBlockDownloadQ :: Integral a => a -defBlockDownloadQ = 100 +defBlockDownloadQ = 65536*4 defBlockDownloadThreshold :: Integral a => a defBlockDownloadThreshold = 2 -- typical block hash 530+ chunks * parallel wip blocks amount defProtoPipelineSize :: Int -defProtoPipelineSize = 65536 +defProtoPipelineSize = 65536*4 defCookieTimeout :: TimeSpec defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index 0fdd4218..a1c4a1fe 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -100,7 +100,7 @@ blockChunksProto adapter (BlockChunks c p) = chunk <- blkChunk adapter h o sz maybe (pure ()) (response_ . BlockChunk @e i) chunk - BlockChunk n bs -> do + BlockChunk n bs -> deferred proto do who <- thatPeer proto me <- ownPeer @e h <- blkGetHash adapter (who, c) diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 2965d814..4542828f 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -51,9 +51,9 @@ common shared-properties -- -Werror=missing-methods -- -Werror=incomplete-patterns -- -fno-warn-unused-binds - -- -threaded - -- -rtsopts - -- "-with-rtsopts=-N4 -A64m -AL256m -I0" + -threaded + -rtsopts + "-with-rtsopts=-N8 -A64m -AL256m -I0" default-language: Haskell2010 @@ -85,7 +85,21 @@ common shared-properties , TypeFamilies -test-suite test-peer +-- test-suite test-peer +-- import: shared-properties +-- import: common-deps +-- default-language: Haskell2010 + +-- other-modules: + +-- -- other-extensions: + +-- type: exitcode-stdio-1.0 +-- hs-source-dirs: test +-- main-is: Peer2Main.hs + + +executable test-peer-run import: shared-properties import: common-deps default-language: Haskell2010 @@ -94,25 +108,7 @@ test-suite test-peer -- other-extensions: - type: exitcode-stdio-1.0 + -- type: exitcode-stdio-1.0 hs-source-dirs: test main-is: Peer2Main.hs - -test-suite test-hmap - import: shared-properties - import: common-deps - default-language: Haskell2010 - - other-modules: - - build-depends: HMap - , data-default - - -- other-extensions: - - type: exitcode-stdio-1.0 - hs-source-dirs: test - main-is: HmapMain.hs - - diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 43089961..9d87b602 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -5,8 +5,11 @@ module Main where import HBS2.Actors.ChunkWriter +import HBS2.Actors import HBS2.Actors.Peer import HBS2.Clock +import HBS2.Data.Detect +import HBS2.Data.Types import HBS2.Defaults import HBS2.Events import HBS2.Hash @@ -143,15 +146,15 @@ runTestPeer p zu = do stor <- simpleStorageInit opts cww <- newChunkWriterIO stor (Just chDir) - sw <- liftIO $ async $ simpleStorageWorker stor - cw <- liftIO $ async $ runChunkWriter cww + sw <- liftIO $ replicateM 1 $ async $ simpleStorageWorker stor + cw <- liftIO $ replicateM 1 $ async $ runChunkWriter cww zu stor cww simpleStorageStop stor stopChunkWriter cww - mapM_ cancel [sw,cw] + mapM_ cancel $ sw <> cw handleBlockInfo :: forall e m . ( MonadIO m @@ -171,6 +174,7 @@ handleBlockInfo (p, h, sz') = do update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) blockDownloadLoop :: forall e m . ( m ~ PeerM e IO + , MonadIO m , Request e (BlockInfo e) m , Request e (BlockChunks e) m , EventListener e (BlockInfo e) m @@ -191,7 +195,8 @@ blockDownloadLoop = do let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" - , "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + , "ECWYwWXiLgNvCkN1EFpSYqsPcWfnL4bAQADsyZgy1Cbr" ] blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ @@ -205,7 +210,7 @@ blockDownloadLoop = do <+> pretty h <+> pretty (view biSize ann) - initDownload p h s -- FIXME: don't trust everybody + initDownload blq p h s -- FIXME: don't trust everybody fix \next -> do @@ -216,7 +221,7 @@ blockDownloadLoop = do unless here $ do subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do - initDownload p h s + initDownload blq p h s peers <- getPeerLocator @e >>= knownPeers @e @@ -224,13 +229,11 @@ blockDownloadLoop = do debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p request p (GetBlockSize @e h) - liftIO $ print "piu!" - next where - initDownload p h s = do + initDownload q p h s = do sto <- getStorage here <- liftIO $ hasBlock sto h <&> isJust @@ -246,17 +249,39 @@ blockDownloadLoop = do update @e new key id subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do - processBlock h + processBlock q p h request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction else do - processBlock h + processBlock q p h - processBlock h = do - sto <- getStorage - debug $ "GOT BLOCK!" <+> pretty h + processBlock q _ h = do + env <- ask + pip <- asks (view envDeferred) + liftIO $ addJob pip $ withPeerM env $ do + -- void $ liftIO $ async $ withPeerM env $ do + + sto <- getStorage + debug $ "GOT BLOCK!" <+> pretty h + bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) + -- debug $ pretty (show bt) + + case bt of + Nothing -> pure () + + Just (AnnRef{}) -> do + pure () + + Just (Merkle{}) -> liftIO do + debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h + walkMerkle h (getBlock sto) $ \(hr :: [HashRef]) -> do + for_ hr $ \h -> debug $ "for-block" <+> pretty h + for_ hr ( atomically . Q.writeTBQueue q . fromHashRef) + + Just (Blob{}) -> do + pure () -- NOTE: this is an adapter for a ResponseM monad @@ -319,6 +344,7 @@ mkAdapter cww = do deferred (Proxy @(BlockChunks e)) $ do h1 <- liftIO $ getHash cww cKey h + -- h1 <- pure h-- liftIO $ getHash cww cKey h -- ПОСЧИТАТЬ ХЭШ -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК @@ -343,7 +369,7 @@ main :: IO () main = do hSetBuffering stderr LineBuffering - void $ race (pause (10 :: Timeout 'Seconds)) $ do + void $ race (pause (300 :: Timeout 'Seconds)) $ do fake <- newFakeP2P True <&> Fabriq @@ -402,7 +428,7 @@ main = do liftIO $ cancel as - pause ( 8 :: Timeout 'Seconds) + pause ( 300 :: Timeout 'Seconds) mapM_ cancel (our:others)