From ebde013e90aca32bf080220056d594ee0e0f55e4 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Thu, 20 Dec 2018 15:28:32 -0600 Subject: [PATCH] convert readany, readexact, copy, and readuntil --- src/multipart.rs | 14 +-- src/payload.rs | 271 +++++++++++++++++++++++++++++------------------ src/ws/frame.rs | 10 +- 3 files changed, 179 insertions(+), 116 deletions(-) diff --git a/src/multipart.rs b/src/multipart.rs index 862f60ecb..20b1aec52 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -134,7 +134,7 @@ where S: Stream, { fn read_headers(payload: &mut PayloadBuffer) -> Poll { - match payload.read_until(b"\r\n\r\n")? { + match payload.read_until(b"\r\n\r\n").poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(Some(bytes)) => { @@ -167,7 +167,7 @@ where payload: &mut PayloadBuffer, boundary: &str, ) -> Poll { // TODO: need to read epilogue - match payload.readline()? { + match payload.readline().poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(Some(chunk)) => { @@ -194,7 +194,7 @@ where ) -> Poll { let mut eof = false; loop { - match payload.readline()? { + match payload.readline().poll()? { Async::Ready(Some(chunk)) => { if chunk.is_empty() { //ValueError("Could not find starting boundary %r" @@ -495,7 +495,7 @@ where if *size == 0 { Ok(Async::Ready(None)) } else { - match payload.readany() { + match payload.readany().poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => Err(MultipartError::Incomplete), Ok(Async::Ready(Some(mut chunk))) => { @@ -517,13 +517,13 @@ where fn read_stream( payload: &mut PayloadBuffer, boundary: &str, ) -> Poll, MultipartError> { - match payload.read_until(b"\r")? { + match payload.read_until(b"\r").poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(Some(mut chunk)) => { if chunk.len() == 1 { payload.unprocessed(chunk); - match payload.read_exact(boundary.len() + 4)? { + match payload.read_exact(boundary.len() + 4).poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(Some(mut chunk)) => { @@ -568,7 +568,7 @@ where Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), Async::Ready(None) => { self.eof = true; - match payload.readline()? { + match payload.readline().poll()? { Async::NotReady => Async::NotReady, Async::Ready(None) => Async::Ready(None), Async::Ready(Some(line)) => { diff --git a/src/payload.rs b/src/payload.rs index 2131e3c3c..81604323e 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -284,9 +284,9 @@ impl Inner { /// Payload buffer pub struct PayloadBuffer { - len: usize, - items: VecDeque, - stream: S, + pub(crate) len: usize, + pub(crate) items: VecDeque, + pub(crate) stream: S, } impl PayloadBuffer @@ -308,7 +308,7 @@ where } #[inline] - fn poll_stream(&mut self) -> Poll { + pub(crate) fn poll_stream(&mut self) -> Poll { self.stream.poll().map(|res| match res { Async::Ready(Some(data)) => { self.len += data.len(); @@ -322,17 +322,8 @@ where /// Read first available chunk of bytes #[inline] - pub fn readany(&mut self) -> Poll, PayloadError> { - if let Some(data) = self.items.pop_front() { - self.len -= data.len(); - Ok(Async::Ready(Some(data))) - } else { - match self.poll_stream()? { - Async::Ready(true) => self.readany(), - Async::Ready(false) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } - } + pub fn readany(&mut self) -> ReadAny { + ReadAny { inner: self } } /// Check if buffer contains enough bytes @@ -367,37 +358,8 @@ where /// Read exact number of bytes #[inline] - pub fn read_exact(&mut self, size: usize) -> Poll, PayloadError> { - if size <= self.len { - self.len -= size; - let mut chunk = self.items.pop_front().unwrap(); - if size < chunk.len() { - let buf = chunk.split_to(size); - self.items.push_front(chunk); - Ok(Async::Ready(Some(buf))) - } else if size == chunk.len() { - Ok(Async::Ready(Some(chunk))) - } else { - let mut buf = BytesMut::with_capacity(size); - buf.extend_from_slice(&chunk); - - while buf.len() < size { - let mut chunk = self.items.pop_front().unwrap(); - let rem = cmp::min(size - buf.len(), chunk.len()); - buf.extend_from_slice(&chunk.split_to(rem)); - if !chunk.is_empty() { - self.items.push_front(chunk); - } - } - Ok(Async::Ready(Some(buf.freeze()))) - } - } else { - match self.poll_stream()? { - Async::Ready(true) => self.read_exact(size), - Async::Ready(false) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } - } + pub fn read_exact(&mut self, size: usize) -> ReadExact { + ReadExact { inner: self, size } } /// Remove specified amount if bytes from buffer @@ -420,42 +382,164 @@ where } /// Copy buffered data - pub fn copy(&mut self, size: usize) -> Poll, PayloadError> { - if size <= self.len { - let mut buf = BytesMut::with_capacity(size); - for chunk in &self.items { - if buf.len() < size { - let rem = cmp::min(size - buf.len(), chunk.len()); + pub fn copy(&mut self, size: usize) -> Copy { + Copy { inner: self, size } + } + + /// Read until specified ending + pub fn read_until(&mut self, line: &[u8]) -> ReadUntil { + ReadUntil { inner: self, line: line.to_vec() } + } + + /// Read bytes until new line delimiter + pub fn readline(&mut self) -> ReadUntil { + self.read_until(b"\n") + } + + /// Put unprocessed data back to the buffer + pub fn unprocessed(&mut self, data: Bytes) { + self.len += data.len(); + self.items.push_front(data); + } + + /// Get remaining data from the buffer + pub fn remaining(&mut self) -> Bytes { + self.items + .iter_mut() + .fold(BytesMut::new(), |mut b, c| { + b.extend_from_slice(c); + b + }).freeze() + } +} + +pub struct ReadAny<'a, S> { + inner: &'a mut PayloadBuffer, +} + +impl<'a, S: 'a> Stream for ReadAny<'a, S> +where + S: Stream, +{ + type Item = Bytes; + type Error = PayloadError; + fn poll(&mut self) -> Poll, Self::Error> { + if let Some(data) = self.inner.items.pop_front() { + self.inner.len -= data.len(); + Ok(Async::Ready(Some(data))) + } else { + match self.inner.poll_stream()? { + Async::Ready(true) => self.poll(), + Async::Ready(false) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + } +} + +pub struct ReadExact<'a, S> { + inner: &'a mut PayloadBuffer, + size: usize, +} + +impl<'a, S: 'a> Stream for ReadExact<'a, S> +where + S: Stream, +{ + type Item = Bytes; + type Error = PayloadError; + fn poll(&mut self) -> Poll, Self::Error> { + if self.size <= self.inner.len { + self.inner.len -= self.size; + let mut chunk = self.inner.items.pop_front().unwrap(); + if self.size < chunk.len() { + let buf = chunk.split_to(self.size); + self.inner.items.push_front(chunk); + Ok(Async::Ready(Some(buf))) + } else if self.size == chunk.len() { + Ok(Async::Ready(Some(chunk))) + } else { + let mut buf = BytesMut::with_capacity(self.size); + buf.extend_from_slice(&chunk); + + while buf.len() < self.size { + let mut chunk = self.inner.items.pop_front().unwrap(); + let rem = cmp::min(self.size - buf.len(), chunk.len()); + buf.extend_from_slice(&chunk.split_to(rem)); + if !chunk.is_empty() { + self.inner.items.push_front(chunk); + } + } + Ok(Async::Ready(Some(buf.freeze()))) + } + } else { + match self.inner.poll_stream()? { + Async::Ready(true) => self.poll(), + Async::Ready(false) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + } +} + +pub struct Copy<'a, S> { + inner: &'a mut PayloadBuffer, + size: usize, +} + +impl<'a, S: 'a> Stream for Copy<'a, S> +where + S: Stream +{ + type Item = BytesMut; + type Error = PayloadError; + fn poll(&mut self) -> Poll, Self::Error> { + if self.size <= self.inner.len { + let mut buf = BytesMut::with_capacity(self.size); + for chunk in &self.inner.items { + if buf.len() < self.size { + let rem = cmp::min(self.size - buf.len(), chunk.len()); buf.extend_from_slice(&chunk[..rem]); } - if buf.len() == size { + if buf.len() == self.size { return Ok(Async::Ready(Some(buf))); } } } - match self.poll_stream()? { - Async::Ready(true) => self.copy(size), + match self.inner.poll_stream()? { + Async::Ready(true) => self.poll(), Async::Ready(false) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), } } +} - /// Read until specified ending - pub fn read_until(&mut self, line: &[u8]) -> Poll, PayloadError> { +pub struct ReadUntil<'a, S> { + inner: &'a mut PayloadBuffer, + line: Vec, +} + +impl<'a, S: 'a> Stream for ReadUntil<'a, S> +where + S: Stream, +{ + type Item = Bytes; + type Error = PayloadError; + fn poll(&mut self) -> Poll, Self::Error> { let mut idx = 0; let mut num = 0; let mut offset = 0; let mut found = false; let mut length = 0; - for no in 0..self.items.len() { + for no in 0..self.inner.items.len() { { - let chunk = &self.items[no]; + let chunk = &self.inner.items[no]; for (pos, ch) in chunk.iter().enumerate() { - if *ch == line[idx] { + if *ch == self.line[idx] { idx += 1; - if idx == line.len() { + if idx == self.line.len() { num = no; offset = pos + 1; length += pos + 1; @@ -475,48 +559,27 @@ where let mut buf = BytesMut::with_capacity(length); if num > 0 { for _ in 0..num { - buf.extend_from_slice(&self.items.pop_front().unwrap()); + buf.extend_from_slice(&self.inner.items.pop_front().unwrap()); } } if offset > 0 { - let mut chunk = self.items.pop_front().unwrap(); + let mut chunk = self.inner.items.pop_front().unwrap(); buf.extend_from_slice(&chunk.split_to(offset)); if !chunk.is_empty() { - self.items.push_front(chunk) + self.inner.items.push_front(chunk) } } - self.len -= length; + self.inner.len -= length; return Ok(Async::Ready(Some(buf.freeze()))); } } - match self.poll_stream()? { - Async::Ready(true) => self.read_until(line), + match self.inner.poll_stream()? { + Async::Ready(true) => self.poll(), Async::Ready(false) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), } } - - /// Read bytes until new line delimiter - pub fn readline(&mut self) -> Poll, PayloadError> { - self.read_until(b"\n") - } - - /// Put unprocessed data back to the buffer - pub fn unprocessed(&mut self, data: Bytes) { - self.len += data.len(); - self.items.push_front(data); - } - - /// Get remaining data from the buffer - pub fn remaining(&mut self) -> Bytes { - self.items - .iter_mut() - .fold(BytesMut::new(), |mut b, c| { - b.extend_from_slice(c); - b - }).freeze() - } } #[cfg(test)] @@ -550,7 +613,7 @@ mod tests { let mut payload = PayloadBuffer::new(payload); assert_eq!(payload.len, 0); - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) @@ -565,16 +628,16 @@ mod tests { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadBuffer::new(payload); - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); sender.feed_data(Bytes::from("data")); sender.feed_eof(); assert_eq!( Async::Ready(Some(Bytes::from("data"))), - payload.readany().ok().unwrap() + payload.readany().poll().ok().unwrap() ); assert_eq!(payload.len, 0); - assert_eq!(Async::Ready(None), payload.readany().ok().unwrap()); + assert_eq!(Async::Ready(None), payload.readany().poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) @@ -589,10 +652,10 @@ mod tests { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadBuffer::new(payload); - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); sender.set_error(PayloadError::Incomplete); - payload.readany().err().unwrap(); + payload.readany().poll().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) })).unwrap(); @@ -611,13 +674,13 @@ mod tests { assert_eq!( Async::Ready(Some(Bytes::from("line1"))), - payload.readany().ok().unwrap() + payload.readany().poll().ok().unwrap() ); assert_eq!(payload.len, 0); assert_eq!( Async::Ready(Some(Bytes::from("line2"))), - payload.readany().ok().unwrap() + payload.readany().poll().ok().unwrap() ); assert_eq!(payload.len, 0); @@ -634,25 +697,25 @@ mod tests { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadBuffer::new(payload); - assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap()); + assert_eq!(Async::NotReady, payload.read_exact(2).poll().ok().unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); assert_eq!( Async::Ready(Some(Bytes::from_static(b"li"))), - payload.read_exact(2).ok().unwrap() + payload.read_exact(2).poll().ok().unwrap() ); assert_eq!(payload.len, 3); assert_eq!( Async::Ready(Some(Bytes::from_static(b"ne1l"))), - payload.read_exact(4).ok().unwrap() + payload.read_exact(4).poll().ok().unwrap() ); assert_eq!(payload.len, 4); sender.set_error(PayloadError::Incomplete); - payload.read_exact(10).err().unwrap(); + payload.read_exact(10).poll().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) @@ -667,25 +730,25 @@ mod tests { let (mut sender, payload) = Payload::new(false); let mut payload = PayloadBuffer::new(payload); - assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap()); + assert_eq!(Async::NotReady, payload.read_until(b"ne").poll().ok().unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); assert_eq!( Async::Ready(Some(Bytes::from("line"))), - payload.read_until(b"ne").ok().unwrap() + payload.read_until(b"ne").poll().ok().unwrap() ); assert_eq!(payload.len, 1); assert_eq!( Async::Ready(Some(Bytes::from("1line2"))), - payload.read_until(b"2").ok().unwrap() + payload.read_until(b"2").poll().ok().unwrap() ); assert_eq!(payload.len, 0); sender.set_error(PayloadError::Incomplete); - payload.read_until(b"b").err().unwrap(); + payload.read_until(b"b").poll().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) diff --git a/src/ws/frame.rs b/src/ws/frame.rs index 5e4fd8290..228830453 100644 --- a/src/ws/frame.rs +++ b/src/ws/frame.rs @@ -54,7 +54,7 @@ impl Frame { S: Stream, { let mut idx = 2; - let buf = match pl.copy(2)? { + let buf = match pl.copy(2).poll()? { Async::Ready(Some(buf)) => buf, Async::Ready(None) => return Ok(Async::Ready(None)), Async::NotReady => return Ok(Async::NotReady), @@ -80,7 +80,7 @@ impl Frame { let len = second & 0x7F; let length = if len == 126 { - let buf = match pl.copy(4)? { + let buf = match pl.copy(4).poll()? { Async::Ready(Some(buf)) => buf, Async::Ready(None) => return Ok(Async::Ready(None)), Async::NotReady => return Ok(Async::NotReady), @@ -89,7 +89,7 @@ impl Frame { idx += 2; len } else if len == 127 { - let buf = match pl.copy(10)? { + let buf = match pl.copy(10).poll()? { Async::Ready(Some(buf)) => buf, Async::Ready(None) => return Ok(Async::Ready(None)), Async::NotReady => return Ok(Async::NotReady), @@ -110,7 +110,7 @@ impl Frame { } let mask = if server { - let buf = match pl.copy(idx + 4)? { + let buf = match pl.copy(idx + 4).poll()? { Async::Ready(Some(buf)) => buf, Async::Ready(None) => return Ok(Async::Ready(None)), Async::NotReady => return Ok(Async::NotReady), @@ -241,7 +241,7 @@ impl Frame { }))); } - let data = match pl.read_exact(length)? { + let data = match pl.read_exact(length).poll()? { Async::Ready(Some(buf)) => buf, Async::Ready(None) => return Ok(Async::Ready(None)), Async::NotReady => panic!(),