convert readany, readexact, copy, and readuntil

This commit is contained in:
Michael Zhang 2018-12-20 15:28:32 -06:00
parent 1a940d4c18
commit ebde013e90
No known key found for this signature in database
GPG Key ID: A1B65B603268116B
3 changed files with 179 additions and 116 deletions

View File

@ -134,7 +134,7 @@ where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Bytes, Error = PayloadError>,
{ {
fn read_headers(payload: &mut PayloadBuffer<S>) -> Poll<HeaderMap, MultipartError> { fn read_headers(payload: &mut PayloadBuffer<S>) -> Poll<HeaderMap, MultipartError> {
match payload.read_until(b"\r\n\r\n")? { match payload.read_until(b"\r\n\r\n").poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(bytes)) => { Async::Ready(Some(bytes)) => {
@ -167,7 +167,7 @@ where
payload: &mut PayloadBuffer<S>, boundary: &str, payload: &mut PayloadBuffer<S>, boundary: &str,
) -> Poll<bool, MultipartError> { ) -> Poll<bool, MultipartError> {
// TODO: need to read epilogue // TODO: need to read epilogue
match payload.readline()? { match payload.readline().poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(chunk)) => { Async::Ready(Some(chunk)) => {
@ -194,7 +194,7 @@ where
) -> Poll<bool, MultipartError> { ) -> Poll<bool, MultipartError> {
let mut eof = false; let mut eof = false;
loop { loop {
match payload.readline()? { match payload.readline().poll()? {
Async::Ready(Some(chunk)) => { Async::Ready(Some(chunk)) => {
if chunk.is_empty() { if chunk.is_empty() {
//ValueError("Could not find starting boundary %r" //ValueError("Could not find starting boundary %r"
@ -495,7 +495,7 @@ where
if *size == 0 { if *size == 0 {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
match payload.readany() { match payload.readany().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Err(MultipartError::Incomplete), Ok(Async::Ready(None)) => Err(MultipartError::Incomplete),
Ok(Async::Ready(Some(mut chunk))) => { Ok(Async::Ready(Some(mut chunk))) => {
@ -517,13 +517,13 @@ where
fn read_stream( fn read_stream(
payload: &mut PayloadBuffer<S>, boundary: &str, payload: &mut PayloadBuffer<S>, boundary: &str,
) -> Poll<Option<Bytes>, MultipartError> { ) -> Poll<Option<Bytes>, MultipartError> {
match payload.read_until(b"\r")? { match payload.read_until(b"\r").poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(mut chunk)) => { Async::Ready(Some(mut chunk)) => {
if chunk.len() == 1 { if chunk.len() == 1 {
payload.unprocessed(chunk); payload.unprocessed(chunk);
match payload.read_exact(boundary.len() + 4)? { match payload.read_exact(boundary.len() + 4).poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(mut chunk)) => { Async::Ready(Some(mut chunk)) => {
@ -568,7 +568,7 @@ where
Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)),
Async::Ready(None) => { Async::Ready(None) => {
self.eof = true; self.eof = true;
match payload.readline()? { match payload.readline().poll()? {
Async::NotReady => Async::NotReady, Async::NotReady => Async::NotReady,
Async::Ready(None) => Async::Ready(None), Async::Ready(None) => Async::Ready(None),
Async::Ready(Some(line)) => { Async::Ready(Some(line)) => {

View File

@ -284,9 +284,9 @@ impl Inner {
/// Payload buffer /// Payload buffer
pub struct PayloadBuffer<S> { pub struct PayloadBuffer<S> {
len: usize, pub(crate) len: usize,
items: VecDeque<Bytes>, pub(crate) items: VecDeque<Bytes>,
stream: S, pub(crate) stream: S,
} }
impl<S> PayloadBuffer<S> impl<S> PayloadBuffer<S>
@ -308,7 +308,7 @@ where
} }
#[inline] #[inline]
fn poll_stream(&mut self) -> Poll<bool, PayloadError> { pub(crate) fn poll_stream(&mut self) -> Poll<bool, PayloadError> {
self.stream.poll().map(|res| match res { self.stream.poll().map(|res| match res {
Async::Ready(Some(data)) => { Async::Ready(Some(data)) => {
self.len += data.len(); self.len += data.len();
@ -322,17 +322,8 @@ where
/// Read first available chunk of bytes /// Read first available chunk of bytes
#[inline] #[inline]
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> { pub fn readany(&mut self) -> ReadAny<S> {
if let Some(data) = self.items.pop_front() { ReadAny { inner: self }
self.len -= data.len();
Ok(Async::Ready(Some(data)))
} else {
match self.poll_stream()? {
Async::Ready(true) => self.readany(),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
} }
/// Check if buffer contains enough bytes /// Check if buffer contains enough bytes
@ -367,37 +358,8 @@ where
/// Read exact number of bytes /// Read exact number of bytes
#[inline] #[inline]
pub fn read_exact(&mut self, size: usize) -> Poll<Option<Bytes>, PayloadError> { pub fn read_exact(&mut self, size: usize) -> ReadExact<S> {
if size <= self.len { ReadExact { inner: self, size }
self.len -= size;
let mut chunk = self.items.pop_front().unwrap();
if size < chunk.len() {
let buf = chunk.split_to(size);
self.items.push_front(chunk);
Ok(Async::Ready(Some(buf)))
} else if size == chunk.len() {
Ok(Async::Ready(Some(chunk)))
} else {
let mut buf = BytesMut::with_capacity(size);
buf.extend_from_slice(&chunk);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
}
}
Ok(Async::Ready(Some(buf.freeze())))
}
} else {
match self.poll_stream()? {
Async::Ready(true) => self.read_exact(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
} }
/// Remove specified amount if bytes from buffer /// Remove specified amount if bytes from buffer
@ -420,42 +382,164 @@ where
} }
/// Copy buffered data /// Copy buffered data
pub fn copy(&mut self, size: usize) -> Poll<Option<BytesMut>, PayloadError> { pub fn copy(&mut self, size: usize) -> Copy<S> {
if size <= self.len { Copy { inner: self, size }
let mut buf = BytesMut::with_capacity(size); }
for chunk in &self.items {
if buf.len() < size { /// Read until specified ending
let rem = cmp::min(size - buf.len(), chunk.len()); pub fn read_until(&mut self, line: &[u8]) -> ReadUntil<S> {
ReadUntil { inner: self, line: line.to_vec() }
}
/// Read bytes until new line delimiter
pub fn readline(&mut self) -> ReadUntil<S> {
self.read_until(b"\n")
}
/// Put unprocessed data back to the buffer
pub fn unprocessed(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_front(data);
}
/// Get remaining data from the buffer
pub fn remaining(&mut self) -> Bytes {
self.items
.iter_mut()
.fold(BytesMut::new(), |mut b, c| {
b.extend_from_slice(c);
b
}).freeze()
}
}
pub struct ReadAny<'a, S> {
inner: &'a mut PayloadBuffer<S>,
}
impl<'a, S: 'a> Stream for ReadAny<'a, S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(data) = self.inner.items.pop_front() {
self.inner.len -= data.len();
Ok(Async::Ready(Some(data)))
} else {
match self.inner.poll_stream()? {
Async::Ready(true) => self.poll(),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
}
pub struct ReadExact<'a, S> {
inner: &'a mut PayloadBuffer<S>,
size: usize,
}
impl<'a, S: 'a> Stream for ReadExact<'a, S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.size <= self.inner.len {
self.inner.len -= self.size;
let mut chunk = self.inner.items.pop_front().unwrap();
if self.size < chunk.len() {
let buf = chunk.split_to(self.size);
self.inner.items.push_front(chunk);
Ok(Async::Ready(Some(buf)))
} else if self.size == chunk.len() {
Ok(Async::Ready(Some(chunk)))
} else {
let mut buf = BytesMut::with_capacity(self.size);
buf.extend_from_slice(&chunk);
while buf.len() < self.size {
let mut chunk = self.inner.items.pop_front().unwrap();
let rem = cmp::min(self.size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.inner.items.push_front(chunk);
}
}
Ok(Async::Ready(Some(buf.freeze())))
}
} else {
match self.inner.poll_stream()? {
Async::Ready(true) => self.poll(),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
}
pub struct Copy<'a, S> {
inner: &'a mut PayloadBuffer<S>,
size: usize,
}
impl<'a, S: 'a> Stream for Copy<'a, S>
where
S: Stream<Item = Bytes, Error = PayloadError>
{
type Item = BytesMut;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.size <= self.inner.len {
let mut buf = BytesMut::with_capacity(self.size);
for chunk in &self.inner.items {
if buf.len() < self.size {
let rem = cmp::min(self.size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk[..rem]); buf.extend_from_slice(&chunk[..rem]);
} }
if buf.len() == size { if buf.len() == self.size {
return Ok(Async::Ready(Some(buf))); return Ok(Async::Ready(Some(buf)));
} }
} }
} }
match self.poll_stream()? { match self.inner.poll_stream()? {
Async::Ready(true) => self.copy(size), Async::Ready(true) => self.poll(),
Async::Ready(false) => Ok(Async::Ready(None)), Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
} }
} }
}
/// Read until specified ending pub struct ReadUntil<'a, S> {
pub fn read_until(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> { inner: &'a mut PayloadBuffer<S>,
line: Vec<u8>,
}
impl<'a, S: 'a> Stream for ReadUntil<'a, S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut idx = 0; let mut idx = 0;
let mut num = 0; let mut num = 0;
let mut offset = 0; let mut offset = 0;
let mut found = false; let mut found = false;
let mut length = 0; let mut length = 0;
for no in 0..self.items.len() { for no in 0..self.inner.items.len() {
{ {
let chunk = &self.items[no]; let chunk = &self.inner.items[no];
for (pos, ch) in chunk.iter().enumerate() { for (pos, ch) in chunk.iter().enumerate() {
if *ch == line[idx] { if *ch == self.line[idx] {
idx += 1; idx += 1;
if idx == line.len() { if idx == self.line.len() {
num = no; num = no;
offset = pos + 1; offset = pos + 1;
length += pos + 1; length += pos + 1;
@ -475,48 +559,27 @@ where
let mut buf = BytesMut::with_capacity(length); let mut buf = BytesMut::with_capacity(length);
if num > 0 { if num > 0 {
for _ in 0..num { for _ in 0..num {
buf.extend_from_slice(&self.items.pop_front().unwrap()); buf.extend_from_slice(&self.inner.items.pop_front().unwrap());
} }
} }
if offset > 0 { if offset > 0 {
let mut chunk = self.items.pop_front().unwrap(); let mut chunk = self.inner.items.pop_front().unwrap();
buf.extend_from_slice(&chunk.split_to(offset)); buf.extend_from_slice(&chunk.split_to(offset));
if !chunk.is_empty() { if !chunk.is_empty() {
self.items.push_front(chunk) self.inner.items.push_front(chunk)
} }
} }
self.len -= length; self.inner.len -= length;
return Ok(Async::Ready(Some(buf.freeze()))); return Ok(Async::Ready(Some(buf.freeze())));
} }
} }
match self.poll_stream()? { match self.inner.poll_stream()? {
Async::Ready(true) => self.read_until(line), Async::Ready(true) => self.poll(),
Async::Ready(false) => Ok(Async::Ready(None)), Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
} }
} }
/// Read bytes until new line delimiter
pub fn readline(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.read_until(b"\n")
}
/// Put unprocessed data back to the buffer
pub fn unprocessed(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_front(data);
}
/// Get remaining data from the buffer
pub fn remaining(&mut self) -> Bytes {
self.items
.iter_mut()
.fold(BytesMut::new(), |mut b, c| {
b.extend_from_slice(c);
b
}).freeze()
}
} }
#[cfg(test)] #[cfg(test)]
@ -550,7 +613,7 @@ mod tests {
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(payload.len, 0); assert_eq!(payload.len, 0);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -565,16 +628,16 @@ mod tests {
let (mut sender, payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
sender.feed_data(Bytes::from("data")); sender.feed_data(Bytes::from("data"));
sender.feed_eof(); sender.feed_eof();
assert_eq!( assert_eq!(
Async::Ready(Some(Bytes::from("data"))), Async::Ready(Some(Bytes::from("data"))),
payload.readany().ok().unwrap() payload.readany().poll().ok().unwrap()
); );
assert_eq!(payload.len, 0); assert_eq!(payload.len, 0);
assert_eq!(Async::Ready(None), payload.readany().ok().unwrap()); assert_eq!(Async::Ready(None), payload.readany().poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -589,10 +652,10 @@ mod tests {
let (mut sender, payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.readany().err().unwrap(); payload.readany().poll().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
})).unwrap(); })).unwrap();
@ -611,13 +674,13 @@ mod tests {
assert_eq!( assert_eq!(
Async::Ready(Some(Bytes::from("line1"))), Async::Ready(Some(Bytes::from("line1"))),
payload.readany().ok().unwrap() payload.readany().poll().ok().unwrap()
); );
assert_eq!(payload.len, 0); assert_eq!(payload.len, 0);
assert_eq!( assert_eq!(
Async::Ready(Some(Bytes::from("line2"))), Async::Ready(Some(Bytes::from("line2"))),
payload.readany().ok().unwrap() payload.readany().poll().ok().unwrap()
); );
assert_eq!(payload.len, 0); assert_eq!(payload.len, 0);
@ -634,25 +697,25 @@ mod tests {
let (mut sender, payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap()); assert_eq!(Async::NotReady, payload.read_exact(2).poll().ok().unwrap());
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
assert_eq!( assert_eq!(
Async::Ready(Some(Bytes::from_static(b"li"))), Async::Ready(Some(Bytes::from_static(b"li"))),
payload.read_exact(2).ok().unwrap() payload.read_exact(2).poll().ok().unwrap()
); );
assert_eq!(payload.len, 3); assert_eq!(payload.len, 3);
assert_eq!( assert_eq!(
Async::Ready(Some(Bytes::from_static(b"ne1l"))), Async::Ready(Some(Bytes::from_static(b"ne1l"))),
payload.read_exact(4).ok().unwrap() payload.read_exact(4).poll().ok().unwrap()
); );
assert_eq!(payload.len, 4); assert_eq!(payload.len, 4);
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.read_exact(10).err().unwrap(); payload.read_exact(10).poll().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -667,25 +730,25 @@ mod tests {
let (mut sender, payload) = Payload::new(false); let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap()); assert_eq!(Async::NotReady, payload.read_until(b"ne").poll().ok().unwrap());
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
assert_eq!( assert_eq!(
Async::Ready(Some(Bytes::from("line"))), Async::Ready(Some(Bytes::from("line"))),
payload.read_until(b"ne").ok().unwrap() payload.read_until(b"ne").poll().ok().unwrap()
); );
assert_eq!(payload.len, 1); assert_eq!(payload.len, 1);
assert_eq!( assert_eq!(
Async::Ready(Some(Bytes::from("1line2"))), Async::Ready(Some(Bytes::from("1line2"))),
payload.read_until(b"2").ok().unwrap() payload.read_until(b"2").poll().ok().unwrap()
); );
assert_eq!(payload.len, 0); assert_eq!(payload.len, 0);
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.read_until(b"b").err().unwrap(); payload.read_until(b"b").poll().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)

View File

@ -54,7 +54,7 @@ impl Frame {
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Bytes, Error = PayloadError>,
{ {
let mut idx = 2; let mut idx = 2;
let buf = match pl.copy(2)? { let buf = match pl.copy(2).poll()? {
Async::Ready(Some(buf)) => buf, Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
@ -80,7 +80,7 @@ impl Frame {
let len = second & 0x7F; let len = second & 0x7F;
let length = if len == 126 { let length = if len == 126 {
let buf = match pl.copy(4)? { let buf = match pl.copy(4).poll()? {
Async::Ready(Some(buf)) => buf, Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
@ -89,7 +89,7 @@ impl Frame {
idx += 2; idx += 2;
len len
} else if len == 127 { } else if len == 127 {
let buf = match pl.copy(10)? { let buf = match pl.copy(10).poll()? {
Async::Ready(Some(buf)) => buf, Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
@ -110,7 +110,7 @@ impl Frame {
} }
let mask = if server { let mask = if server {
let buf = match pl.copy(idx + 4)? { let buf = match pl.copy(idx + 4).poll()? {
Async::Ready(Some(buf)) => buf, Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
@ -241,7 +241,7 @@ impl Frame {
}))); })));
} }
let data = match pl.read_exact(length)? { let data = match pl.read_exact(length).poll()? {
Async::Ready(Some(buf)) => buf, Async::Ready(Some(buf)) => buf,
Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => panic!(), Async::NotReady => panic!(),