return 413 error on buffer overflow. fix possible stream hang

This commit is contained in:
fakeshadow 2021-02-06 16:04:51 -08:00
parent 72c60d5ce2
commit 56ef804c5f
2 changed files with 23 additions and 10 deletions

View File

@ -279,13 +279,11 @@ impl MessageType for ResponseHead {
(len, version, status, res.headers.len()) (len, version, status, res.headers.len())
} }
httparse::Status::Partial => { httparse::Status::Partial => {
return { return if src.len() >= MAX_BUFFER_SIZE {
if src.len() >= MAX_BUFFER_SIZE { error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); Err(ParseError::TooLarge)
Err(ParseError::TooLarge) } else {
} else { Ok(None)
Ok(None)
}
} }
} }
} }

View File

@ -27,7 +27,6 @@ use crate::service::HttpFlow;
use crate::OnConnectData; use crate::OnConnectData;
use super::codec::Codec; use super::codec::Codec;
use super::decoder::MAX_BUFFER_SIZE;
use super::payload::{Payload, PayloadSender, PayloadStatus}; use super::payload::{Payload, PayloadSender, PayloadStatus};
use super::{Message, MessageType}; use super::{Message, MessageType};
@ -406,7 +405,7 @@ where
}, },
StateProj::SendPayload(mut stream) => { StateProj::SendPayload(mut stream) => {
loop { loop {
if this.write_buf.len() < HW_BUFFER_SIZE { if this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
match stream.as_mut().poll_next(cx) { match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => { Poll::Ready(Some(Ok(item))) => {
this.codec.encode( this.codec.encode(
@ -618,6 +617,18 @@ where
*this.error = Some(DispatchError::Io(e)); *this.error = Some(DispatchError::Io(e));
break; break;
} }
Err(ParseError::TooLarge) => {
if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Overflow);
}
// Requests overflow buffer size should be responded with 413
this.messages.push_back(DispatcherMessage::Error(
Response::PayloadTooLarge().finish().drop_body(),
));
this.flags.insert(Flags::READ_DISCONNECT);
*this.error = Some(ParseError::TooLarge.into());
break;
}
Err(e) => { Err(e) => {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted); payload.set_error(PayloadError::EncodingCorrupted);
@ -913,7 +924,11 @@ where
} else { } else {
// If buf is full return but do not disconnect since // If buf is full return but do not disconnect since
// there is more reading to be done // there is more reading to be done
if buf.len() >= MAX_BUFFER_SIZE { if buf.len() >= super::decoder::MAX_BUFFER_SIZE {
// at this point it's not known io is still scheduled to
// be waked up. so force wake up dispatcher just in case.
// TODO: figure out the overhead.
cx.waker().wake_by_ref();
return Ok(Some(false)); return Ok(Some(false));
} }