diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index c1888fdc..3372f461 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -319,8 +319,6 @@ flush w fn = do let scache = semFlush w liftIO $ do - print "flush" - q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 100) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index e502896c..3219d29b 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -303,7 +303,7 @@ runPeerM s bus p f = do <*> liftIO (newTVarIO mempty) let de = view envDeferred env - as <- liftIO $ replicateM 2 $ async $ runPipeline de + as <- liftIO $ replicateM 8 $ async $ runPipeline de sw <- liftIO $ async $ forever $ withPeerM env $ do pause defSweepTimeout diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 3d3f7229..37ffbd7d 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -35,6 +35,13 @@ defCookieTimeout = toTimeSpec ( 60 :: Timeout 'Minutes) defBlockInfoTimeout :: TimeSpec defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes) +-- how much time wait for block from peer? +defBlockWaitMax :: Timeout 'Seconds +defBlockWaitMax = 10 :: Timeout 'Seconds + +defBlockWaitSleep :: Timeout 'Seconds +defBlockWaitSleep = 0.01 :: Timeout 'Seconds + defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 5 -- FIXME: only for debug! diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index e304c1c7..a1c4a1fe 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -86,8 +86,6 @@ blockChunksProto adapter (BlockChunks c p) = case p of BlockGetAllChunks h size -> deferred proto do - liftIO $ print $ "BlockGetAllChunks" <+> pretty h - me <- ownPeer @e who <- thatPeer proto diff --git a/hbs2-tests/hie.yaml b/hbs2-tests/hie.yaml deleted file mode 100644 index ace9c439..00000000 --- a/hbs2-tests/hie.yaml +++ /dev/null @@ -1,4 +0,0 @@ -cradle: - cabal: - - path: "test/Peer2Main.hs" - component: "hbs2-tests:exe:test-peer-run" diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index c8136bc3..641e3032 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -49,6 +49,7 @@ import System.Directory import System.Exit import System.FilePath.Posix import System.IO +import Safe import System.Random.MWC import qualified Data.Vector.Unboxed as U @@ -65,13 +66,16 @@ data BlockDownload = , _sBlockChunkSize :: ChunkSize , _sBlockOffset :: Offset , _sBlockWritten :: Size + , _sBlockWrittenT :: TVar Size } deriving stock (Typeable) makeLenses 'BlockDownload -newBlockDownload :: Hash HbSync -> BlockDownload -newBlockDownload h = BlockDownload h 0 0 0 0 +newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload +newBlockDownload h = do + t <- liftIO $ newTVarIO 0 + pure $ BlockDownload h 0 0 0 0 t instance HasPeer Fake where newtype instance Peer Fake = FakePeer Word8 @@ -243,19 +247,20 @@ blockDownloadLoop cw = do for_ peers $ \peer -> do subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do - debug $ "got block size for" <+> pretty h liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s))) - debug $ "requesting size for" <+> pretty h + -- debug $ "requesting size for" <+> pretty h request @e peer (GetBlockSize @e h) next where - initDownload anyway q p h s = do + initDownload anyway q p h thisBkSize = do - debug $ "initDownload" <+> pretty h <+> pretty p <+> pretty s + env <- ask + + -- debug $ "initDownload" <+> pretty h <+> pretty p <+> pretty thisBkSize sto <- getStorage here <- liftIO $ hasBlock sto h <&> isJust @@ -265,15 +270,35 @@ blockDownloadLoop cw = do coo <- genCookie (p,h) let key = DownloadSessionKey (p, coo) let chusz = defChunkSize + dnwld <- newBlockDownload h let new = set sBlockChunkSize chusz - . set sBlockSize (fromIntegral s) - $ newBlockDownload h + . set sBlockSize (fromIntegral thisBkSize) + $ dnwld update @e new key id subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do processBlock q h + -- liftIO $ async $ do + -- void $ race (pause defBlockWaitMax) $ withPeerM env $ fix \next -> do + -- pause defBlockWaitSleep + -- wl <- find key (view sBlockWrittenL) + + -- let w = sum (fromMaybe [] wl) + + -- debug $ "WTF?" <+> pretty (w, thisBkSize) + + -- -- maybe1 w (pure ()) $ \z -> do + -- if fromIntegral w >= thisBkSize then do + -- debug "THE BLOCK IS ABOUT TO BE READY" + -- -- write to disk and so on + -- else do + -- pause defBlockWaitSleep + -- next + + -- FIXME: block is not downloaded, return it to the Q + request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction | anyway -> processBlock q h @@ -385,48 +410,14 @@ mkAdapter cww = do . over sBlockWritten (+ bslen) ) + wrt <- MaybeT $ find cKey (view sBlockWrittenT) - -- debug $ "gotShit" <+> pretty (B8.length bs) <+> pretty (writtenLast) <+> pretty (wr - -- debug $ "writtenLast" <+> pretty writtenLast + liftIO $ atomically $ modifyTVar wrt (+bslen) - -- теперь, что бы нас дидосить -- можно - -- после 2/3 пакетов посылать пакеты из этих двух третей - -- или вообще мусор, а мы тут будем считать хэши. - -- - -- кстати, если посылать мусор - то мы еще и на диск - -- его будем бесконечно писать. - -- - -- допустим, пришло 4 события. в обработчике каждого --- - -- мы не знаем, что там еще 3 события. - -- - -- таким образом, ни про какое событие нельзя понять, - -- что оно последнее, если у нас 4 потока. - -- - -- как же нам узнать, что пришёл последний блок и можно - -- его коммитить? - -- - -- - -- вариант N1. повесить "добивающий монитор". - -- Здесь только апдейтить счётчики, а вот монитор будет уже - -- смотреть что как и коммитить. - -- - -- Монитор может быть протухающим. - -- Как это сделать? + wrActually <- liftIO $ readTVarIO wrt - -- this is updating concurrently, - -- so collect last data from all posible threads - writtenLast <- MaybeT $ find cKey (view sBlockWritten) - maxOffLast <- MaybeT $ find cKey (view sBlockOffset) - - let mbDone = (maxOffLast + fromIntegral mbChSize) > fromIntegral mbSize - && writtenLast >= ( (mbSize * 2) `div` 3 ) - - debug $ "blkAcceptChunk" <+> pretty (p,c) - <+> pretty maxOffLast - -- <+> pretty n - -- <+> pretty (B8.length bs) - -- <+> pretty - -- debug $ "written:" <+> pretty written <+> "/" <+> pretty mbSize + let mbDone = wrActually >= mbSize + -- && (maxOffLast + fromIntegral mbChSize) > fromIntegral mbSize when mbDone $ lift do @@ -469,7 +460,7 @@ main = do let findBlk = hasBlock s -- let size = 1024*1024*1 - let size = 256*1024 + let size = 1024*1024*10 g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] bytes <- replicateM size $ uniformM g :: IO [Char] @@ -502,10 +493,10 @@ main = do our <- async $ runTestPeer p0 $ \s cw -> do let blk = hasBlock s - void $ async $ forever $ do - pause ( 1 :: Timeout 'Seconds ) - wip <- blocksInProcess cw - debug $ "blocks wip:" <+> pretty wip + -- void $ async $ forever $ do + -- pause ( 1 :: Timeout 'Seconds ) + -- wip <- blocksInProcess cw + -- debug $ "blocks wip:" <+> pretty wip runPeerM (AnyStorage s) fake p0 $ do adapter <- mkAdapter cw