checking if block is here

This commit is contained in:
Dmitry Zuikov 2023-01-22 18:56:19 +03:00
parent 76e977327f
commit 7bac05bfd5
2 changed files with 49 additions and 30 deletions

View File

@ -1,6 +1,6 @@
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-} -- {-# Language AllowAmbiguousTypes #-}
module HBS2.Actors.Peer where module HBS2.Actors.Peer where
import HBS2.Actors import HBS2.Actors

View File

@ -27,6 +27,8 @@ import Test.Tasty.HUnit
import Codec.Serialise hiding (encode,decode) import Codec.Serialise hiding (encode,decode)
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as Q
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
@ -35,6 +37,7 @@ import Data.Default
import Data.Foldable hiding (find) import Data.Foldable hiding (find)
import Data.Map (Map) import Data.Map (Map)
import Data.Map qualified as Map import Data.Map qualified as Map
import Data.Maybe
import Data.Word import Data.Word
import Lens.Micro.Platform import Lens.Micro.Platform
import Prettyprinter hiding (pipe) import Prettyprinter hiding (pipe)
@ -42,8 +45,6 @@ import System.Directory
import System.Exit import System.Exit
import System.FilePath.Posix import System.FilePath.Posix
import System.IO import System.IO
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as Q
debug :: (MonadIO m) => Doc ann -> m () debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p debug p = liftIO $ hPrint stderr p
@ -170,27 +171,28 @@ handleBlockInfo (p, h, sz') = do
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, HasProtocol e (BlockInfo e)
, HasProtocol e (BlockChunks e)
, Request e (BlockInfo e) m , Request e (BlockInfo e) m
, Request e (BlockChunks e) m , Request e (BlockChunks e) m
, EventListener e (BlockInfo e) m , EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m , EventListener e (BlockChunks e) m
, EventListener e (BlockAnnounce e) m , EventListener e (BlockAnnounce e) m
, EventEmitter e (BlockChunks e) m -- , EventEmitter e (BlockChunks e) m
, EventEmitter e (BlockInfo e) m -- , EventEmitter e (BlockInfo e) m
, Sessions e (BlockInfo e) m , Sessions e (BlockInfo e) m
, Sessions e (BlockChunks e) m , Sessions e (BlockChunks e) m
, HasStorage m
, Num (Peer e) , Num (Peer e)
, Pretty (Peer e) , Pretty (Peer e)
) => PeerM e IO () -- , Key HbSync ~ Hash HbSync
) => m ()
blockDownloadLoop = do blockDownloadLoop = do
let blks = [] stor <- getStorage
-- let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
-- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
-- , "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
-- ] , "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
]
blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ
for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b
@ -198,6 +200,7 @@ blockDownloadLoop = do
subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do
let h = view biHash ann let h = view biHash ann
let s = view biSize ann let s = view biSize ann
debug $ "BLOCK ANNOUNCE!" <+> pretty p debug $ "BLOCK ANNOUNCE!" <+> pretty p
<+> pretty h <+> pretty h
<+> pretty (view biSize ann) <+> pretty (view biSize ann)
@ -208,35 +211,51 @@ blockDownloadLoop = do
h <- liftIO $ atomically $ Q.readTBQueue blq h <- liftIO $ atomically $ Q.readTBQueue blq
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do here <- liftIO $ hasBlock stor h <&> isJust
initDownload p h s
peers <- getPeerLocator @e >>= knownPeers @e unless here $ do
for_ peers $ \p -> do subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do
debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p initDownload p h s
request p (GetBlockSize @e h)
peers <- getPeerLocator @e >>= knownPeers @e
for_ peers $ \p -> do
debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p
request p (GetBlockSize @e h)
liftIO $ print "piu!" liftIO $ print "piu!"
next next
where where
initDownload p h s = do initDownload p h s = do
coo <- genCookie (p,h) sto <- getStorage
let key = DownloadSessionKey (p, coo) here <- liftIO $ hasBlock sto h <&> isJust
let chusz = defChunkSize
let new = set sBlockChunkSize chusz
. set sBlockSize (fromIntegral s)
$ newBlockDownload h
update @e new key id if not here then do
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do coo <- genCookie (p,h)
debug $ "GOT BLOCK!" <+> pretty h let key = DownloadSessionKey (p, coo)
pure () let chusz = defChunkSize
let new = set sBlockChunkSize chusz
. set sBlockSize (fromIntegral s)
$ newBlockDownload h
update @e new key id
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
processBlock h
request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
else do
processBlock h
processBlock h = do
debug $ "GOT BLOCK!" <+> pretty h
request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
-- NOTE: this is an adapter for a ResponseM monad -- NOTE: this is an adapter for a ResponseM monad