diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 56f3edc0..b9c96c91 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -222,12 +222,12 @@ instance ( HasProtocol e p liftIO $ atomically $ modifyTVar' ev (HashMap.insertWith (<>) sk [dyn]) -- FIXME: add a sweeping routine or else everything will be fucked! addSweeper (expiresIn (Proxy @(EventKey e p))) sk $ do - liftIO $ print $ "sweep smth with key" <+> pretty (hash sk) + -- liftIO $ print $ "sweep smth with key" <+> pretty (hash sk) liftIO $ atomically $ modifyTVar' ev (HashMap.delete sk) addSweeper :: forall e . Maybe (Timeout 'Seconds) -> SKey -> PeerM e IO () -> PeerM e IO () addSweeper t k sweeper = do - liftIO $ print $ "adding sweeper for key" <+> pretty (hash k) + -- liftIO $ print $ "adding sweeper for key" <+> pretty (hash k) ex <- asks (view envExpireTimes) sw <- asks (view envSweepers) liftIO $ Cache.insert' ex (toTimeSpec <$> t) k () @@ -238,6 +238,8 @@ sweep = do ex <- asks (view envExpireTimes) sw <- asks (view envSweepers) + liftIO $ print "sweep" + liftIO $ Cache.purgeExpired ex toSweep <- HashMap.toList <$> liftIO (readTVarIO sw) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index c88ff895..fca13857 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -30,13 +30,13 @@ defProtoPipelineSize :: Int defProtoPipelineSize = 65536*4 defCookieTimeout :: TimeSpec -defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) +defCookieTimeout = toTimeSpec ( 1 :: Timeout 'Minutes) defBlockInfoTimeout :: TimeSpec -defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) +defBlockInfoTimeout = toTimeSpec ( 1 :: Timeout 'Minutes) defSweepTimeout :: Timeout 'Seconds -defSweepTimeout = 600 -- FIXME: only for debug! +defSweepTimeout = 5 -- FIXME: only for debug! diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 4c931bc2..5e97b591 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -178,23 +178,31 @@ handleBlockInfo (p, h, sz') = do let bsz = fromIntegral sz update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) +data DownloadTask e = DownloadTask (Hash HbSync) (Maybe (Peer e, Integer)) + blockDownloadLoop :: forall e m . ( m ~ PeerM e IO - , MonadIO m - , Request e (BlockInfo e) m - , Request e (BlockChunks e) m - , EventListener e (BlockInfo e) m - , EventListener e (BlockChunks e) m - , EventListener e (BlockAnnounce e) m - -- , EventEmitter e (BlockChunks e) m - -- , EventEmitter e (BlockInfo e) m - , Sessions e (BlockInfo e) m - , Sessions e (BlockChunks e) m - , HasStorage m - , Num (Peer e) - , Pretty (Peer e) - -- , Key HbSync ~ Hash HbSync - ) => m () -blockDownloadLoop = do + -- , e ~ Fake + , Serialise (Encoded e) + , MonadIO m + , Request e (BlockInfo e) m + , Request e (BlockAnnounce e) m + , HasProtocol e (BlockInfo e) + , HasProtocol e (BlockAnnounce e) + , EventListener e (BlockInfo e) m + , EventListener e (BlockChunks e) m + , EventListener e (BlockAnnounce e) m + -- , EventEmitter e (BlockChunks e) m + -- , EventEmitter e (BlockInfo e) m + , Sessions e (BlockInfo e) m + , Sessions e (BlockChunks e) m + , HasStorage m + , Num (Peer e) + , Pretty (Peer e) + , Block ByteString ~ ByteString + -- , Key HbSync ~ Hash HbSync + ) + => ChunkWriter HbSync IO -> m () +blockDownloadLoop cw = do stor <- getStorage @@ -207,7 +215,7 @@ blockDownloadLoop = do -- ] blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ - for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b + for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask b Nothing) subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do let h = view biHash ann @@ -217,27 +225,30 @@ blockDownloadLoop = do <+> pretty h <+> pretty (view biSize ann) - initDownload False blq p h s -- FIXME: don't trust everybody + liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask h (Just (p,s))) fix \next -> do - h <- liftIO $ atomically $ Q.readTBQueue blq + job <- liftIO $ atomically $ Q.readTBQueue blq + wip <- liftIO $ blocksInProcess cw - here <- liftIO $ hasBlock stor h <&> isJust - - if not here then do - - subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do - initDownload True blq p hx s - - peers <- getPeerLocator @e >>= knownPeers @e - - for_ peers $ \p -> do - debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p - request p (GetBlockSize @e h) + debug $ "WIP:" <+> pretty wip + if wip > 10 then do + pause ( 1 :: Timeout 'Seconds ) else do - processBlock blq h + case job of + DownloadTask hx (Just (p,s)) -> do + initDownload False blq p hx s + + DownloadTask h Nothing -> do + peers <- getPeerLocator @e >>= knownPeers @e + + for_ peers $ \peer -> do + subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do + liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s))) + + request @e peer (GetBlockSize @e h) next @@ -261,7 +272,7 @@ blockDownloadLoop = do subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do processBlock q h - request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction + request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction | anyway -> processBlock q h @@ -302,7 +313,7 @@ blockDownloadLoop = do else do -- if block is missed, then -- block to download q - atomically $ Q.writeTBQueue q blk + liftIO $ atomically $ Q.writeTBQueue q (DownloadTask blk Nothing) Just (Blob{}) -> do pure () @@ -404,7 +415,7 @@ main = do others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do let findBlk = hasBlock s - let size = 1024*1024*10 + let size = 1024*1024*40 g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] bytes <- replicateM size $ uniformM g :: IO [Char] @@ -444,7 +455,7 @@ main = do addPeers @Fake pl ps - as <- liftIO $ async $ withPeerM env blockDownloadLoop + as <- liftIO $ async $ withPeerM env (blockDownloadLoop cw) runProto @Fake [ makeResponse (blockSizeProto blk handleBlockInfo)