mirror of https://github.com/fafhrd91/actix-web
rewrite h1 timeout handling
This commit is contained in:
parent
78719c43a9
commit
f0faadad09
|
@ -179,9 +179,11 @@ impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
|
||||||
&self.config,
|
&self.config,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message::Chunk(Some(bytes)) => {
|
Message::Chunk(Some(bytes)) => {
|
||||||
self.encoder.encode_chunk(bytes.as_ref(), dst)?;
|
self.encoder.encode_chunk(bytes.as_ref(), dst)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message::Chunk(None) => {
|
Message::Chunk(None) => {
|
||||||
self.encoder.encode_eof(dst)?;
|
self.encoder.encode_eof(dst)?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,20 +38,23 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
|
||||||
|
|
||||||
bitflags! {
|
bitflags! {
|
||||||
pub struct Flags: u8 {
|
pub struct Flags: u8 {
|
||||||
/// Set when first byte is read from stream.
|
/// Set when stream is read for first time.
|
||||||
const STARTED = 0b0000_0001;
|
const STARTED = 0b0000_0001;
|
||||||
|
|
||||||
/// Set if connection is in keep-alive state.
|
/// Set when full request-response cycle has occurred.
|
||||||
const KEEP_ALIVE = 0b0000_0010;
|
const FINISHED = 0b0000_0010;
|
||||||
|
|
||||||
|
/// Set if connection is in keep-alive (inactive) state.
|
||||||
|
const KEEP_ALIVE = 0b0000_0100;
|
||||||
|
|
||||||
/// Set if in shutdown procedure.
|
/// Set if in shutdown procedure.
|
||||||
const SHUTDOWN = 0b0000_0100;
|
const SHUTDOWN = 0b0000_1000;
|
||||||
|
|
||||||
/// Set if read-half is disconnected.
|
/// Set if read-half is disconnected.
|
||||||
const READ_DISCONNECT = 0b0000_1000;
|
const READ_DISCONNECT = 0b0001_0000;
|
||||||
|
|
||||||
/// Set if write-half is disconnected.
|
/// Set if write-half is disconnected.
|
||||||
const WRITE_DISCONNECT = 0b0001_0000;
|
const WRITE_DISCONNECT = 0b0010_0000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,11 +155,6 @@ pin_project! {
|
||||||
payload: Option<PayloadSender>,
|
payload: Option<PayloadSender>,
|
||||||
messages: VecDeque<DispatcherMessage>,
|
messages: VecDeque<DispatcherMessage>,
|
||||||
|
|
||||||
// // Initialized as initial KA deadline or current time.
|
|
||||||
// // Updated when messages are read from stream and after timer is used for
|
|
||||||
// // first-request timeout.
|
|
||||||
// ka_deadline: Instant,
|
|
||||||
|
|
||||||
head_timer: TimerState,
|
head_timer: TimerState,
|
||||||
ka_timer: TimerState,
|
ka_timer: TimerState,
|
||||||
shutdown_timer: TimerState,
|
shutdown_timer: TimerState,
|
||||||
|
@ -188,11 +186,27 @@ impl TimerState {
|
||||||
matches!(self, Self::Active { .. } | Self::Inactive)
|
matches!(self, Self::Active { .. } | Self::Inactive)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set(&mut self, timer: Sleep) {
|
fn set(&mut self, timer: Sleep, line: u32) {
|
||||||
|
if !self.is_enabled() {
|
||||||
|
warn!("setting disabled timer from line {}", line);
|
||||||
|
}
|
||||||
|
|
||||||
*self = Self::Active {
|
*self = Self::Active {
|
||||||
timer: Box::pin(timer),
|
timer: Box::pin(timer),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn clear(&mut self, line: u32) {
|
||||||
|
if !self.is_enabled() {
|
||||||
|
warn!("trying to clear a disabled timer from line {}", line);
|
||||||
|
}
|
||||||
|
|
||||||
|
if matches!(self, Self::Inactive) {
|
||||||
|
warn!("trying to clear an inactive timer from line {}", line);
|
||||||
|
}
|
||||||
|
|
||||||
|
*self = Self::Inactive;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for TimerState {
|
impl fmt::Display for TimerState {
|
||||||
|
@ -410,7 +424,7 @@ where
|
||||||
|
|
||||||
fn send_response_inner(
|
fn send_response_inner(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
message: Response<()>,
|
res: Response<()>,
|
||||||
body: &impl MessageBody,
|
body: &impl MessageBody,
|
||||||
) -> Result<BodySize, DispatchError> {
|
) -> Result<BodySize, DispatchError> {
|
||||||
log::trace!("enter InnerDispatcher::send_response_inner");
|
log::trace!("enter InnerDispatcher::send_response_inner");
|
||||||
|
@ -420,7 +434,7 @@ where
|
||||||
let size = body.size();
|
let size = body.size();
|
||||||
|
|
||||||
this.codec
|
this.codec
|
||||||
.encode(Message::Item((message, size)), this.write_buf)
|
.encode(Message::Item((res, size)), this.write_buf)
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
if let Some(mut payload) = this.payload.take() {
|
if let Some(mut payload) = this.payload.take() {
|
||||||
payload.set_error(PayloadError::Incomplete(None));
|
payload.set_error(PayloadError::Incomplete(None));
|
||||||
|
@ -429,39 +443,51 @@ where
|
||||||
DispatchError::Io(err)
|
DispatchError::Io(err)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
this.flags.set(Flags::KEEP_ALIVE, this.codec.keepalive());
|
let conn_keep_alive = this.codec.keepalive();
|
||||||
|
this.flags.set(Flags::KEEP_ALIVE, conn_keep_alive);
|
||||||
|
|
||||||
|
if !conn_keep_alive {
|
||||||
|
log::trace!("clearing keep-alive timer");
|
||||||
|
this.ka_timer.clear(line!());
|
||||||
|
}
|
||||||
|
|
||||||
Ok(size)
|
Ok(size)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_response(
|
fn send_response(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
message: Response<()>,
|
res: Response<()>,
|
||||||
body: B,
|
body: B,
|
||||||
) -> Result<(), DispatchError> {
|
) -> Result<(), DispatchError> {
|
||||||
log::trace!("enter InnerDispatcher::send_response");
|
log::trace!("enter InnerDispatcher::send_response");
|
||||||
|
|
||||||
let size = self.as_mut().send_response_inner(message, &body)?;
|
let size = self.as_mut().send_response_inner(res, &body)?;
|
||||||
let state = match size {
|
let mut this = self.project();
|
||||||
BodySize::None | BodySize::Sized(0) => State::None,
|
this.state.set(match size {
|
||||||
|
BodySize::None | BodySize::Sized(0) => {
|
||||||
|
this.flags.insert(Flags::FINISHED);
|
||||||
|
State::None
|
||||||
|
}
|
||||||
_ => State::SendPayload { body },
|
_ => State::SendPayload { body },
|
||||||
};
|
});
|
||||||
self.project().state.set(state);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_error_response(
|
fn send_error_response(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
message: Response<()>,
|
res: Response<()>,
|
||||||
body: BoxBody,
|
body: BoxBody,
|
||||||
) -> Result<(), DispatchError> {
|
) -> Result<(), DispatchError> {
|
||||||
log::trace!("enter InnerDispatcher::send_error_response");
|
log::trace!("enter InnerDispatcher::send_error_response");
|
||||||
|
|
||||||
let size = self.as_mut().send_response_inner(message, &body)?;
|
let size = self.as_mut().send_response_inner(res, &body)?;
|
||||||
|
let mut this = self.project();
|
||||||
self.project().state.set(match size {
|
this.state.set(match size {
|
||||||
BodySize::None | BodySize::Sized(0) => State::None,
|
BodySize::None | BodySize::Sized(0) => {
|
||||||
|
this.flags.insert(Flags::FINISHED);
|
||||||
|
State::None
|
||||||
|
}
|
||||||
_ => State::SendErrorPayload { body },
|
_ => State::SendErrorPayload { body },
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -480,70 +506,87 @@ where
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Result<PollResponse, DispatchError> {
|
) -> Result<PollResponse, DispatchError> {
|
||||||
log::trace!("enter InnerDispatcher::poll_response");
|
|
||||||
|
|
||||||
'res: loop {
|
'res: loop {
|
||||||
|
log::trace!("enter InnerDispatcher::poll_response loop iteration");
|
||||||
|
|
||||||
let mut this = self.as_mut().project();
|
let mut this = self.as_mut().project();
|
||||||
match this.state.as_mut().project() {
|
match this.state.as_mut().project() {
|
||||||
// no future is in InnerDispatcher state. pop next message.
|
// no future is in InnerDispatcher state; pop next message
|
||||||
StateProj::None => match this.messages.pop_front() {
|
StateProj::None => match this.messages.pop_front() {
|
||||||
// handle request message.
|
// handle request message
|
||||||
Some(DispatcherMessage::Item(req)) => {
|
Some(DispatcherMessage::Item(req)) => {
|
||||||
// Handle `EXPECT: 100-Continue` header
|
// Handle `EXPECT: 100-Continue` header
|
||||||
if req.head().expect() {
|
if req.head().expect() {
|
||||||
// set InnerDispatcher state and continue loop to poll it.
|
log::trace!(" passing request to expect handler");
|
||||||
|
// set InnerDispatcher state and continue loop to poll it
|
||||||
let fut = this.flow.expect.call(req);
|
let fut = this.flow.expect.call(req);
|
||||||
this.state.set(State::ExpectCall { fut });
|
this.state.set(State::ExpectCall { fut });
|
||||||
} else {
|
} else {
|
||||||
// the same as expect call.
|
log::trace!(" passing request to service handler");
|
||||||
|
// set InnerDispatcher state and continue loop to poll it
|
||||||
let fut = this.flow.service.call(req);
|
let fut = this.flow.service.call(req);
|
||||||
this.state.set(State::ServiceCall { fut });
|
this.state.set(State::ServiceCall { fut });
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle error message.
|
// handle error message
|
||||||
Some(DispatcherMessage::Error(res)) => {
|
Some(DispatcherMessage::Error(res)) => {
|
||||||
// send_response would update InnerDispatcher state to SendPayload or
|
log::trace!(" handling dispatcher error message");
|
||||||
// None(If response body is empty).
|
// send_response would update InnerDispatcher state to SendPayload or None
|
||||||
// continue loop to poll it.
|
// (If response body is empty)
|
||||||
|
// continue loop to poll it
|
||||||
self.as_mut().send_error_response(res, BoxBody::new(()))?;
|
self.as_mut().send_error_response(res, BoxBody::new(()))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// return with upgrade request and poll it exclusively.
|
// return with upgrade request and poll it exclusively
|
||||||
Some(DispatcherMessage::Upgrade(req)) => {
|
Some(DispatcherMessage::Upgrade(req)) => {
|
||||||
|
// return upgrade
|
||||||
return Ok(PollResponse::Upgrade(req));
|
return Ok(PollResponse::Upgrade(req));
|
||||||
}
|
}
|
||||||
|
|
||||||
// all messages are dealt with.
|
// all messages are dealt with
|
||||||
None => return Ok(PollResponse::DoNothing),
|
None => {
|
||||||
|
log::trace!("all messages handled");
|
||||||
|
return Ok(PollResponse::DoNothing);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
StateProj::ServiceCall { fut } => match fut.poll(cx) {
|
|
||||||
|
StateProj::ServiceCall { fut } => {
|
||||||
|
log::trace!(" calling request handler service");
|
||||||
|
|
||||||
|
match fut.poll(cx) {
|
||||||
// service call resolved. send response.
|
// service call resolved. send response.
|
||||||
Poll::Ready(Ok(res)) => {
|
Poll::Ready(Ok(res)) => {
|
||||||
|
log::trace!(" ok");
|
||||||
let (res, body) = res.into().replace_body(());
|
let (res, body) = res.into().replace_body(());
|
||||||
self.as_mut().send_response(res, body)?;
|
self.as_mut().send_response(res, body)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// send service call error as response
|
// send service call error as response
|
||||||
Poll::Ready(Err(err)) => {
|
Poll::Ready(Err(err)) => {
|
||||||
|
log::trace!(" error");
|
||||||
let res: Response<BoxBody> = err.into();
|
let res: Response<BoxBody> = err.into();
|
||||||
let (res, body) = res.replace_body(());
|
let (res, body) = res.replace_body(());
|
||||||
self.as_mut().send_error_response(res, body)?;
|
self.as_mut().send_error_response(res, body)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// service call pending and could be waiting for more chunk messages.
|
// service call pending and could be waiting for more chunk messages
|
||||||
// (pipeline message limit and/or payload can_read limit)
|
// (pipeline message limit and/or payload can_read limit)
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
// no new message is decoded and no new payload is feed.
|
log::trace!(" pending");
|
||||||
// nothing to do except waiting for new incoming data from client.
|
// no new message is decoded and no new payload is fed
|
||||||
|
// nothing to do except waiting for new incoming data from client
|
||||||
if !self.as_mut().poll_request(cx)? {
|
if !self.as_mut().poll_request(cx)? {
|
||||||
return Ok(PollResponse::DoNothing);
|
return Ok(PollResponse::DoNothing);
|
||||||
}
|
}
|
||||||
// otherwise keep loop.
|
// else loop
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
|
||||||
|
|
||||||
StateProj::SendPayload { mut body } => {
|
StateProj::SendPayload { mut body } => {
|
||||||
|
log::trace!("sending payload");
|
||||||
|
|
||||||
// keep populate writer buffer until buffer size limit hit,
|
// keep populate writer buffer until buffer size limit hit,
|
||||||
// get blocked or finished.
|
// get blocked or finished.
|
||||||
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
|
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
|
||||||
|
@ -555,14 +598,18 @@ where
|
||||||
|
|
||||||
Poll::Ready(None) => {
|
Poll::Ready(None) => {
|
||||||
this.codec.encode(Message::Chunk(None), this.write_buf)?;
|
this.codec.encode(Message::Chunk(None), this.write_buf)?;
|
||||||
|
|
||||||
// payload stream finished.
|
// payload stream finished.
|
||||||
// set state to None and handle next message
|
// set state to None and handle next message
|
||||||
this.state.set(State::None);
|
this.state.set(State::None);
|
||||||
|
this.flags.insert(Flags::FINISHED);
|
||||||
|
|
||||||
continue 'res;
|
continue 'res;
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Ready(Some(Err(err))) => {
|
Poll::Ready(Some(Err(err))) => {
|
||||||
return Err(DispatchError::Body(err.into()))
|
this.flags.insert(Flags::FINISHED);
|
||||||
|
return Err(DispatchError::Body(err.into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending => return Ok(PollResponse::DoNothing),
|
Poll::Pending => return Ok(PollResponse::DoNothing),
|
||||||
|
@ -574,6 +621,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
StateProj::SendErrorPayload { mut body } => {
|
StateProj::SendErrorPayload { mut body } => {
|
||||||
|
log::trace!(" sending error payload");
|
||||||
|
|
||||||
// TODO: de-dupe impl with SendPayload
|
// TODO: de-dupe impl with SendPayload
|
||||||
|
|
||||||
// keep populate writer buffer until buffer size limit hit,
|
// keep populate writer buffer until buffer size limit hit,
|
||||||
|
@ -587,7 +636,7 @@ where
|
||||||
|
|
||||||
Poll::Ready(None) => {
|
Poll::Ready(None) => {
|
||||||
this.codec.encode(Message::Chunk(None), this.write_buf)?;
|
this.codec.encode(Message::Chunk(None), this.write_buf)?;
|
||||||
// payload stream finished.
|
// payload stream finished
|
||||||
// set state to None and handle next message
|
// set state to None and handle next message
|
||||||
this.state.set(State::None);
|
this.state.set(State::None);
|
||||||
continue 'res;
|
continue 'res;
|
||||||
|
@ -602,12 +651,16 @@ where
|
||||||
Poll::Pending => return Ok(PollResponse::DoNothing),
|
Poll::Pending => return Ok(PollResponse::DoNothing),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// buffer is beyond max size.
|
|
||||||
// return and try to write the whole buffer to io stream.
|
// buffer is beyond max size
|
||||||
|
// return and try to write the whole buffer to stream
|
||||||
return Ok(PollResponse::DrainWriteBuf);
|
return Ok(PollResponse::DrainWriteBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
StateProj::ExpectCall { fut } => match fut.poll(cx) {
|
StateProj::ExpectCall { fut } => {
|
||||||
|
log::trace!(" calling expect service");
|
||||||
|
|
||||||
|
match fut.poll(cx) {
|
||||||
// expect resolved. write continue to buffer and set InnerDispatcher state
|
// expect resolved. write continue to buffer and set InnerDispatcher state
|
||||||
// to service call.
|
// to service call.
|
||||||
Poll::Ready(Ok(req)) => {
|
Poll::Ready(Ok(req)) => {
|
||||||
|
@ -626,7 +679,8 @@ where
|
||||||
|
|
||||||
// expect must be solved before progress can be made.
|
// expect must be solved before progress can be made.
|
||||||
Poll::Pending => return Ok(PollResponse::DoNothing),
|
Poll::Pending => return Ok(PollResponse::DoNothing),
|
||||||
},
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -686,22 +740,22 @@ where
|
||||||
StateProj::ServiceCall { fut } => {
|
StateProj::ServiceCall { fut } => {
|
||||||
// return no matter the service call future's result.
|
// return no matter the service call future's result.
|
||||||
return match fut.poll(cx) {
|
return match fut.poll(cx) {
|
||||||
// future is resolved. send response and return a result. On success
|
// Future is resolved. Send response and return a result. On success
|
||||||
// to notify the dispatcher a new state is set and the outer loop
|
// to notify the dispatcher a new state is set and the outer loop
|
||||||
// should be continue.
|
// should be continue.
|
||||||
Poll::Ready(Ok(res)) => {
|
Poll::Ready(Ok(res)) => {
|
||||||
let (res, body) = res.into().replace_body(());
|
let (res, body) = res.into().replace_body(());
|
||||||
self.send_response(res, body)
|
self.as_mut().send_response(res, body)
|
||||||
}
|
}
|
||||||
|
|
||||||
// see the comment on ExpectCall state branch's Pending
|
// see the comment on ExpectCall state branch's Pending
|
||||||
Poll::Pending => Ok(()),
|
Poll::Pending => Ok(()),
|
||||||
|
|
||||||
// see the comment on ExpectCall state branch's Ready(Err(err))
|
// see the comment on ExpectCall state branch's Ready(Err(_))
|
||||||
Poll::Ready(Err(err)) => {
|
Poll::Ready(Err(err)) => {
|
||||||
let res: Response<BoxBody> = err.into();
|
let res: Response<BoxBody> = err.into();
|
||||||
let (res, body) = res.replace_body(());
|
let (res, body) = res.replace_body(());
|
||||||
self.send_error_response(res, body)
|
self.as_mut().send_error_response(res, body)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -730,7 +784,6 @@ where
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut updated = false;
|
|
||||||
let mut this = self.as_mut().project();
|
let mut this = self.as_mut().project();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -738,11 +791,12 @@ where
|
||||||
|
|
||||||
match this.codec.decode(this.read_buf) {
|
match this.codec.decode(this.read_buf) {
|
||||||
Ok(Some(msg)) => {
|
Ok(Some(msg)) => {
|
||||||
updated = true;
|
log::trace!("found full frame (head)");
|
||||||
// this.flags.insert(Flags::STARTED);
|
|
||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
Message::Item(mut req) => {
|
Message::Item(mut req) => {
|
||||||
|
this.head_timer.clear(line!());
|
||||||
|
|
||||||
req.head_mut().peer_addr = *this.peer_addr;
|
req.head_mut().peer_addr = *this.peer_addr;
|
||||||
|
|
||||||
req.conn_data = this.conn_data.as_ref().map(Rc::clone);
|
req.conn_data = this.conn_data.as_ref().map(Rc::clone);
|
||||||
|
@ -811,14 +865,15 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// decode is partial and buffer is not full yet.
|
// decode is partial and buffer is not full yet
|
||||||
// break and wait for more read.
|
// break and wait for more read
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
log::trace!("found partial frame");
|
log::trace!("found partial frame");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(ParseError::Io(err)) => {
|
Err(ParseError::Io(err)) => {
|
||||||
|
log::trace!("io error: {}", &err);
|
||||||
self.as_mut().client_disconnected();
|
self.as_mut().client_disconnected();
|
||||||
this = self.as_mut().project();
|
this = self.as_mut().project();
|
||||||
*this.error = Some(DispatchError::Io(err));
|
*this.error = Some(DispatchError::Io(err));
|
||||||
|
@ -826,6 +881,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(ParseError::TooLarge) => {
|
Err(ParseError::TooLarge) => {
|
||||||
|
log::trace!("request head is too big");
|
||||||
|
|
||||||
if let Some(mut payload) = this.payload.take() {
|
if let Some(mut payload) = this.payload.take() {
|
||||||
payload.set_error(PayloadError::Overflow);
|
payload.set_error(PayloadError::Overflow);
|
||||||
}
|
}
|
||||||
|
@ -844,11 +901,13 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
log::trace!("parse error {}", &err);
|
||||||
|
|
||||||
if let Some(mut payload) = this.payload.take() {
|
if let Some(mut payload) = this.payload.take() {
|
||||||
payload.set_error(PayloadError::EncodingCorrupted);
|
payload.set_error(PayloadError::EncodingCorrupted);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Malformed requests should be responded with 400
|
// malformed requests should be responded with 400
|
||||||
this.messages.push_back(DispatcherMessage::Error(
|
this.messages.push_back(DispatcherMessage::Error(
|
||||||
Response::bad_request().drop_body(),
|
Response::bad_request().drop_body(),
|
||||||
));
|
));
|
||||||
|
@ -860,14 +919,8 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if updated && this.timer.is_some() {
|
// TODO: what's this boolean do now?
|
||||||
// if let Some(expire) = this.config.keep_alive_deadline() {
|
Ok(false)
|
||||||
// log::trace!("set keep-alive deadline");
|
|
||||||
// *this.ka_deadline = expire;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
Ok(updated)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_head_timer(
|
fn poll_head_timer(
|
||||||
|
@ -907,9 +960,38 @@ where
|
||||||
) -> Result<(), DispatchError> {
|
) -> Result<(), DispatchError> {
|
||||||
let this = self.as_mut().project();
|
let this = self.as_mut().project();
|
||||||
match this.ka_timer {
|
match this.ka_timer {
|
||||||
|
TimerState::Active { timer } => {
|
||||||
|
debug_assert!(
|
||||||
|
this.flags.contains(Flags::KEEP_ALIVE),
|
||||||
|
"keep-alive flag should be set when timer is active",
|
||||||
|
);
|
||||||
|
debug_assert!(
|
||||||
|
this.state.is_none(),
|
||||||
|
"dispatcher should not be in keep-alive phase if state is not none",
|
||||||
|
);
|
||||||
|
debug_assert!(
|
||||||
|
this.write_buf.is_empty(),
|
||||||
|
"dispatcher should not be in keep-alive phase if write_buf is not empty",
|
||||||
|
);
|
||||||
|
|
||||||
|
// keep-alive timer has timed out
|
||||||
|
if timer.as_mut().poll(cx).is_ready() {
|
||||||
|
// no tasks at hand
|
||||||
|
log::trace!("timer timed out; closing connection");
|
||||||
|
this.flags.insert(Flags::SHUTDOWN);
|
||||||
|
|
||||||
|
if let Some(deadline) = this.config.client_disconnect_deadline() {
|
||||||
|
// start shutdown timeout if enabled
|
||||||
|
log::trace!("starting disconnect timer");
|
||||||
|
this.shutdown_timer.set(sleep_until(deadline), line!());
|
||||||
|
} else {
|
||||||
|
// no shutdown timeout, drop socket
|
||||||
|
this.flags.insert(Flags::WRITE_DISCONNECT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
TimerState::Disabled => {}
|
TimerState::Disabled => {}
|
||||||
TimerState::Inactive => {}
|
TimerState::Inactive => {}
|
||||||
TimerState::Active { timer } => {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -926,9 +1008,10 @@ where
|
||||||
TimerState::Active { timer } => {
|
TimerState::Active { timer } => {
|
||||||
debug_assert!(
|
debug_assert!(
|
||||||
this.flags.contains(Flags::SHUTDOWN),
|
this.flags.contains(Flags::SHUTDOWN),
|
||||||
"shutdown flag should be set when timer is active"
|
"shutdown flag should be set when timer is active",
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// timed-out during shutdown; drop connection
|
||||||
if timer.as_mut().poll(cx).is_ready() {
|
if timer.as_mut().poll(cx).is_ready() {
|
||||||
log::trace!("timed-out during shutdown");
|
log::trace!("timed-out during shutdown");
|
||||||
return Err(DispatchError::DisconnectTimeout);
|
return Err(DispatchError::DisconnectTimeout);
|
||||||
|
@ -950,7 +1033,7 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Poll head/keep-alive/disconnect timer.
|
/// Poll head, keep-alive, and disconnect timer.
|
||||||
fn poll_timer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> {
|
fn poll_timer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> {
|
||||||
log::trace!("enter InnerDispatcher::poll_timer");
|
log::trace!("enter InnerDispatcher::poll_timer");
|
||||||
trace_timer_states(&self.head_timer, &self.ka_timer, &self.shutdown_timer);
|
trace_timer_states(&self.head_timer, &self.ka_timer, &self.shutdown_timer);
|
||||||
|
@ -959,77 +1042,13 @@ where
|
||||||
self.as_mut().poll_ka_timer(cx)?;
|
self.as_mut().poll_ka_timer(cx)?;
|
||||||
self.as_mut().poll_shutdown_timer(cx)?;
|
self.as_mut().poll_shutdown_timer(cx)?;
|
||||||
|
|
||||||
// Some(mut timer) => {
|
|
||||||
// let deadline = timer.deadline();
|
|
||||||
|
|
||||||
// // only operate when timer is resolved
|
|
||||||
// if timer.as_mut().poll(cx).is_ready() {
|
|
||||||
// log::trace!("timer reached deadline");
|
|
||||||
|
|
||||||
// // timed-out during shutdown; drop connection
|
|
||||||
// if this.flags.contains(Flags::SHUTDOWN) {
|
|
||||||
// log::trace!("timed-out during shutdown");
|
|
||||||
// return Err(DispatchError::DisconnectTimeout);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // exceeded deadline; check for any outstanding tasks
|
|
||||||
// if timer.deadline() >= *this.ka_deadline {
|
|
||||||
// if this.state.is_none() && this.write_buf.is_empty() {
|
|
||||||
// // no tasks at hand
|
|
||||||
// if this.flags.contains(Flags::KEEP_ALIVE) {
|
|
||||||
// log::trace!("timer timed out; closing connection");
|
|
||||||
// this.flags.insert(Flags::SHUTDOWN);
|
|
||||||
|
|
||||||
// // start shutdown timeout
|
|
||||||
// if let Some(deadline) = this.config.client_disconnect_deadline()
|
|
||||||
// {
|
|
||||||
// log::trace!("starting disconnect timer");
|
|
||||||
// timer.as_mut().reset(deadline);
|
|
||||||
// let _ = timer.poll(cx);
|
|
||||||
// } else {
|
|
||||||
// // no shutdown timeout, drop socket
|
|
||||||
// this.flags.insert(Flags::WRITE_DISCONNECT);
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// // timeout on first request (slow request) return 408
|
|
||||||
|
|
||||||
// log::trace!(
|
|
||||||
// "timed out on slow request; \
|
|
||||||
// replying with 408 and closing connection"
|
|
||||||
// );
|
|
||||||
|
|
||||||
// let _ = self.as_mut().send_error_response(
|
|
||||||
// Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
|
|
||||||
// BoxBody::new(()),
|
|
||||||
// );
|
|
||||||
|
|
||||||
// this = self.project();
|
|
||||||
// this.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
|
|
||||||
// }
|
|
||||||
// } else if let Some(deadline) = this.config.keep_alive_deadline() {
|
|
||||||
// // still have unfinished tasks; try to reset and register keep-alive
|
|
||||||
// log::trace!("starting keep-alive timer");
|
|
||||||
// timer.as_mut().reset(deadline);
|
|
||||||
// let _ = timer.poll(cx);
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// // timer resolved but still have not met the expire deadline
|
|
||||||
// // reset and register for later wakeup
|
|
||||||
// log::trace!("reset timer to keep-alive deadline");
|
|
||||||
// timer.as_mut().reset(*this.ka_deadline);
|
|
||||||
// let _ = timer.poll(cx);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true when I/O stream can be disconnected after write to it.
|
/// Returns true when I/O stream can be disconnected after write to it.
|
||||||
///
|
///
|
||||||
/// It covers these conditions:
|
/// It covers these conditions:
|
||||||
/// - `std::io::ErrorKind::ConnectionReset` after partial read.
|
/// - `std::io::ErrorKind::ConnectionReset` after partial read;
|
||||||
/// - all data read done.
|
/// - all data read done.
|
||||||
#[inline(always)] // TODO: bench this inline
|
#[inline(always)] // TODO: bench this inline
|
||||||
fn read_available(
|
fn read_available(
|
||||||
|
@ -1037,10 +1056,12 @@ where
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Result<bool, DispatchError> {
|
) -> Result<bool, DispatchError> {
|
||||||
log::trace!("enter InnerDispatcher::read_available");
|
log::trace!("enter InnerDispatcher::read_available");
|
||||||
|
log::trace!(" reading from a {}", core::any::type_name::<T>());
|
||||||
|
|
||||||
let this = self.project();
|
let this = self.project();
|
||||||
|
|
||||||
if this.flags.contains(Flags::READ_DISCONNECT) {
|
if this.flags.contains(Flags::READ_DISCONNECT) {
|
||||||
|
log::trace!(" read DC");
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1062,7 +1083,7 @@ where
|
||||||
// When read_buf is beyond max buffer size the early return could be successfully
|
// When read_buf is beyond max buffer size the early return could be successfully
|
||||||
// be parsed as a new Request. This case would not generate ParseError::TooLarge and
|
// be parsed as a new Request. This case would not generate ParseError::TooLarge and
|
||||||
// at this point IO stream is not fully read to Pending and would result in
|
// at this point IO stream is not fully read to Pending and would result in
|
||||||
// dispatcher stuck until timeout (KA)
|
// dispatcher stuck until timeout (keep-alive).
|
||||||
//
|
//
|
||||||
// Note:
|
// Note:
|
||||||
// This is a perf choice to reduce branch on <Request as MessageType>::decode.
|
// This is a perf choice to reduce branch on <Request as MessageType>::decode.
|
||||||
|
@ -1096,18 +1117,31 @@ where
|
||||||
|
|
||||||
match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) {
|
match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) {
|
||||||
Poll::Ready(Ok(n)) => {
|
Poll::Ready(Ok(n)) => {
|
||||||
|
log::trace!(" read {} bytes", n);
|
||||||
|
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
|
log::trace!(" signalling should_disconnect");
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
read_some = true;
|
read_some = true;
|
||||||
}
|
}
|
||||||
Poll::Pending => return Ok(false),
|
Poll::Pending => {
|
||||||
Poll::Ready(Err(err)) => {
|
log::trace!(" read pending");
|
||||||
return match err.kind() {
|
return Ok(false);
|
||||||
io::ErrorKind::WouldBlock => Ok(false),
|
|
||||||
io::ErrorKind::ConnectionReset if read_some => Ok(true),
|
|
||||||
_ => Err(DispatchError::Io(err)),
|
|
||||||
}
|
}
|
||||||
|
Poll::Ready(Err(err)) => {
|
||||||
|
log::trace!(" read err: {:?}", &err);
|
||||||
|
|
||||||
|
return match err.kind() {
|
||||||
|
// convert WouldBlock error to the same as Pending return
|
||||||
|
io::ErrorKind::WouldBlock => Ok(false),
|
||||||
|
|
||||||
|
// connection reset after partial read
|
||||||
|
io::ErrorKind::ConnectionReset if read_some => Ok(true),
|
||||||
|
|
||||||
|
_ => Err(DispatchError::Io(err)),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1147,6 +1181,7 @@ where
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
log::trace!(target: "", "");
|
||||||
log::trace!("enter Dispatcher::poll");
|
log::trace!("enter Dispatcher::poll");
|
||||||
|
|
||||||
let this = self.as_mut().project();
|
let this = self.as_mut().project();
|
||||||
|
@ -1162,13 +1197,13 @@ where
|
||||||
|
|
||||||
inner.as_mut().poll_timer(cx)?;
|
inner.as_mut().poll_timer(cx)?;
|
||||||
|
|
||||||
if inner.flags.contains(Flags::SHUTDOWN) {
|
let poll = if inner.flags.contains(Flags::SHUTDOWN) {
|
||||||
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
|
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
} else {
|
} else {
|
||||||
// flush buffer and wait on blocked
|
// flush buffer and wait on blocked
|
||||||
ready!(inner.as_mut().poll_flush(cx))?;
|
ready!(inner.as_mut().poll_flush(cx))?;
|
||||||
Pin::new(inner.project().io.as_mut().unwrap())
|
Pin::new(inner.as_mut().project().io.as_mut().unwrap())
|
||||||
.poll_shutdown(cx)
|
.poll_shutdown(cx)
|
||||||
.map_err(DispatchError::from)
|
.map_err(DispatchError::from)
|
||||||
}
|
}
|
||||||
|
@ -1186,13 +1221,14 @@ where
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.project()
|
.project()
|
||||||
.head_timer
|
.head_timer
|
||||||
.set(sleep_until(deadline));
|
.set(sleep_until(deadline), line!());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inner.as_mut().poll_request(cx)?;
|
inner.as_mut().poll_request(cx)?;
|
||||||
|
|
||||||
if should_disconnect {
|
if should_disconnect {
|
||||||
|
log::trace!("should_disconnect = true");
|
||||||
// I/O stream should to be closed
|
// I/O stream should to be closed
|
||||||
let inner = inner.as_mut().project();
|
let inner = inner.as_mut().project();
|
||||||
inner.flags.insert(Flags::READ_DISCONNECT);
|
inner.flags.insert(Flags::READ_DISCONNECT);
|
||||||
|
@ -1205,9 +1241,25 @@ where
|
||||||
// poll response to populate write buffer
|
// poll response to populate write buffer
|
||||||
// drain indicates whether write buffer should be emptied before next run
|
// drain indicates whether write buffer should be emptied before next run
|
||||||
let drain = match inner.as_mut().poll_response(cx)? {
|
let drain = match inner.as_mut().poll_response(cx)? {
|
||||||
PollResponse::DrainWriteBuf => true,
|
PollResponse::DrainWriteBuf => {
|
||||||
|
inner.flags.contains(Flags::KEEP_ALIVE);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
PollResponse::DoNothing => false,
|
PollResponse::DoNothing => {
|
||||||
|
if inner.flags.contains(Flags::KEEP_ALIVE) {
|
||||||
|
if let Some(deadline) = inner.config.keep_alive_timer() {
|
||||||
|
log::trace!("setting keep-alive timer");
|
||||||
|
inner
|
||||||
|
.as_mut()
|
||||||
|
.project()
|
||||||
|
.ka_timer
|
||||||
|
.set(deadline, line!());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
// upgrade request and goes Upgrade variant of DispatcherState.
|
// upgrade request and goes Upgrade variant of DispatcherState.
|
||||||
PollResponse::Upgrade(req) => {
|
PollResponse::Upgrade(req) => {
|
||||||
|
@ -1234,15 +1286,16 @@ where
|
||||||
|
|
||||||
// client is gone
|
// client is gone
|
||||||
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
|
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
|
||||||
|
log::trace!("client is gone; disconnecting");
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let state_is_none = inner.state.is_none();
|
|
||||||
|
|
||||||
let inner_p = inner.as_mut().project();
|
let inner_p = inner.as_mut().project();
|
||||||
|
let state_is_none = inner_p.state.is_none();
|
||||||
|
|
||||||
// read half is closed; we do not process any responses
|
// read half is closed; we do not process any responses
|
||||||
if inner_p.flags.contains(Flags::READ_DISCONNECT) && state_is_none {
|
if inner_p.flags.contains(Flags::READ_DISCONNECT) && state_is_none {
|
||||||
|
log::trace!("read half closed; start shutdown");
|
||||||
inner_p.flags.insert(Flags::SHUTDOWN);
|
inner_p.flags.insert(Flags::SHUTDOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1255,14 +1308,17 @@ where
|
||||||
return Poll::Ready(Err(err));
|
return Poll::Ready(Err(err));
|
||||||
}
|
}
|
||||||
|
|
||||||
// // disconnect if keep-alive is not enabled
|
// disconnect if keep-alive is not enabled
|
||||||
// if inner_p.flags.contains(Flags::STARTED)
|
if inner_p.flags.contains(Flags::FINISHED)
|
||||||
// && !inner_p.flags.contains(Flags::KEEP_ALIVE)
|
&& !inner_p.flags.contains(Flags::KEEP_ALIVE)
|
||||||
// {
|
{
|
||||||
// log::trace!("shutdown because keep-alive is not enabled");
|
log::trace!(
|
||||||
// inner_p.flags.insert(Flags::SHUTDOWN);
|
"start shutdown because keep-alive is disabled or opted \
|
||||||
// return self.poll(cx);
|
out for this connection"
|
||||||
// }
|
);
|
||||||
|
inner_p.flags.insert(Flags::SHUTDOWN);
|
||||||
|
return self.poll(cx);
|
||||||
|
}
|
||||||
|
|
||||||
// disconnect if shutdown
|
// disconnect if shutdown
|
||||||
if inner_p.flags.contains(Flags::SHUTDOWN) {
|
if inner_p.flags.contains(Flags::SHUTDOWN) {
|
||||||
|
@ -1271,7 +1327,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log::trace!("but after all that, PENDING; wait for more data");
|
log::trace!("dispatcher going to sleep; wait for next event");
|
||||||
|
|
||||||
trace_timer_states(
|
trace_timer_states(
|
||||||
inner_p.head_timer,
|
inner_p.head_timer,
|
||||||
|
@ -1280,7 +1336,11 @@ where
|
||||||
);
|
);
|
||||||
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
};
|
||||||
|
|
||||||
|
log::trace!("current flags: {:?}", &inner.flags);
|
||||||
|
|
||||||
|
poll
|
||||||
}
|
}
|
||||||
|
|
||||||
DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| {
|
DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| {
|
||||||
|
|
|
@ -17,7 +17,7 @@ use crate::{
|
||||||
h1::{Codec, ExpectHandler, UpgradeHandler},
|
h1::{Codec, ExpectHandler, UpgradeHandler},
|
||||||
service::HttpFlow,
|
service::HttpFlow,
|
||||||
test::{TestBuffer, TestSeqBuffer},
|
test::{TestBuffer, TestSeqBuffer},
|
||||||
Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response,
|
Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
||||||
|
@ -34,7 +34,13 @@ fn stabilize_date_header(payload: &mut [u8]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ok_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
fn ok_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
||||||
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok())))
|
status_service(StatusCode::OK)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status_service(
|
||||||
|
status: StatusCode,
|
||||||
|
) -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
||||||
|
fn_service(move |_req: Request| ready(Ok::<_, Error>(Response::new(status))))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn echo_path_service(
|
fn echo_path_service(
|
||||||
|
@ -127,7 +133,7 @@ async fn late_request() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_basic() {
|
async fn oneshot_connection() {
|
||||||
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None);
|
let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None);
|
||||||
|
@ -179,7 +185,7 @@ async fn test_basic() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_keep_alive_timeout() {
|
async fn keep_alive_timeout() {
|
||||||
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None);
|
let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None);
|
||||||
|
@ -252,7 +258,7 @@ async fn test_keep_alive_timeout() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_keep_alive_follow_up_req() {
|
async fn keep_alive_follow_up_req() {
|
||||||
let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None);
|
let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None);
|
||||||
|
@ -371,7 +377,7 @@ async fn test_keep_alive_follow_up_req() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_req_parse_err() {
|
async fn req_parse_err() {
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
|
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
|
||||||
|
|
||||||
|
@ -531,7 +537,7 @@ async fn pipelining_ok_then_bad() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_expect() {
|
async fn expect_handling() {
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
let mut buf = TestSeqBuffer::empty();
|
let mut buf = TestSeqBuffer::empty();
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
||||||
|
@ -562,7 +568,6 @@ async fn test_expect() {
|
||||||
|
|
||||||
// polls: manual
|
// polls: manual
|
||||||
assert_eq!(h1.poll_count, 1);
|
assert_eq!(h1.poll_count, 1);
|
||||||
eprintln!("poll count: {}", h1.poll_count);
|
|
||||||
|
|
||||||
if let DispatcherState::Normal { ref inner } = h1.inner {
|
if let DispatcherState::Normal { ref inner } = h1.inner {
|
||||||
let io = inner.io.as_ref().unwrap();
|
let io = inner.io.as_ref().unwrap();
|
||||||
|
@ -603,7 +608,7 @@ async fn test_expect() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_eager_expect() {
|
async fn expect_eager() {
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
let mut buf = TestSeqBuffer::empty();
|
let mut buf = TestSeqBuffer::empty();
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
||||||
|
@ -663,7 +668,7 @@ async fn test_eager_expect() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_upgrade() {
|
async fn upgrade_handling() {
|
||||||
struct TestUpgrade;
|
struct TestUpgrade;
|
||||||
|
|
||||||
impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
|
impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
|
||||||
|
|
|
@ -318,16 +318,17 @@ impl MessageType for RequestHeadType {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: MessageType> MessageEncoder<T> {
|
impl<T: MessageType> MessageEncoder<T> {
|
||||||
/// Encode message
|
/// Encode chunk.
|
||||||
pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result<bool> {
|
pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result<bool> {
|
||||||
self.te.encode(msg, buf)
|
self.te.encode(msg, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Encode eof
|
/// Encode EOF.
|
||||||
pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> {
|
pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> {
|
||||||
self.te.encode_eof(buf)
|
self.te.encode_eof(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Encode message.
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&mut self,
|
&mut self,
|
||||||
dst: &mut BytesMut,
|
dst: &mut BytesMut,
|
||||||
|
|
|
@ -28,9 +28,10 @@ pub use self::utils::SendResponse;
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Codec message
|
/// Codec message
|
||||||
pub enum Message<T> {
|
pub enum Message<T> {
|
||||||
/// Http message
|
/// HTTP message.
|
||||||
Item(T),
|
Item(T),
|
||||||
/// Payload chunk
|
|
||||||
|
/// Payload chunk.
|
||||||
Chunk(Option<Bytes>),
|
Chunk(Option<Bytes>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -208,16 +208,20 @@ mod tests {
|
||||||
};
|
};
|
||||||
|
|
||||||
use memchr::memmem;
|
use memchr::memmem;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
h1::H1Service,
|
||||||
header::{HeaderName, HeaderValue},
|
header::{HeaderName, HeaderValue},
|
||||||
Error, HttpService, Request, Response,
|
Error, Request, Response, ServiceConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn camel_case_headers() {
|
async fn camel_case_headers() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
let mut srv = actix_http_test::test_server(|| {
|
let mut srv = actix_http_test::test_server(|| {
|
||||||
HttpService::new(|req: Request| async move {
|
H1Service::with_config(ServiceConfig::default(), |req: Request| async move {
|
||||||
let mut res = Response::ok();
|
let mut res = Response::ok();
|
||||||
|
|
||||||
if req.path().contains("camel") {
|
if req.path().contains("camel") {
|
||||||
|
@ -228,16 +232,21 @@ mod tests {
|
||||||
HeaderName::from_static("foo-bar"),
|
HeaderName::from_static("foo-bar"),
|
||||||
HeaderValue::from_static("baz"),
|
HeaderValue::from_static("baz"),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok::<_, Error>(res)
|
Ok::<_, Error>(res)
|
||||||
})
|
})
|
||||||
.tcp()
|
.tcp()
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
let mut stream = tokio::net::TcpStream::connect(srv.addr()).await.unwrap();
|
||||||
let _ = stream.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n");
|
dbg!(stream.ready(Interest::WRITABLE).await.unwrap());
|
||||||
|
let _ = stream
|
||||||
|
.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
let mut data = vec![0; 1024];
|
let mut data = vec![0; 1024];
|
||||||
let _ = stream.read(&mut data);
|
let _ = stream.read_to_end(&mut data).await.unwrap();
|
||||||
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
||||||
assert!(memmem::find(&data, b"Foo-Bar").is_some());
|
assert!(memmem::find(&data, b"Foo-Bar").is_some());
|
||||||
assert!(memmem::find(&data, b"foo-bar").is_none());
|
assert!(memmem::find(&data, b"foo-bar").is_none());
|
||||||
|
@ -247,9 +256,11 @@ mod tests {
|
||||||
assert!(memmem::find(&data, b"content-length").is_none());
|
assert!(memmem::find(&data, b"content-length").is_none());
|
||||||
|
|
||||||
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
let _ = stream.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n");
|
let _ = stream
|
||||||
|
.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n")
|
||||||
|
.unwrap();
|
||||||
let mut data = vec![0; 1024];
|
let mut data = vec![0; 1024];
|
||||||
let _ = stream.read(&mut data);
|
let _ = stream.read_to_end(&mut data).unwrap();
|
||||||
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
||||||
assert!(memmem::find(&data, b"Foo-Bar").is_none());
|
assert!(memmem::find(&data, b"Foo-Bar").is_none());
|
||||||
assert!(memmem::find(&data, b"foo-bar").is_some());
|
assert!(memmem::find(&data, b"foo-bar").is_some());
|
||||||
|
|
|
@ -242,7 +242,7 @@ impl io::Read for TestBuffer {
|
||||||
|
|
||||||
impl io::Write for TestBuffer {
|
impl io::Write for TestBuffer {
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
RefCell::borrow_mut(&self.write_buf).extend(buf);
|
self.write_buf.borrow_mut().extend(buf);
|
||||||
Ok(buf.len())
|
Ok(buf.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,7 +197,40 @@ async fn test_chunked_payload() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_slow_request() {
|
async fn test_slow_request_close() {
|
||||||
|
let mut srv = test_server(|| {
|
||||||
|
HttpService::build()
|
||||||
|
.client_timeout(200)
|
||||||
|
.keep_alive(2)
|
||||||
|
.finish(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
|
.tcp()
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
let mut data = Vec::new();
|
||||||
|
let _ = stream.read(&mut data).unwrap();
|
||||||
|
assert!(
|
||||||
|
data.is_empty(),
|
||||||
|
"connection should close without writing a response"
|
||||||
|
);
|
||||||
|
|
||||||
|
let end = Instant::now();
|
||||||
|
|
||||||
|
if end.duration_since(start) > Duration::from_secs(1) {
|
||||||
|
panic!("request took way too long to time out");
|
||||||
|
}
|
||||||
|
|
||||||
|
srv.stop().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_slow_request_408() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(200)
|
.client_timeout(200)
|
||||||
|
|
|
@ -238,7 +238,9 @@ where
|
||||||
|
|
||||||
/// Default service to be used if no matching resource could be found.
|
/// Default service to be used if no matching resource could be found.
|
||||||
///
|
///
|
||||||
/// It is possible to use services like `Resource`, `Route`.
|
/// It is possible to use services like `Route` and [`NamedFile`].
|
||||||
|
///
|
||||||
|
/// [`NamedFile`]: https://docs.rs/actix-files/latest/actix_files/struct.NamedFile.html
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// use actix_web::{web, App, HttpResponse};
|
/// use actix_web::{web, App, HttpResponse};
|
||||||
|
|
Loading…
Reference in New Issue