This commit is contained in:
Dmitry Zuikov 2023-01-22 22:32:08 +03:00
parent 762fc12de9
commit 7c51ab4e85
6 changed files with 69 additions and 47 deletions

View File

@ -301,7 +301,7 @@ runPeerM s bus p f = do
<*> liftIO (newTVarIO mempty) <*> liftIO (newTVarIO mempty)
let de = view envDeferred env let de = view envDeferred env
as <- liftIO $ async $ runPipeline de as <- liftIO $ replicateM 1 $ async $ runPipeline de
sw <- liftIO $ async $ forever $ withPeerM env $ do sw <- liftIO $ async $ forever $ withPeerM env $ do
pause defSweepTimeout pause defSweepTimeout
@ -311,7 +311,7 @@ runPeerM s bus p f = do
void $ runReaderT (fromPeerM f) env void $ runReaderT (fromPeerM f) env
void $ liftIO $ stopPipeline de void $ liftIO $ stopPipeline de
liftIO $ cancel as liftIO $ mapM_ cancel as
withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m ()
withPeerM env action = void $ runReaderT (fromPeerM action) env withPeerM env action = void $ runReaderT (fromPeerM action) env

View File

@ -11,7 +11,7 @@ import Data.String(IsString)
import GHC.Generics import GHC.Generics
import Prettyprinter import Prettyprinter
newtype HashRef = HashRef (Hash HbSync) newtype HashRef = HashRef { fromHashRef :: Hash HbSync }
deriving newtype (Eq,Ord,IsString,Pretty) deriving newtype (Eq,Ord,IsString,Pretty)
deriving stock (Data,Generic,Show) deriving stock (Data,Generic,Show)

View File

@ -14,20 +14,20 @@ defStorePath :: IsString a => a
defStorePath = "hbs2" defStorePath = "hbs2"
defPipelineSize :: Int defPipelineSize :: Int
defPipelineSize = 100 defPipelineSize = 1000
defChunkWriterQ :: Integral a => a defChunkWriterQ :: Integral a => a
defChunkWriterQ = 100 defChunkWriterQ = 1000
defBlockDownloadQ :: Integral a => a defBlockDownloadQ :: Integral a => a
defBlockDownloadQ = 100 defBlockDownloadQ = 65536*4
defBlockDownloadThreshold :: Integral a => a defBlockDownloadThreshold :: Integral a => a
defBlockDownloadThreshold = 2 defBlockDownloadThreshold = 2
-- typical block hash 530+ chunks * parallel wip blocks amount -- typical block hash 530+ chunks * parallel wip blocks amount
defProtoPipelineSize :: Int defProtoPipelineSize :: Int
defProtoPipelineSize = 65536 defProtoPipelineSize = 65536*4
defCookieTimeout :: TimeSpec defCookieTimeout :: TimeSpec
defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes)

View File

@ -100,7 +100,7 @@ blockChunksProto adapter (BlockChunks c p) =
chunk <- blkChunk adapter h o sz chunk <- blkChunk adapter h o sz
maybe (pure ()) (response_ . BlockChunk @e i) chunk maybe (pure ()) (response_ . BlockChunk @e i) chunk
BlockChunk n bs -> do BlockChunk n bs -> deferred proto do
who <- thatPeer proto who <- thatPeer proto
me <- ownPeer @e me <- ownPeer @e
h <- blkGetHash adapter (who, c) h <- blkGetHash adapter (who, c)

View File

@ -51,9 +51,9 @@ common shared-properties
-- -Werror=missing-methods -- -Werror=missing-methods
-- -Werror=incomplete-patterns -- -Werror=incomplete-patterns
-- -fno-warn-unused-binds -- -fno-warn-unused-binds
-- -threaded -threaded
-- -rtsopts -rtsopts
-- "-with-rtsopts=-N4 -A64m -AL256m -I0" "-with-rtsopts=-N8 -A64m -AL256m -I0"
default-language: Haskell2010 default-language: Haskell2010
@ -85,7 +85,21 @@ common shared-properties
, TypeFamilies , TypeFamilies
test-suite test-peer -- test-suite test-peer
-- import: shared-properties
-- import: common-deps
-- default-language: Haskell2010
-- other-modules:
-- -- other-extensions:
-- type: exitcode-stdio-1.0
-- hs-source-dirs: test
-- main-is: Peer2Main.hs
executable test-peer-run
import: shared-properties import: shared-properties
import: common-deps import: common-deps
default-language: Haskell2010 default-language: Haskell2010
@ -94,25 +108,7 @@ test-suite test-peer
-- other-extensions: -- other-extensions:
type: exitcode-stdio-1.0 -- type: exitcode-stdio-1.0
hs-source-dirs: test hs-source-dirs: test
main-is: Peer2Main.hs main-is: Peer2Main.hs
test-suite test-hmap
import: shared-properties
import: common-deps
default-language: Haskell2010
other-modules:
build-depends: HMap
, data-default
-- other-extensions:
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: HmapMain.hs

View File

