Add support for sending Websocket continuation frames.

This commit is contained in:
Sebastian Urban 2018-11-08 16:24:45 +01:00
parent 36bac9433e
commit 2c0ef6b5fe
1 changed files with 85 additions and 16 deletions

View File

@ -275,19 +275,19 @@ impl Client {
enum ContinuationOpCode { enum ContinuationOpCode {
Binary, Binary,
Text Text,
} }
struct Continuation { struct Continuation {
opcode: ContinuationOpCode, opcode: ContinuationOpCode,
buffer: Vec<u8>, buffer: Vec<u8>,
} }
struct Inner { struct Inner {
tx: UnboundedSender<Bytes>, tx: UnboundedSender<Bytes>,
rx: PayloadBuffer<Box<Pipeline>>, rx: PayloadBuffer<Box<Pipeline>>,
closed: bool, closed: bool,
continuation: Option<Continuation>, continuation: Option<Continuation>,
} }
/// Future that implementes client websocket handshake process. /// Future that implementes client websocket handshake process.
@ -454,7 +454,10 @@ impl Future for ClientHandshake {
max_size: self.max_size, max_size: self.max_size,
no_masking: self.no_masking, no_masking: self.no_masking,
}, },
ClientWriter { inner }, ClientWriter {
inner,
continuation: ClientWriterContinuation::Inactive,
},
))) )))
} }
} }
@ -495,7 +498,9 @@ impl Stream for ClientReader {
let inner = &mut *inner; let inner = &mut *inner;
match inner.continuation { match inner.continuation {
Some(ref mut continuation) => { Some(ref mut continuation) => {
continuation.buffer.append(&mut Vec::from(payload.as_ref())); continuation
.buffer
.append(&mut Vec::from(payload.as_ref()));
Ok(Async::NotReady) Ok(Async::NotReady)
} }
None => { None => {
@ -505,19 +510,22 @@ impl Stream for ClientReader {
} }
} else { } else {
match inner.continuation.take() { match inner.continuation.take() {
Some(Continuation {opcode, mut buffer}) => { Some(Continuation { opcode, mut buffer }) => {
buffer.append(&mut Vec::from(payload.as_ref())); buffer.append(&mut Vec::from(payload.as_ref()));
match opcode { match opcode {
ContinuationOpCode::Binary => ContinuationOpCode::Binary => Ok(Async::Ready(
Ok(Async::Ready(Some(Message::Binary(Binary::from(buffer))))), Some(Message::Binary(Binary::from(buffer))),
)),
ContinuationOpCode::Text => { ContinuationOpCode::Text => {
match String::from_utf8(buffer) { match String::from_utf8(buffer) {
Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), Ok(s) => Ok(Async::Ready(Some(
Message::Text(s),
))),
Err(_) => { Err(_) => {
inner.closed = true; inner.closed = true;
Err(ProtocolError::BadEncoding) Err(ProtocolError::BadEncoding)
} }
} }
} }
} }
} }
@ -549,13 +557,13 @@ impl Stream for ClientReader {
} else { } else {
inner.continuation = Some(Continuation { inner.continuation = Some(Continuation {
opcode: ContinuationOpCode::Binary, opcode: ContinuationOpCode::Binary,
buffer: Vec::from(payload.as_ref()) buffer: Vec::from(payload.as_ref()),
}); });
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
OpCode::Text => { OpCode::Text => {
if finished { if finished {
let tmp = Vec::from(payload.as_ref()); let tmp = Vec::from(payload.as_ref());
match String::from_utf8(tmp) { match String::from_utf8(tmp) {
Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))),
@ -566,10 +574,10 @@ impl Stream for ClientReader {
} }
} else { } else {
inner.continuation = Some(Continuation { inner.continuation = Some(Continuation {
opcode: ContinuationOpCode::Text, opcode: ContinuationOpCode::Text,
buffer: Vec::from(payload.as_ref()) buffer: Vec::from(payload.as_ref()),
}); });
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
} }
@ -584,9 +592,17 @@ impl Stream for ClientReader {
} }
} }
#[derive(PartialEq)]
enum ClientWriterContinuation {
Inactive,
Text,
Binary,
}
/// Websocket writer client /// Websocket writer client
pub struct ClientWriter { pub struct ClientWriter {
inner: Rc<RefCell<Inner>>, inner: Rc<RefCell<Inner>>,
continuation: ClientWriterContinuation,
} }
impl ClientWriter { impl ClientWriter {
@ -604,15 +620,68 @@ impl ClientWriter {
/// Send text frame /// Send text frame
#[inline] #[inline]
pub fn text<T: Into<Binary>>(&mut self, text: T) { pub fn text<T: Into<Binary>>(&mut self, text: T) {
assert!(
self.continuation == ClientWriterContinuation::Inactive,
"Cannot send text frame while continuation is active."
);
self.write(Frame::message(text.into(), OpCode::Text, true, true)); self.write(Frame::message(text.into(), OpCode::Text, true, true));
} }
/// Send a part of a text frame.
/// The receiver will concatenate all parts when receiving the final
/// frame with `finished == true`.
#[inline]
pub fn text_part<T: Into<Binary>>(&mut self, text: T, finished: bool) {
match self.continuation {
ClientWriterContinuation::Inactive => {
self.write(Frame::message(text.into(), OpCode::Text, finished, true))
}
ClientWriterContinuation::Text => self.write(Frame::message(
text.into(),
OpCode::Continue,
finished,
true,
)),
_ => panic!("Cannot send text part while binary continuation is active."),
};
self.continuation = if !finished {
ClientWriterContinuation::Text
} else {
ClientWriterContinuation::Inactive
};
}
/// Send binary frame /// Send binary frame
#[inline] #[inline]
pub fn binary<B: Into<Binary>>(&mut self, data: B) { pub fn binary<B: Into<Binary>>(&mut self, data: B) {
assert!(
self.continuation == ClientWriterContinuation::Inactive,
"Cannot send binary frame while continuation is active."
);
self.write(Frame::message(data, OpCode::Binary, true, true)); self.write(Frame::message(data, OpCode::Binary, true, true));
} }
/// Send a part of a binary frame.
/// The receiver will concatenate all parts when receiving the final
/// frame with `finished == true`.
#[inline]
pub fn binary_part<B: Into<Binary>>(&mut self, data: B, finished: bool) {
match self.continuation {
ClientWriterContinuation::Inactive => {
self.write(Frame::message(data, OpCode::Binary, finished, true))
}
ClientWriterContinuation::Binary => {
self.write(Frame::message(data, OpCode::Continue, finished, true))
}
_ => panic!("Cannot send binary part while text continuation is active."),
};
self.continuation = if !finished {
ClientWriterContinuation::Binary
} else {
ClientWriterContinuation::Inactive
};
}
/// Send ping frame /// Send ping frame
#[inline] #[inline]
pub fn ping(&mut self, message: &str) { pub fn ping(&mut self, message: &str) {