Configure continuation buffer size for Websocket client.

This commit is contained in:
Sebastian Urban 2018-11-08 16:59:20 +01:00
parent 48b8ea6fa3
commit 9579ec5cf9
2 changed files with 21 additions and 6 deletions

View File

@ -127,6 +127,7 @@ pub struct Client {
protocols: Option<String>, protocols: Option<String>,
conn: Addr<ClientConnector>, conn: Addr<ClientConnector>,
max_size: usize, max_size: usize,
max_continuation_size: usize,
no_masking: bool, no_masking: bool,
} }
@ -145,6 +146,7 @@ impl Client {
origin: None, origin: None,
protocols: None, protocols: None,
max_size: 65_536, max_size: 65_536,
max_continuation_size: 1_048_576,
no_masking: false, no_masking: false,
conn, conn,
}; };
@ -192,6 +194,14 @@ impl Client {
self self
} }
/// Set max continuation size
///
/// By default max size is set to 1Mb
pub fn max_continuation_size(mut self, size: usize) -> Self {
self.max_continuation_size = size;
self
}
/// Set write buffer capacity /// Set write buffer capacity
/// ///
/// Default buffer capacity is 32kb /// Default buffer capacity is 32kb
@ -268,7 +278,7 @@ impl Client {
} }
// start handshake // start handshake
ClientHandshake::new(request, self.max_size, self.no_masking) ClientHandshake::new(request, self.max_size, self.max_continuation_size, self.no_masking)
} }
} }
} }
@ -288,6 +298,7 @@ struct Inner {
rx: PayloadBuffer<Box<Pipeline>>, rx: PayloadBuffer<Box<Pipeline>>,
closed: bool, closed: bool,
continuation: Option<Continuation>, continuation: Option<Continuation>,
max_continuation_size: usize,
} }
/// Future that implementes client websocket handshake process. /// Future that implementes client websocket handshake process.
@ -300,12 +311,13 @@ pub struct ClientHandshake {
key: String, key: String,
error: Option<ClientError>, error: Option<ClientError>,
max_size: usize, max_size: usize,
max_continuation_size: usize,
no_masking: bool, no_masking: bool,
} }
impl ClientHandshake { impl ClientHandshake {
fn new( fn new(
mut request: ClientRequest, max_size: usize, no_masking: bool, mut request: ClientRequest, max_size: usize, max_continuation_size: usize, no_masking: bool,
) -> ClientHandshake { ) -> ClientHandshake {
// Generate a random key for the `Sec-WebSocket-Key` header. // Generate a random key for the `Sec-WebSocket-Key` header.
// a base64-encoded (see Section 4 of [RFC4648]) value that, // a base64-encoded (see Section 4 of [RFC4648]) value that,
@ -326,6 +338,7 @@ impl ClientHandshake {
ClientHandshake { ClientHandshake {
key, key,
max_size, max_size,
max_continuation_size,
no_masking, no_masking,
request: Some(request.send()), request: Some(request.send()),
tx: Some(tx), tx: Some(tx),
@ -340,6 +353,7 @@ impl ClientHandshake {
tx: None, tx: None,
error: Some(err), error: Some(err),
max_size: 0, max_size: 0,
max_continuation_size: 0,
no_masking: false, no_masking: false,
} }
} }
@ -445,6 +459,7 @@ impl Future for ClientHandshake {
rx: PayloadBuffer::new(resp.payload()), rx: PayloadBuffer::new(resp.payload()),
closed: false, closed: false,
continuation: None, continuation: None,
max_continuation_size: self.max_continuation_size,
}; };
let inner = Rc::new(RefCell::new(inner)); let inner = Rc::new(RefCell::new(inner));
@ -497,13 +512,13 @@ impl Stream for ClientReader {
if !finished { if !finished {
let inner = &mut *inner; let inner = &mut *inner;
match inner.continuation { match inner.continuation {
Some(ref mut continuation) => { Some(ref mut continuation) if continuation.buffer.len() <= inner.max_continuation_size => {
continuation continuation
.buffer .buffer
.append(&mut Vec::from(payload.as_ref())); .append(&mut Vec::from(payload.as_ref()));
Ok(Async::NotReady) Ok(Async::NotReady)
} }
None => { _ => {
inner.closed = true; inner.closed = true;
Err(ProtocolError::BadContinuation) Err(ProtocolError::BadContinuation)
} }

View File

@ -88,8 +88,8 @@ pub enum ProtocolError {
/// A payload reached size limit. /// A payload reached size limit.
#[fail(display = "A payload reached size limit.")] #[fail(display = "A payload reached size limit.")]
Overflow, Overflow,
/// Bad continuation frame sequence. /// Bad continuation frame sequence or overflow of continuation buffer.
#[fail(display = "Bad continuation frame sequence.")] #[fail(display = "Bad continuation frame sequence or overflow of continuation buffer.")]
BadContinuation, BadContinuation,
/// Bad utf-8 encoding /// Bad utf-8 encoding
#[fail(display = "Bad utf-8 encoding.")] #[fail(display = "Bad utf-8 encoding.")]