From 9579ec5cf9b699771296d7881f7c294cfb000e27 Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Thu, 8 Nov 2018 16:59:20 +0100 Subject: [PATCH] Configure continuation buffer size for Websocket client. --- src/ws/client.rs | 23 +++++++++++++++++++---- src/ws/mod.rs | 4 ++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/ws/client.rs b/src/ws/client.rs index 66df7b38a..4b90f5f63 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -127,6 +127,7 @@ pub struct Client { protocols: Option, conn: Addr, max_size: usize, + max_continuation_size: usize, no_masking: bool, } @@ -145,6 +146,7 @@ impl Client { origin: None, protocols: None, max_size: 65_536, + max_continuation_size: 1_048_576, no_masking: false, conn, }; @@ -192,6 +194,14 @@ impl Client { self } + /// Set max continuation size + /// + /// By default max size is set to 1Mb + pub fn max_continuation_size(mut self, size: usize) -> Self { + self.max_continuation_size = size; + self + } + /// Set write buffer capacity /// /// Default buffer capacity is 32kb @@ -268,7 +278,7 @@ impl Client { } // start handshake - ClientHandshake::new(request, self.max_size, self.no_masking) + ClientHandshake::new(request, self.max_size, self.max_continuation_size, self.no_masking) } } } @@ -288,6 +298,7 @@ struct Inner { rx: PayloadBuffer>, closed: bool, continuation: Option, + max_continuation_size: usize, } /// Future that implementes client websocket handshake process. @@ -300,12 +311,13 @@ pub struct ClientHandshake { key: String, error: Option, max_size: usize, + max_continuation_size: usize, no_masking: bool, } impl ClientHandshake { fn new( - mut request: ClientRequest, max_size: usize, no_masking: bool, + mut request: ClientRequest, max_size: usize, max_continuation_size: usize, no_masking: bool, ) -> ClientHandshake { // Generate a random key for the `Sec-WebSocket-Key` header. // a base64-encoded (see Section 4 of [RFC4648]) value that, @@ -326,6 +338,7 @@ impl ClientHandshake { ClientHandshake { key, max_size, + max_continuation_size, no_masking, request: Some(request.send()), tx: Some(tx), @@ -340,6 +353,7 @@ impl ClientHandshake { tx: None, error: Some(err), max_size: 0, + max_continuation_size: 0, no_masking: false, } } @@ -445,6 +459,7 @@ impl Future for ClientHandshake { rx: PayloadBuffer::new(resp.payload()), closed: false, continuation: None, + max_continuation_size: self.max_continuation_size, }; let inner = Rc::new(RefCell::new(inner)); @@ -497,13 +512,13 @@ impl Stream for ClientReader { if !finished { let inner = &mut *inner; match inner.continuation { - Some(ref mut continuation) => { + Some(ref mut continuation) if continuation.buffer.len() <= inner.max_continuation_size => { continuation .buffer .append(&mut Vec::from(payload.as_ref())); Ok(Async::NotReady) } - None => { + _ => { inner.closed = true; Err(ProtocolError::BadContinuation) } diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 95ecfdffb..9cdb4d990 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -88,8 +88,8 @@ pub enum ProtocolError { /// A payload reached size limit. #[fail(display = "A payload reached size limit.")] Overflow, - /// Bad continuation frame sequence. - #[fail(display = "Bad continuation frame sequence.")] + /// Bad continuation frame sequence or overflow of continuation buffer. + #[fail(display = "Bad continuation frame sequence or overflow of continuation buffer.")] BadContinuation, /// Bad utf-8 encoding #[fail(display = "Bad utf-8 encoding.")]