Add support for fragmented messages

This commit is contained in:
Heinz Gies 2019-09-10 15:58:50 +02:00 committed by Heinz N. Gies
parent 3e99e0cc27
commit 46ec5b77f9
2 changed files with 73 additions and 9 deletions

View File

@ -35,13 +35,17 @@ pub enum Frame {
Pong(Bytes), Pong(Bytes),
/// Close message with optional reason /// Close message with optional reason
Close(Option<CloseReason>), Close(Option<CloseReason>),
/// Active continuation
Continue,
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug, Clone)]
/// WebSockets protocol codec /// WebSockets protocol codec
pub struct Codec { pub struct Codec {
max_size: usize, max_size: usize,
server: bool, server: bool,
cont_code: Option<OpCode>,
buf: Vec<BytesMut>,
} }
impl Codec { impl Codec {
@ -50,6 +54,8 @@ impl Codec {
Codec { Codec {
max_size: 65_536, max_size: 65_536,
server: true, server: true,
buf: vec![],
cont_code: None,
} }
} }
@ -68,6 +74,27 @@ impl Codec {
self.server = false; self.server = false;
self self
} }
fn combine_payload(&mut self, payload: Option<BytesMut>) -> Option<BytesMut> {
let mut size: usize = if let Some(ref pl) = payload {
pl.len()
} else {
0
};
size += self.buf.iter().map(|pl| pl.len()).sum::<usize>();
if size > 0 {
let mut res = BytesMut::with_capacity(size);
for pl in self.buf.drain(..) {
res.extend_from_slice(&pl)
}
if let Some(pl) = payload {
res.extend_from_slice(&pl)
}
Some(res)
} else {
None
}
}
} }
impl Encoder for Codec { impl Encoder for Codec {
@ -101,12 +128,7 @@ impl Decoder for Codec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match Parser::parse(src, self.server, self.max_size) { match Parser::parse(src, self.server, self.max_size) {
Ok(Some((finished, rsv, opcode, payload))) => { Ok(Some((finished, rsv, mut opcode, mut payload))) => {
// continuation is not supported
if !finished {
return Err(ProtocolError::NoContinuation);
}
// Since this is the default codec we have no extension // Since this is the default codec we have no extension
// and should fail if rsv is set. // and should fail if rsv is set.
// In an async context this will cause a NON-STRICT // In an async context this will cause a NON-STRICT
@ -116,8 +138,49 @@ impl Decoder for Codec {
return Err(ProtocolError::RSVSet); return Err(ProtocolError::RSVSet);
} }
if !finished {
if (opcode == OpCode::Text || opcode == OpCode::Binary)
&& self.cont_code.is_none()
{
// We are starting a new continuation
self.cont_code = Some(opcode);
if let Some(pl) = payload {
self.buf.push(pl);
}
return Ok(Some(Frame::Continue));
} else if opcode == OpCode::Continue && self.cont_code.is_some() {
// We continue a continuation
if let Some(pl) = payload {
self.buf.push(pl);
};
return Ok(Some(Frame::Continue));
} else {
return Err(ProtocolError::NoContinuation);
}
} else if opcode == OpCode::Continue {
// We finish a continuation
if let Some(orig_opcode) = self.cont_code {
// reset saved opcode
self.cont_code = None;
// put cached code into current opciode
opcode = orig_opcode;
// Collect the payload
payload = self.combine_payload(payload)
} else {
// We have a continuation finish op code but nothing to continue,
// this is an error
return Err(ProtocolError::NoContinuation);
}
} else if self.cont_code.is_some()
&& (opcode == OpCode::Binary || opcode == OpCode::Text)
{
// We are finished but this isn't a continuation and
// we still have a started continuation
return Err(ProtocolError::NoContinuation);
}
match opcode { match opcode {
OpCode::Continue => Err(ProtocolError::NoContinuation), OpCode::Continue => unreachable!(),
OpCode::Bad => Err(ProtocolError::BadOpCode), OpCode::Bad => Err(ProtocolError::BadOpCode),
OpCode::Close => { OpCode::Close => {
if let Some(ref pl) = payload { if let Some(ref pl) = payload {

View File

@ -288,7 +288,7 @@ where
inner: ContextParts::new(mb.sender_producer()), inner: ContextParts::new(mb.sender_producer()),
messages: VecDeque::new(), messages: VecDeque::new(),
}; };
ctx.add_stream(WsStream::new(stream, codec)); ctx.add_stream(WsStream::new(stream, codec.clone()));
WebsocketContextFut::new(ctx, actor, mb, codec) WebsocketContextFut::new(ctx, actor, mb, codec)
} }
@ -530,6 +530,7 @@ where
Frame::Ping(s) => Message::Ping(s), Frame::Ping(s) => Message::Ping(s),
Frame::Pong(s) => Message::Pong(s), Frame::Pong(s) => Message::Pong(s),
Frame::Close(reason) => Message::Close(reason), Frame::Close(reason) => Message::Close(reason),
Frame::Continue => Message::Nop,
}; };
Ok(Async::Ready(Some(msg))) Ok(Async::Ready(Some(msg)))
} }