Add support for WebSocket fragmentation.

This adds support for fragmentation in the WebSocket server and client.
See https://tools.ietf.org/html/rfc6455#section-5.4 for specifications.
This commit is contained in:
Sebastian Urban 2018-10-03 13:39:12 +02:00
parent 62f1c90c8d
commit 36bac9433e
2 changed files with 153 additions and 28 deletions

View File

@ -273,10 +273,21 @@ impl Client {
} }
} }
enum ContinuationOpCode {
Binary,
Text
}
struct Continuation {
opcode: ContinuationOpCode,
buffer: Vec<u8>,
}
struct Inner { struct Inner {
tx: UnboundedSender<Bytes>, tx: UnboundedSender<Bytes>,
rx: PayloadBuffer<Box<Pipeline>>, rx: PayloadBuffer<Box<Pipeline>>,
closed: bool, closed: bool,
continuation: Option<Continuation>,
} }
/// Future that implementes client websocket handshake process. /// Future that implementes client websocket handshake process.
@ -433,6 +444,7 @@ impl Future for ClientHandshake {
tx: self.tx.take().unwrap(), tx: self.tx.take().unwrap(),
rx: PayloadBuffer::new(resp.payload()), rx: PayloadBuffer::new(resp.payload()),
closed: false, closed: false,
continuation: None,
}; };
let inner = Rc::new(RefCell::new(inner)); let inner = Rc::new(RefCell::new(inner));
@ -475,13 +487,46 @@ impl Stream for ClientReader {
// read // read
match Frame::parse(&mut inner.rx, no_masking, max_size) { match Frame::parse(&mut inner.rx, no_masking, max_size) {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
let (_finished, opcode, payload) = frame.unpack(); let (finished, opcode, payload) = frame.unpack();
match opcode { match opcode {
// continuation is not supported
OpCode::Continue => { OpCode::Continue => {
if !finished {
let inner = &mut *inner;
match inner.continuation {
Some(ref mut continuation) => {
continuation.buffer.append(&mut Vec::from(payload.as_ref()));
Ok(Async::NotReady)
}
None => {
inner.closed = true; inner.closed = true;
Err(ProtocolError::NoContinuation) Err(ProtocolError::BadContinuation)
}
}
} else {
match inner.continuation.take() {
Some(Continuation {opcode, mut buffer}) => {
buffer.append(&mut Vec::from(payload.as_ref()));
match opcode {
ContinuationOpCode::Binary =>
Ok(Async::Ready(Some(Message::Binary(Binary::from(buffer))))),
ContinuationOpCode::Text => {
match String::from_utf8(buffer) {
Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))),
Err(_) => {
inner.closed = true;
Err(ProtocolError::BadEncoding)
}
}
}
}
}
None => {
inner.closed = true;
Err(ProtocolError::BadContinuation)
}
}
}
} }
OpCode::Bad => { OpCode::Bad => {
inner.closed = true; inner.closed = true;
@ -498,8 +543,19 @@ impl Stream for ClientReader {
OpCode::Pong => Ok(Async::Ready(Some(Message::Pong( OpCode::Pong => Ok(Async::Ready(Some(Message::Pong(
String::from_utf8_lossy(payload.as_ref()).into(), String::from_utf8_lossy(payload.as_ref()).into(),
)))), )))),
OpCode::Binary => Ok(Async::Ready(Some(Message::Binary(payload)))), OpCode::Binary => {
if finished {
Ok(Async::Ready(Some(Message::Binary(payload))))
} else {
inner.continuation = Some(Continuation {
opcode: ContinuationOpCode::Binary,
buffer: Vec::from(payload.as_ref())
});
Ok(Async::NotReady)
}
}
OpCode::Text => { OpCode::Text => {
if finished {
let tmp = Vec::from(payload.as_ref()); let tmp = Vec::from(payload.as_ref());
match String::from_utf8(tmp) { match String::from_utf8(tmp) {
Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))),
@ -508,6 +564,13 @@ impl Stream for ClientReader {
Err(ProtocolError::BadEncoding) Err(ProtocolError::BadEncoding)
} }
} }
} else {
inner.continuation = Some(Continuation {
opcode: ContinuationOpCode::Text,
buffer: Vec::from(payload.as_ref())
});
Ok(Async::NotReady)
}
} }
} }
} }

View File

