From adcfbf5be2ef76d62ad5ea0616d67c33283fd7ea Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 19 Jan 2023 09:42:15 +0300 Subject: [PATCH] wip --- hbs2-tests/test/Main.hs | 77 +++++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/hbs2-tests/test/Main.hs b/hbs2-tests/test/Main.hs index 41b382b9..09f54cac 100644 --- a/hbs2-tests/test/Main.hs +++ b/hbs2-tests/test/Main.hs @@ -40,6 +40,9 @@ import Data.Map (Map) import Data.Map qualified as Map import Control.Monad.Trans.Maybe +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue qualified as Q + debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p @@ -57,27 +60,33 @@ newtype ChunkNum = ChunkNum Word16 -- therefore, key is ( p | cookie ) -- but client's cookie in protocol should be just ( cookie :: Word32 ) -data BlockDownload = +type OnBlockReady h m = Hash h -> m () + +data BlockDownload m = BlockDownload { _sBlockHash :: Hash HbSync , _sBlockChunkSize :: ChunkSize , _sBlockOffset :: Offset , _sBlockWritten :: Size + , _sOnBlockReady :: OnBlockReady HbSync m } -data Sessions e = +data Sessions e m = Sessions - { _sBlockDownload :: Cache (Peer e, Cookie e) BlockDownload + { _sBlockDownload :: Cache (Peer e, Cookie e) (BlockDownload m) , _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size) , _sBlockSize :: Cache (Hash HbSync) Size } - makeLenses 'Sessions makeLenses 'BlockDownload -newBlockDownload :: Hash HbSync -> BlockDownload +newBlockDownload :: forall m . MonadIO m + => Hash HbSync + -> OnBlockReady HbSync m + -> BlockDownload m + newBlockDownload h = BlockDownload h 0 0 0 type GetBlockChunk h m = Hash h -> Offset -> Size -> m (Maybe ByteString) @@ -191,7 +200,7 @@ main = do -- ] -emptySessions :: forall e m . MonadIO m => m (Sessions e) +emptySessions :: forall e m . MonadIO m => m (Sessions e m) emptySessions = liftIO $ Sessions <$> Cache.newCache (Just defCookieTimeout) <*> Cache.newCache (Just defBlockInfoTimeout) @@ -225,7 +234,7 @@ delSession se l k = liftIO do expireSession se l = liftIO do Cache.purgeExpired (view l se) -runFakePeer :: forall e . e ~ Fake => Sessions e -> EngineEnv e -> IO () +runFakePeer :: forall e m . (e ~ Fake, m ~ IO) => Sessions e m -> EngineEnv e -> m () runFakePeer se env = do let pid = fromIntegral (hash (env ^. self)) :: Word8 @@ -281,9 +290,13 @@ runFakePeer se env = do -- УДАЛЯЕМ КУКУ? , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do - let def = newBlockDownload h let cKey = (p,c) + -- check if there is a session + void $ MaybeT $ getSession' se sBlockDownload cKey id + + let def = newBlockDownload h dontHandle + let bslen = fromIntegral $ B8.length bs -- TODO: log this situation mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing @@ -297,8 +310,11 @@ runFakePeer se env = do writeChunk cww cKey h offset bs updSession se def sBlockDownload cKey (over sBlockWritten (+bslen)) - maxOff <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockOffset) - written <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockWritten) + dwnld <- MaybeT $ getSession' se sBlockDownload cKey id + + let maxOff = view sBlockOffset dwnld + let written = view sBlockWritten dwnld + let notify = view sOnBlockReady dwnld let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize && written >= mbSize @@ -311,18 +327,16 @@ runFakePeer se env = do -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ when ( h1 == h ) $ do - debug $ "THIS BLOCK IS DEFINITELY DONE" <+> pretty h1 - liftIO $ commitBlock cww cKey h + lift $ commitBlock cww cKey h + lift $ notify h delSession se sBlockDownload cKey - -- TODO: #ASAP - -- NOTIFY BLOCK IS DONE when (written > mbSize * defBlockDownloadThreshold) $ do debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p delSession se sBlockDownload cKey -- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ, -- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ - -- ЧАНКИ, ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ. + -- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ. -- ТАК НЕ ПОЙДЕТ -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся } @@ -336,8 +350,6 @@ runFakePeer se env = do stopChunkWriter cww - pause ( 0.25 :: Timeout 'Seconds) - mapM_ cancel [w,cw] @@ -360,11 +372,6 @@ test1 = do peerz <- mapM (async . uncurry runFakePeer) ee runEngineM e0 $ do - request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) - request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) - - request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) - request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) let h = fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" @@ -374,18 +381,36 @@ test1 = do let s0 = (fst . head) ee let cKey@(_, cookie) = (p1, 0) -- <<~~~ FIXME: generate a good session id! let chsz = defChunkSize - let def = newBlockDownload h + + qblk <- liftIO Q.newTQueueIO + + let onBlockReady bh = do + liftIO $ atomically $ Q.writeTQueue qblk bh + + let def = newBlockDownload h onBlockReady + + -- create sessions before sequesting anything updSession s0 def sBlockDownload cKey (set sBlockChunkSize chsz) + request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) + request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + + request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) + request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + + -- TODO: #ASAP block ready notification + debug $ "REQUEST BLOCK:" <+> pretty h <+> "from" <+> pretty p1 + request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz)) + blk <- liftIO $ atomically $ Q.readTQueue qblk + + debug $ "BLOCK READY:" <+> pretty blk + pure () - - pause ( 1 :: Timeout 'Seconds) - mapM_ cancel peerz (_, e) <- waitAnyCatchCancel peerz