@ -5,8 +5,11 @@
module Main where module Main where
import HBS2.Actors.ChunkWriter import HBS2.Actors.ChunkWriter
import HBS2.Actors
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Clock import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Data.Types
import HBS2.Defaults import HBS2.Defaults
import HBS2.Events import HBS2.Events
import HBS2.Hash import HBS2.Hash
@ -143,15 +146,15 @@ runTestPeer p zu = do
stor <- simpleStorageInit opts stor <- simpleStorageInit opts
cww <- newChunkWriterIO stor (Just chDir) cww <- newChunkWriterIO stor (Just chDir)
sw <- liftIO $ async $ simpleStorageWorker stor sw <- liftIO $ replicateM 1 $ async $ simpleStorageWorker stor
cw <- liftIO $ async $ runChunkWriter cww cw <- liftIO $ replicateM 1 $ async $ runChunkWriter cww
zu stor cww zu stor cww
simpleStorageStop stor simpleStorageStop stor
stopChunkWriter cww stopChunkWriter cww
mapM_ cancel [sw,cw] mapM_ cancel $ sw <> cw
handleBlockInfo :: forall e m . ( MonadIO m handleBlockInfo :: forall e m . ( MonadIO m
@ -171,6 +174,7 @@ handleBlockInfo (p, h, sz') = do
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, MonadIO m
, Request e (BlockInfo e) m , Request e (BlockInfo e) m
, Request e (BlockChunks e) m , Request e (BlockChunks e) m
, EventListener e (BlockInfo e) m , EventListener e (BlockInfo e) m
@ -191,7 +195,8 @@ blockDownloadLoop = do
let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
, "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
, "ECWYwWXiLgNvCkN1EFpSYqsPcWfnL4bAQADsyZgy1Cbr"
] ]
blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ
@ -205,7 +210,7 @@ blockDownloadLoop = do
<+> pretty h <+> pretty h
<+> pretty (view biSize ann) <+> pretty (view biSize ann)
initDownload p h s -- FIXME: don't trust everybody initDownload blq p h s -- FIXME: don't trust everybody
fix \next -> do fix \next -> do
@ -216,7 +221,7 @@ blockDownloadLoop = do
unless here $ do unless here $ do
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do
initDownload p h s initDownload blq p h s
peers <- getPeerLocator @e >>= knownPeers @e peers <- getPeerLocator @e >>= knownPeers @e
@ -224,13 +229,11 @@ blockDownloadLoop = do
debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p
request p (GetBlockSize @e h) request p (GetBlockSize @e h)
liftIO $ print "piu!"
next next
where where
initDownload p h s = do initDownload q p h s = do
sto <- getStorage sto <- getStorage
here <- liftIO $ hasBlock sto h <&> isJust here <- liftIO $ hasBlock sto h <&> isJust
@ -246,17 +249,39 @@ blockDownloadLoop = do
update @e new key id update @e new key id
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
processBlock h processBlock q p h
request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
else do else do
processBlock h processBlock q p h
processBlock h = do processBlock q _ h = do
sto <- getStorage
debug $ "GOT BLOCK!" <+> pretty h
env <- ask
pip <- asks (view envDeferred)
liftIO $ addJob pip $ withPeerM env $ do
-- void $ liftIO $ async $ withPeerM env $ do
sto <- getStorage
debug $ "GOT BLOCK!" <+> pretty h
bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h)
-- debug $ pretty (show bt)
case bt of
Nothing -> pure ()
Just (AnnRef{}) -> do
pure ()
Just (Merkle{}) -> liftIO do
debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h
walkMerkle h (getBlock sto) $ \(hr :: [HashRef]) -> do
for_ hr $ \h -> debug $ "for-block" <+> pretty h
for_ hr ( atomically . Q.writeTBQueue q . fromHashRef)
Just (Blob{}) -> do
pure ()
-- NOTE: this is an adapter for a ResponseM monad -- NOTE: this is an adapter for a ResponseM monad
@ -319,6 +344,7 @@ mkAdapter cww = do
deferred (Proxy @(BlockChunks e)) $ do deferred (Proxy @(BlockChunks e)) $ do
h1 <- liftIO $ getHash cww cKey h h1 <- liftIO $ getHash cww cKey h
-- h1 <- pure h-- liftIO $ getHash cww cKey h
-- ПОСЧИТАТЬ ХЭШ -- ПОСЧИТАТЬ ХЭШ
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
@ -343,7 +369,7 @@ main :: IO ()
main = do main = do
hSetBuffering stderr LineBuffering hSetBuffering stderr LineBuffering
void $ race (pause (10 :: Timeout 'Seconds)) $ do void $ race (pause (300 :: Timeout 'Seconds)) $ do
fake <- newFakeP2P True <&> Fabriq fake <- newFakeP2P True <&> Fabriq
@ -402,7 +428,7 @@ main = do
liftIO $ cancel as liftIO $ cancel as
pause ( 8 :: Timeout 'Seconds) pause ( 300 :: Timeout 'Seconds)
mapM_ cancel (our:others) mapM_ cancel (our:others)