From 2c0ef6b5fef27bbd422302c51360829b0b64ed46 Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Thu, 8 Nov 2018 16:24:45 +0100 Subject: [PATCH] Add support for sending Websocket continuation frames. --- src/ws/client.rs | 101 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 16 deletions(-) diff --git a/src/ws/client.rs b/src/ws/client.rs index c79451c92..66df7b38a 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -275,19 +275,19 @@ impl Client { enum ContinuationOpCode { Binary, - Text + Text, } struct Continuation { opcode: ContinuationOpCode, buffer: Vec, -} +} struct Inner { tx: UnboundedSender, rx: PayloadBuffer>, closed: bool, - continuation: Option, + continuation: Option, } /// Future that implementes client websocket handshake process. @@ -454,7 +454,10 @@ impl Future for ClientHandshake { max_size: self.max_size, no_masking: self.no_masking, }, - ClientWriter { inner }, + ClientWriter { + inner, + continuation: ClientWriterContinuation::Inactive, + }, ))) } } @@ -495,7 +498,9 @@ impl Stream for ClientReader { let inner = &mut *inner; match inner.continuation { Some(ref mut continuation) => { - continuation.buffer.append(&mut Vec::from(payload.as_ref())); + continuation + .buffer + .append(&mut Vec::from(payload.as_ref())); Ok(Async::NotReady) } None => { @@ -505,19 +510,22 @@ impl Stream for ClientReader { } } else { match inner.continuation.take() { - Some(Continuation {opcode, mut buffer}) => { + Some(Continuation { opcode, mut buffer }) => { buffer.append(&mut Vec::from(payload.as_ref())); match opcode { - ContinuationOpCode::Binary => - Ok(Async::Ready(Some(Message::Binary(Binary::from(buffer))))), + ContinuationOpCode::Binary => Ok(Async::Ready( + Some(Message::Binary(Binary::from(buffer))), + )), ContinuationOpCode::Text => { match String::from_utf8(buffer) { - Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), + Ok(s) => Ok(Async::Ready(Some( + Message::Text(s), + ))), Err(_) => { inner.closed = true; Err(ProtocolError::BadEncoding) } - } + } } } } @@ -549,13 +557,13 @@ impl Stream for ClientReader { } else { inner.continuation = Some(Continuation { opcode: ContinuationOpCode::Binary, - buffer: Vec::from(payload.as_ref()) + buffer: Vec::from(payload.as_ref()), }); Ok(Async::NotReady) } - } + } OpCode::Text => { - if finished { + if finished { let tmp = Vec::from(payload.as_ref()); match String::from_utf8(tmp) { Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), @@ -566,10 +574,10 @@ impl Stream for ClientReader { } } else { inner.continuation = Some(Continuation { - opcode: ContinuationOpCode::Text, - buffer: Vec::from(payload.as_ref()) + opcode: ContinuationOpCode::Text, + buffer: Vec::from(payload.as_ref()), }); - Ok(Async::NotReady) + Ok(Async::NotReady) } } } @@ -584,9 +592,17 @@ impl Stream for ClientReader { } } +#[derive(PartialEq)] +enum ClientWriterContinuation { + Inactive, + Text, + Binary, +} + /// Websocket writer client pub struct ClientWriter { inner: Rc>, + continuation: ClientWriterContinuation, } impl ClientWriter { @@ -604,15 +620,68 @@ impl ClientWriter { /// Send text frame #[inline] pub fn text>(&mut self, text: T) { + assert!( + self.continuation == ClientWriterContinuation::Inactive, + "Cannot send text frame while continuation is active." + ); self.write(Frame::message(text.into(), OpCode::Text, true, true)); } + /// Send a part of a text frame. + /// The receiver will concatenate all parts when receiving the final + /// frame with `finished == true`. + #[inline] + pub fn text_part>(&mut self, text: T, finished: bool) { + match self.continuation { + ClientWriterContinuation::Inactive => { + self.write(Frame::message(text.into(), OpCode::Text, finished, true)) + } + ClientWriterContinuation::Text => self.write(Frame::message( + text.into(), + OpCode::Continue, + finished, + true, + )), + _ => panic!("Cannot send text part while binary continuation is active."), + }; + self.continuation = if !finished { + ClientWriterContinuation::Text + } else { + ClientWriterContinuation::Inactive + }; + } + /// Send binary frame #[inline] pub fn binary>(&mut self, data: B) { + assert!( + self.continuation == ClientWriterContinuation::Inactive, + "Cannot send binary frame while continuation is active." + ); self.write(Frame::message(data, OpCode::Binary, true, true)); } + /// Send a part of a binary frame. + /// The receiver will concatenate all parts when receiving the final + /// frame with `finished == true`. + #[inline] + pub fn binary_part>(&mut self, data: B, finished: bool) { + match self.continuation { + ClientWriterContinuation::Inactive => { + self.write(Frame::message(data, OpCode::Binary, finished, true)) + } + ClientWriterContinuation::Binary => { + self.write(Frame::message(data, OpCode::Continue, finished, true)) + } + _ => panic!("Cannot send binary part while text continuation is active."), + }; + self.continuation = if !finished { + ClientWriterContinuation::Binary + } else { + ClientWriterContinuation::Inactive + }; + } + /// Send ping frame #[inline] pub fn ping(&mut self, message: &str) {