From 7cd6afd165640c0907228be3b35b4304d6bef3b5 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 21 Jan 2023 06:01:38 +0300 Subject: [PATCH] wip --- hbs2-tests/test/PeerMain.hs | 233 +++++++++++++++++++----------------- 1 file changed, 122 insertions(+), 111 deletions(-) diff --git a/hbs2-tests/test/PeerMain.hs b/hbs2-tests/test/PeerMain.hs index 835fdfcf..44bbbbad 100644 --- a/hbs2-tests/test/PeerMain.hs +++ b/hbs2-tests/test/PeerMain.hs @@ -17,6 +17,7 @@ import HBS2.Net.Messaging.Fake import HBS2.Actors.Peer import HBS2.Defaults +import HBS2.Data.Types.Refs import HBS2.Storage import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra @@ -332,6 +333,124 @@ runFakePeer ev p0 bus work = do mapM_ cancel [w,cw,peer] +-- TODO: замутить мап/кэш со статистикой по блоку: +-- сколько блок там маринуется и т.п. +-- Если блок в этом кэше и еще не скачан, то +-- ... пробуем качать повторно? +-- ... увеличиваем время +-- ... если не появилось новых пиров +-- ... запоминать, у какого пира уже спрашивали и стараться +-- ... спрашивать у других? +-- ... для каждого блока - вести список, у кого лучше спрашивать? +-- ... и там whilelist, blacklist +-- ... не дохрена ли это будет занимать? +-- +-- ... и тут, короче, еще кэш WiP +-- ... и еще один поток, который это всё хэндлит, например: +-- ... берём статистику блоков, берём wip +-- ... если блок не wip и до сих пор в мапе --- то то добавляем +-- ... в очередь. +-- +-- ... блоку пишем, у каких пиров уже спрашивали (Set) +-- ... блоку пишем, когда стартовал процесс +-- +-- + + +blockDownloadLoop ev0 p1 = do + + + let ini = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" + , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + ] + + blkQ <- liftIO $ do + b <- newTBQueueIO defBlockDownloadQ + traverse_ (atomically . TBQ.writeTBQueue b) ini + pure b + + + -- TODO: random shuffle and take X + -- подтягиваем новых пиров откуда можем + -- для каждого блока решаем, откуда брать: + -- shuffle (white-list) <> shuffle (black-list) + -- + -- + let knownPeers = [p1] + + fix \next -> do + + -- Вечно ждём. Это и правильно и неправильно + + blkHash <- liftIO $ atomically $ TBQ.readTBQueue blkQ + + -- TODO: check is this block is already here + -- maybe emit event to continue -> parse/seek for content + + -- TODO: убивать нотификации, если блок скачан или что-то с ним еще + -- приключилось + -- + -- добавляем сюда экшоны на почистить: + -- добавили нотификацию --- экшон на + -- почистить нотификацию + -- + -- добавили еще какую парашу -- экшон на + -- её очистку + -- + -- у каждого экшона - дедлайн + -- и там процесс, который берёт тех, у кого дедлайн + -- истёк и вызывает их + -- ? + + addBlockReadyEventNotify ev0 blkHash $ \hash -> do + debug $ "DOWNLOADED BLOCK" <+> pretty hash <+> "NOW WHAT?" + + -- ВЫКОВЫРЯТЬ СТОРЕЙДЖ (как?) + -- ЗАСУНУТЫЙ В READER? + + obj <- undefined -- getBlock ss hash + + let mbLink = deserialiseOrFail @AnnotatedHashRef obj + + pure () + + + -- -- TODO: смотрим, что за блок + -- -- если Merkle - то качаем рекурсивно + -- -- если ссылка - то смотрим, что за ссылка + -- -- проверяем пруфы + -- -- качаем рекурсивно + + -- TODO: надо трекать, может блок-то и найден + -- либо по всем пирам спросить + + addBlockSizeEventNotify ev0 blkHash $ \case + (p, h, Just size) -> do + coo <- genCookie (p,blkHash) + let key = DownloadSessionKey (p, coo) + let chusz = defChunkSize + + let new = set sBlockChunkSize chusz + . set sBlockSize (fromIntegral size) + $ newBlockDownload blkHash + + update @Fake new key id + request p (BlockChunks coo (BlockGetAllChunks @Fake blkHash chusz)) -- FIXME: nice construction + + _ -> pure () + + -- TODO: смотрим, может у нас уже есть block-size + -- тогда ловим случайного пира, у которого оно есть + -- и ставим на закачку + + -- КТО ПЕРВЫЙ ВСТАЛ ТОГО И ТАПКИ + for_ knownPeers $ \who -> + request who (GetBlockSize @Fake blkHash) + + next + + + test1 :: IO () test1 = do @@ -339,7 +458,7 @@ test1 = do fake <- newFakeP2P True - void $ race (pause (2 :: Timeout 'Seconds)) $ do + void $ race (pause (10 :: Timeout 'Seconds)) $ do let p0 = 0 :: Peer Fake let p1 = 1 :: Peer Fake @@ -349,122 +468,14 @@ test1 = do p1Thread <- async $ runFakePeer ev1 p1 fake $ forever $ liftIO yield - -- TODO: замутить мап/кэш со статистикой по блоку: - -- сколько блок там маринуется и т.п. - -- Если блок в этом кэше и еще не скачан, то - -- ... пробуем качать повторно? - -- ... увеличиваем время - -- ... если не появилось новых пиров - -- ... запоминать, у какого пира уже спрашивали и стараться - -- ... спрашивать у других? - -- ... для каждого блока - вести список, у кого лучше спрашивать? - -- ... и там whilelist, blacklist - -- ... не дохрена ли это будет занимать? - -- - -- ... и тут, короче, еще кэш WiP - -- ... и еще один поток, который это всё хэндлит, например: - -- ... берём статистику блоков, берём wip - -- ... если блок не wip и до сих пор в мапе --- то то добавляем - -- ... в очередь. - -- - -- ... блоку пишем, у каких пиров уже спрашивали (Set) - -- ... блоку пишем, когда стартовал процесс - -- - -- - - let ini = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" - , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" - ] - - blkQ <- liftIO $ do - b <- newTBQueueIO defBlockDownloadQ - traverse_ (atomically . TBQ.writeTBQueue b) ini - pure b - p0Thread <- async $ runFakePeer ev0 p0 fake $ do - -- TODO: random shuffle and take X - -- подтягиваем новых пиров откуда можем - -- для каждого блока решаем, откуда брать: - -- shuffle (white-list) <> shuffle (black-list) - -- - -- - let knownPeers = [p1] - - fix \next -> do - - blkHash <- liftIO $ atomically $ TBQ.readTBQueue blkQ - - -- TODO: check is this block is already here - -- maybe emit event to continue -> parse/seek for content - - -- TODO: убивать нотификации, если блок скачан или что-то с ним еще - -- приключилось - -- - -- добавляем сюда экшоны на почистить: - -- добавили нотификацию --- экшон на - -- почистить нотификацию - -- - -- добавили еще какую парашу -- экшон на - -- её очистку - -- - -- у каждого экшона - дедлайн - -- и там процесс, который берёт тех, у кого дедлайн - -- истёк и вызывает их - -- ? - - addBlockReadyEventNotify ev0 blkHash $ \h -> do - debug $ "DOWNLOADED BLOCK" <+> pretty h <+> "NOW WHAT?" - - -- -- TODO: смотрим, что за блок - -- -- если Merkle - то качаем рекурсивно - -- -- если ссылка - то смотрим, что за ссылка - -- -- проверяем пруфы - -- -- качаем рекурсивно - - -- TODO: надо трекать, может блок-то и найден - -- либо по всем пирам спросить - - addBlockSizeEventNotify ev0 blkHash $ \case - (p, h, Just size) -> do - coo <- genCookie (p,blkHash) - let key = DownloadSessionKey (p, coo) - let chusz = defChunkSize - - let new = set sBlockChunkSize chusz - . set sBlockSize (fromIntegral size) - $ newBlockDownload blkHash - - update @Fake new key id - request p (BlockChunks coo (BlockGetAllChunks @Fake blkHash chusz)) -- FIXME: nice construction - liftIO $ print $ "DAVAI BLOCK!" <+> pretty h - - _ -> pure () - - -- TODO: смотрим, может у нас уже есть block-size - -- тогда ловим случайного пира, у которого оно есть - -- и ставим на закачку - - -- КТО ПЕРВЫЙ ВСТАЛ ТОГО И ТАПКИ - for_ knownPeers $ \who -> - request who (GetBlockSize @Fake blkHash) - - next + blockDownloadLoop ev0 p1 let peerz = p0Thread : [p1Thread] - -- -- TODO: смотрим, что за блок - -- -- если Merkle - то качаем рекурсивно - -- -- если ссылка - то смотрим, что за ссылка - -- -- проверяем пруфы - -- -- качаем рекурсивно - - -- -- let mbLink = deserialiseOrFail @Merkle obj - - -- pure () - - pause ( 1 :: Timeout 'Seconds) + pause ( 5 :: Timeout 'Seconds) mapM_ cancel peerz