@ -88,9 +88,9 @@ pub enum ProtocolError {
/// A payload reached size limit. /// A payload reached size limit.
#[fail(display = "A payload reached size limit.")] #[fail(display = "A payload reached size limit.")]
Overflow, Overflow,
/// Continuation is not supported /// Bad continuation frame sequence.
#[fail(display = "Continuation is not supported.")] #[fail(display = "Bad continuation frame sequence.")]
NoContinuation, BadContinuation,
/// Bad utf-8 encoding /// Bad utf-8 encoding
#[fail(display = "Bad utf-8 encoding.")] #[fail(display = "Bad utf-8 encoding.")]
BadEncoding, BadEncoding,
@ -250,11 +250,22 @@ pub fn handshake<S>(
.take()) .take())
} }
enum ContinuationOpCode {
Binary,
Text
}
struct Continuation {
opcode: ContinuationOpCode,
buffer: Vec<u8>,
}
/// Maps `Payload` stream into stream of `ws::Message` items /// Maps `Payload` stream into stream of `ws::Message` items
pub struct WsStream<S> { pub struct WsStream<S> {
rx: PayloadBuffer<S>, rx: PayloadBuffer<S>,
closed: bool, closed: bool,
max_size: usize, max_size: usize,
continuation: Option<Continuation>,
} }
impl<S> WsStream<S> impl<S> WsStream<S>
@ -267,6 +278,7 @@ where
rx: PayloadBuffer::new(stream), rx: PayloadBuffer::new(stream),
closed: false, closed: false,
max_size: 65_536, max_size: 65_536,
continuation: None
} }
} }
@ -279,6 +291,8 @@ where
} }
} }
impl<S> Stream for WsStream<S> impl<S> Stream for WsStream<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Bytes, Error = PayloadError>,
@ -295,14 +309,44 @@ where
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
let (finished, opcode, payload) = frame.unpack(); let (finished, opcode, payload) = frame.unpack();
// continuation is not supported
if !finished {
self.closed = true;
return Err(ProtocolError::NoContinuation);
}
match opcode { match opcode {
OpCode::Continue => Err(ProtocolError::NoContinuation), OpCode::Continue => {
if !finished {
match self.continuation {
Some(ref mut continuation) => {
continuation.buffer.append(&mut Vec::from(payload.as_ref()));
Ok(Async::NotReady)
}
None => {
self.closed = true;
Err(ProtocolError::BadContinuation)
}
}
} else {
match self.continuation.take() {
Some(Continuation {opcode, mut buffer}) => {
buffer.append(&mut Vec::from(payload.as_ref()));
match opcode {
ContinuationOpCode::Binary =>
Ok(Async::Ready(Some(Message::Binary(Binary::from(buffer))))),
ContinuationOpCode::Text => {
match String::from_utf8(buffer) {
Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))),
Err(_) => {
self.closed = true;
Err(ProtocolError::BadEncoding)
}
}
}
}
}
None => {
self.closed = true;
Err(ProtocolError::BadContinuation)
}
}
}
}
OpCode::Bad => { OpCode::Bad => {
self.closed = true; self.closed = true;
Err(ProtocolError::BadOpCode) Err(ProtocolError::BadOpCode)
@ -318,8 +362,19 @@ where
OpCode::Pong => Ok(Async::Ready(Some(Message::Pong( OpCode::Pong => Ok(Async::Ready(Some(Message::Pong(
String::from_utf8_lossy(payload.as_ref()).into(), String::from_utf8_lossy(payload.as_ref()).into(),
)))), )))),
OpCode::Binary => Ok(Async::Ready(Some(Message::Binary(payload)))), OpCode::Binary => {
if finished {
Ok(Async::Ready(Some(Message::Binary(payload))))
} else {
self.continuation = Some(Continuation {
opcode: ContinuationOpCode::Binary,
buffer: Vec::from(payload.as_ref())
});
Ok(Async::NotReady)
}
}
OpCode::Text => { OpCode::Text => {
if finished {
let tmp = Vec::from(payload.as_ref()); let tmp = Vec::from(payload.as_ref());
match String::from_utf8(tmp) { match String::from_utf8(tmp) {
Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))), Ok(s) => Ok(Async::Ready(Some(Message::Text(s)))),
@ -328,6 +383,13 @@ where
Err(ProtocolError::BadEncoding) Err(ProtocolError::BadEncoding)
} }
} }
} else {
self.continuation = Some(Continuation {
opcode: ContinuationOpCode::Text,
buffer: Vec::from(payload.as_ref())
});
Ok(Async::NotReady)
}
} }
} }
} }