From 3cc8490c82f9f35d968d6ed01f9d5655cab674c8 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 5 Nov 2019 21:14:00 +0100 Subject: [PATCH] feat: WIP on frame continuation --- actix-http/src/ws/codec.rs | 42 ++++++++++++++++++++++++++++---------- actix-http/src/ws/frame.rs | 6 ------ 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/actix-http/src/ws/codec.rs b/actix-http/src/ws/codec.rs index 2e2741724..7b5b58009 100644 --- a/actix-http/src/ws/codec.rs +++ b/actix-http/src/ws/codec.rs @@ -37,11 +37,12 @@ pub enum Frame { Close(Option), } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] /// WebSockets protocol codec pub struct Codec { max_size: usize, server: bool, + collector: Option, } impl Codec { @@ -50,6 +51,7 @@ impl Codec { Codec { max_size: 65_536, server: true, + collector: None, } } @@ -102,16 +104,22 @@ impl Decoder for Codec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match Parser::parse(src, self.server, self.max_size) { Ok(Some((finished, opcode, payload))) => { - // continuation is not supported - if !finished { - error!("No continuation 1"); - return Err(ProtocolError::NoContinuation); - } - match opcode { OpCode::Continue => { - error!("No continuation 2"); - Err(ProtocolError::NoContinuation) + match self.collector { + Some(ref mut prev) => { + if let Some(ref payload) = payload { + prev.extend_from_slice(payload); + } + } + None => self.collector = payload, + } + + if finished { + Ok(Some(Frame::Binary(self.collector.take()))) + } else { + Ok(None) + } } OpCode::Bad => { error!("Bad opcode"); @@ -140,9 +148,21 @@ impl Decoder for Codec { Ok(Some(Frame::Pong(String::new()))) } } - OpCode::Binary => Ok(Some(Frame::Binary(payload))), + OpCode::Binary => { + if finished { + Ok(Some(Frame::Binary(payload))) + } else { + self.collector = payload; + Ok(None) + } + } OpCode::Text => { - Ok(Some(Frame::Text(payload))) + if finished { + Ok(Some(Frame::Text(payload))) + } else { + self.collector = payload; + Ok(None) + } //let tmp = Vec::from(payload.as_ref()); //match String::from_utf8(tmp) { // Ok(s) => Ok(Some(Message::Text(s))), diff --git a/actix-http/src/ws/frame.rs b/actix-http/src/ws/frame.rs index c45785807..46e9f36db 100644 --- a/actix-http/src/ws/frame.rs +++ b/actix-http/src/ws/frame.rs @@ -32,10 +32,8 @@ impl Parser { // check masking let masked = second & 0x80 != 0; if !masked && server { - error!("Protocol unmasked frame"); return Err(ProtocolError::UnmaskedFrame); } else if masked && !server { - error!("Protocol masked frame"); return Err(ProtocolError::MaskedFrame); } @@ -43,7 +41,6 @@ impl Parser { let opcode = OpCode::from(first & 0x0F); if let OpCode::Bad = opcode { - error!("Protocol invalid opcode"); return Err(ProtocolError::InvalidOpcode(first & 0x0F)); } @@ -63,7 +60,6 @@ impl Parser { } let len = u64::from_be_bytes(TryFrom::try_from(&src[idx..idx + 8]).unwrap()); if len > max_size as u64 { - error!("Protocol overflow 1"); return Err(ProtocolError::Overflow); } idx += 8; @@ -74,7 +70,6 @@ impl Parser { // check for max allowed size if length > max_size { - error!("Protocol overflow 2"); return Err(ProtocolError::Overflow); } @@ -125,7 +120,6 @@ impl Parser { // control frames must have length <= 125 match opcode { OpCode::Ping | OpCode::Pong if length > 125 => { - error!("Protocol invalid length"); return Err(ProtocolError::InvalidLength(length)); } OpCode::Close if length > 125 => {