revert dispatcher change

This commit is contained in:
fakeshadow 2021-02-06 10:41:42 -08:00
parent 24228b89d0
commit 66e3dafaa7
2 changed files with 270 additions and 319 deletions

View File

@ -14,7 +14,7 @@ use crate::header::HeaderMap;
use crate::message::{ConnectionType, ResponseHead}; use crate::message::{ConnectionType, ResponseHead};
use crate::request::Request; use crate::request::Request;
pub(crate) const MAX_BUFFER_SIZE: usize = 131_072; const MAX_BUFFER_SIZE: usize = 131_072;
const MAX_HEADERS: usize = 96; const MAX_HEADERS: usize = 96;
/// Incoming message decoder /// Incoming message decoder
@ -203,15 +203,7 @@ impl MessageType for Request {
(len, method, uri, version, req.headers.len()) (len, method, uri, version, req.headers.len())
} }
httparse::Status::Partial => { httparse::Status::Partial => return Ok(None),
return if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
Err(ParseError::TooLarge)
} else {
// Return None to notify more read are needed for parsing request
Ok(None)
};
}
} }
}; };
@ -230,6 +222,9 @@ impl MessageType for Request {
PayloadLength::None => { PayloadLength::None => {
if method == Method::CONNECT { if method == Method::CONNECT {
PayloadType::Stream(PayloadDecoder::eof()) PayloadType::Stream(PayloadDecoder::eof())
} else if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
} else { } else {
PayloadType::None PayloadType::None
} }
@ -278,14 +273,7 @@ impl MessageType for ResponseHead {
(len, version, status, res.headers.len()) (len, version, status, res.headers.len())
} }
httparse::Status::Partial => { httparse::Status::Partial => return Ok(None),
return if src.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
Err(ParseError::TooLarge)
} else {
Ok(None)
}
}
} }
}; };
@ -301,6 +289,9 @@ impl MessageType for ResponseHead {
} else if status == StatusCode::SWITCHING_PROTOCOLS { } else if status == StatusCode::SWITCHING_PROTOCOLS {
// switching protocol or connect // switching protocol or connect
PayloadType::Stream(PayloadDecoder::eof()) PayloadType::Stream(PayloadDecoder::eof())
} else if src.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
} else { } else {
// for HTTP/1.0 read to eof and close connection // for HTTP/1.0 read to eof and close connection
if msg.version == Version::HTTP_10 { if msg.version == Version::HTTP_10 {

View File

@ -45,7 +45,7 @@ bitflags! {
} }
} }
#[pin_project] #[pin_project::pin_project]
/// Dispatcher for HTTP/1.1 protocol /// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S, B, X, U> pub struct Dispatcher<T, S, B, X, U>
where where
@ -139,14 +139,27 @@ where
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
matches!(self, State::None) matches!(self, State::None)
} }
}
fn is_call(&self) -> bool {
matches!(self, State::ServiceCall(_))
}
}
enum PollResponse { enum PollResponse {
Upgrade(Request), Upgrade(Request),
DoNothing, DoNothing,
DrainWriteBuf, DrainWriteBuf,
} }
impl PartialEq for PollResponse {
fn eq(&self, other: &PollResponse) -> bool {
match self {
PollResponse::DrainWriteBuf => matches!(other, PollResponse::DrainWriteBuf),
PollResponse::DoNothing => matches!(other, PollResponse::DoNothing),
_ => false,
}
}
}
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
@ -163,7 +176,7 @@ where
pub(crate) fn new( pub(crate) fn new(
stream: T, stream: T,
config: ServiceConfig, config: ServiceConfig,
flow: Rc<HttpFlow<S, X, U>>, services: Rc<HttpFlow<S, X, U>>,
on_connect_data: OnConnectData, on_connect_data: OnConnectData,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
) -> Self { ) -> Self {
@ -173,7 +186,7 @@ where
config, config,
BytesMut::with_capacity(HW_BUFFER_SIZE), BytesMut::with_capacity(HW_BUFFER_SIZE),
None, None,
flow, services,
on_connect_data, on_connect_data,
peer_addr, peer_addr,
) )
@ -186,7 +199,7 @@ where
config: ServiceConfig, config: ServiceConfig,
read_buf: BytesMut, read_buf: BytesMut,
timeout: Option<Sleep>, timeout: Option<Sleep>,
flow: Rc<HttpFlow<S, X, U>>, services: Rc<HttpFlow<S, X, U>>,
on_connect_data: OnConnectData, on_connect_data: OnConnectData,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
) -> Self { ) -> Self {
@ -216,7 +229,7 @@ where
io: Some(io), io: Some(io),
codec, codec,
read_buf, read_buf,
flow, flow: services,
on_connect_data, on_connect_data,
flags, flags,
peer_addr, peer_addr,
@ -256,14 +269,13 @@ where
} }
// if checked is set to true, delay disconnect until all tasks have finished. // if checked is set to true, delay disconnect until all tasks have finished.
fn client_disconnected(self: Pin<&mut Self>, err: impl Into<DispatchError>) { fn client_disconnected(self: Pin<&mut Self>) {
let this = self.project(); let this = self.project();
this.flags this.flags
.insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT);
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
} }
*this.error = Some(err.into());
} }
/// Flush stream /// Flush stream
@ -312,10 +324,9 @@ where
message: Response<()>, message: Response<()>,
body: ResponseBody<B>, body: ResponseBody<B>,
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
let size = body.size();
let mut this = self.project(); let mut this = self.project();
this.codec this.codec
.encode(Message::Item((message, size)), &mut this.write_buf) .encode(Message::Item((message, body.size())), &mut this.write_buf)
.map_err(|err| { .map_err(|err| {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
@ -324,76 +335,74 @@ where
})?; })?;
this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); this.flags.set(Flags::KEEPALIVE, this.codec.keepalive());
match size { match body.size() {
BodySize::None | BodySize::Empty => this.state.set(State::None), BodySize::None | BodySize::Empty => this.state.set(State::None),
_ => this.state.set(State::SendPayload(body)), _ => this.state.set(State::SendPayload(body)),
}; };
Ok(()) Ok(())
} }
fn send_continue(self: Pin<&mut Self>) {
self.project()
.write_buf
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
}
fn poll_response( fn poll_response(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<PollResponse, DispatchError> { ) -> Result<PollResponse, DispatchError> {
loop { loop {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
match this.state.as_mut().project() { // state is not changed on Poll::Pending.
// no future is in InnerDispatcher state. pop next message. // other variant and conditions always trigger a state change(or an error).
let state_change = match this.state.project() {
StateProj::None => match this.messages.pop_front() { StateProj::None => match this.messages.pop_front() {
// handle request message.
Some(DispatcherMessage::Item(req)) => { Some(DispatcherMessage::Item(req)) => {
// Handle `EXPECT: 100-Continue` header self.as_mut().handle_request(req, cx)?;
if req.head().expect() { true
// set InnerDispatcher state and continue loop to poll it.
let task = this.flow.expect.call(req);
this.state.set(State::ExpectCall(task));
} else {
// the same as expect call.
let task = this.flow.service.call(req);
this.state.set(State::ServiceCall(task));
};
} }
// handle error message.
Some(DispatcherMessage::Error(res)) => { Some(DispatcherMessage::Error(res)) => {
// send_response would update InnerDispatcher state to SendPayload or
// None(If response body is empty).
// continue loop to poll it.
self.as_mut() self.as_mut()
.send_response(res, ResponseBody::Other(Body::Empty))?; .send_response(res, ResponseBody::Other(Body::Empty))?;
true
} }
// return with upgrade request and poll it exclusively.
Some(DispatcherMessage::Upgrade(req)) => { Some(DispatcherMessage::Upgrade(req)) => {
return Ok(PollResponse::Upgrade(req)); return Ok(PollResponse::Upgrade(req));
} }
// all messages are dealt with. None => false,
None => return Ok(PollResponse::DoNothing),
}, },
StateProj::ServiceCall(fut) => match fut.poll(cx) { StateProj::ExpectCall(fut) => match fut.poll(cx) {
// service call resolved. send response. Poll::Ready(Ok(req)) => {
Poll::Ready(Ok(res)) => { self.as_mut().send_continue();
let (res, body) = res.into().replace_body(()); this = self.as_mut().project();
self.as_mut().send_response(res, body)?; let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall(fut));
continue;
} }
// send service call error as response
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let res: Response = e.into().into(); let res: Response = e.into().into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
self.as_mut().send_response(res, body.into_body())?; self.as_mut().send_response(res, body.into_body())?;
true
} }
// service call pending and could be waiting for more chunk messages. Poll::Pending => false,
// (pipeline message limit and/or payload can_read limit) },
Poll::Pending => { StateProj::ServiceCall(fut) => match fut.poll(cx) {
// no new message is decoded and no new payload is feed. Poll::Ready(Ok(res)) => {
// nothing to do except waiting for new incoming data from client. let (res, body) = res.into().replace_body(());
if !self.as_mut().poll_request(cx)? { self.as_mut().send_response(res, body)?;
return Ok(PollResponse::DoNothing); continue;
} }
// otherwise keep loop. Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
self.as_mut().send_response(res, body.into_body())?;
true
} }
Poll::Pending => false,
}, },
StateProj::SendPayload(mut stream) => { StateProj::SendPayload(mut stream) => {
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
loop { loop {
if this.write_buf.len() < HW_BUFFER_SIZE { if this.write_buf.len() < HW_BUFFER_SIZE {
match stream.as_mut().poll_next(cx) { match stream.as_mut().poll_next(cx) {
@ -402,60 +411,50 @@ where
Message::Chunk(Some(item)), Message::Chunk(Some(item)),
&mut this.write_buf, &mut this.write_buf,
)?; )?;
continue;
} }
Poll::Ready(None) => { Poll::Ready(None) => {
this.codec.encode( this.codec.encode(
Message::Chunk(None), Message::Chunk(None),
&mut this.write_buf, &mut this.write_buf,
)?; )?;
// payload stream finished. this = self.as_mut().project();
// break and goes out of scope of borrowed stream. this.state.set(State::None);
break;
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Some(Err(_))) => {
return Err(DispatchError::Service(e)) return Err(DispatchError::Unknown)
} }
// Payload Stream Pending should only be given when the caller
// promise to wake it up properly.
//
// TODO: Think if it's an good idea to mix in a self wake up.
// It would turn dispatcher into a busy polling style of stream
// handling. (Or another timer as source of scheduled wake up)
// As There is no way to know when or how the caller would wake
// up the stream so a self wake up is an overhead that would
// result in a double polling(or an extra timer)
Poll::Pending => return Ok(PollResponse::DoNothing), Poll::Pending => return Ok(PollResponse::DoNothing),
} }
} else { } else {
// buffer is beyond max size.
// return and write the whole buffer to io stream.
return Ok(PollResponse::DrainWriteBuf); return Ok(PollResponse::DrainWriteBuf);
} }
break;
} }
// break from Poll::Ready(None) on stream finished. continue;
// this is for re borrow InnerDispatcher state and set it to None.
this.state.set(State::None);
} }
StateProj::ExpectCall(fut) => match fut.poll(cx) { };
// expect resolved. write continue to buffer and set InnerDispatcher state
// to service call. // state is changed and continue when the state is not Empty
Poll::Ready(Ok(req)) => { if state_change {
this.write_buf if !self.state.is_empty() {
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); continue;
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall(fut));
} }
// send expect error as response } else {
Poll::Ready(Err(e)) => { // if read-backpressure is enabled and we consumed some data.
let res: Response = e.into().into(); // we may read more data and retry
let (res, body) = res.replace_body(()); if self.state.is_call() {
self.as_mut().send_response(res, body.into_body())?; if self.as_mut().poll_request(cx)? {
continue;
} }
// expect must be solved before progress can be made. } else if !self.messages.is_empty() {
Poll::Pending => return Ok(PollResponse::DoNothing), continue;
},
} }
} }
break;
}
Ok(PollResponse::DoNothing)
} }
fn handle_request( fn handle_request(
@ -463,28 +462,52 @@ where
req: Request, req: Request,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
let mut this = self.as_mut().project();
// Handle `EXPECT: 100-Continue` header // Handle `EXPECT: 100-Continue` header
if req.head().expect() { if req.head().expect() {
// set InnerDispatcher state so the future is pinned. // set dispatcher state so the future is pinned.
let mut this = self.as_mut().project();
let task = this.flow.expect.call(req); let task = this.flow.expect.call(req);
this.state.set(State::ExpectCall(task)); this.state.set(State::ExpectCall(task));
} else { } else {
// the same as above. // the same as above.
let mut this = self.as_mut().project();
let task = this.flow.service.call(req); let task = this.flow.service.call(req);
this.state.set(State::ServiceCall(task)); this.state.set(State::ServiceCall(task));
}; };
// eagerly poll the future for once(or twice if expect is resolved immediately). // eagerly poll the future for once(or twice if expect is resolved immediately).
loop { loop {
match this.state.as_mut().project() { match self.as_mut().project().state.project() {
StateProj::ExpectCall(fut) => {
match fut.poll(cx) {
// expect is resolved. continue loop and poll the service call branch.
Poll::Ready(Ok(req)) => {
self.as_mut().send_continue();
let mut this = self.as_mut().project();
let task = this.flow.service.call(req);
this.state.set(State::ServiceCall(task));
continue;
}
// future is pending. return Ok(()) to notify that a new state is
// set and the outer loop should be continue.
Poll::Pending => return Ok(()),
// future is error. send response and return a result. On success
// to notify the dispatcher a new state is set and the outer loop
// should be continue.
Poll::Ready(Err(e)) => {
let e = e.into();
let res: Response = e.into();
let (res, body) = res.replace_body(());
return self.send_response(res, body.into_body());
}
}
}
StateProj::ServiceCall(fut) => { StateProj::ServiceCall(fut) => {
// return no matter the service call future's result. // return no matter the service call future's result.
return match fut.poll(cx) { return match fut.poll(cx) {
// future is resolved. send response and return a result. On success // future is resolved. send response and return a result. On success
// to notify the dispatcher a new InnerDispatcher state is set and the // to notify the dispatcher a new state is set and the outer loop
// outer loop should be continue. // should be continue.
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.send_response(res, body) self.send_response(res, body)
@ -499,28 +522,6 @@ where
} }
}; };
} }
StateProj::ExpectCall(fut) => {
match fut.poll(cx) {
// expect is resolved. continue loop and poll the service call branch.
Poll::Ready(Ok(req)) => {
this.write_buf
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
let task = this.flow.service.call(req);
this.state.as_mut().set(State::ServiceCall(task));
}
// future is pending. return Ok(()) to notify that a new InnerDispatcher
// state is set and the outer loop should be continue.
Poll::Pending => return Ok(()),
// future is error. send response and return a result. On success
// to notify the dispatcher a new InnerDispatcher state is set and
// the outer loop should be continue.
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
return self.send_response(res, body.into_body());
}
}
}
_ => unreachable!( _ => unreachable!(
"State must be set to ServiceCall or ExceptCall in handle_request" "State must be set to ServiceCall or ExceptCall in handle_request"
), ),
@ -547,39 +548,26 @@ where
this.flags.insert(Flags::STARTED); this.flags.insert(Flags::STARTED);
match msg { match msg {
// handle new request.
Message::Item(mut req) => { Message::Item(mut req) => {
let pl = this.codec.message_type();
req.head_mut().peer_addr = *this.peer_addr; req.head_mut().peer_addr = *this.peer_addr;
// merge on_connect_ext data into request extensions // merge on_connect_ext data into request extensions
this.on_connect_data.merge_into(&mut req); this.on_connect_data.merge_into(&mut req);
match this.codec.message_type() { if pl == MessageType::Stream && this.flow.upgrade.is_some() {
// break when upgrade received. this.messages.push_back(DispatcherMessage::Upgrade(req));
// existing buffer and io stream would be handled by framed
// after upgrade success.
MessageType::Stream if this.flow.upgrade.is_some() => {
this.messages
.push_back(DispatcherMessage::Upgrade(req));
break; break;
} }
// construct request and payload. if pl == MessageType::Payload || pl == MessageType::Stream {
MessageType::Payload | MessageType::Stream => {
// PayloadSender and Payload are smart pointers share the same
// state. Payload is pass to Request and handed to service
// for extracting state. PayloadSender is held by dispatcher
// to push new data/error to state.
let (ps, pl) = Payload::create(false); let (ps, pl) = Payload::create(false);
let (req1, _) = let (req1, _) =
req.replace_payload(crate::Payload::H1(pl)); req.replace_payload(crate::Payload::H1(pl));
req = req1; req = req1;
*this.payload = Some(ps); *this.payload = Some(ps);
} }
// Ignore empty payload.
MessageType::None => {}
}
// handle request early if no future lives in InnerDispatcher state. // handle request early
if this.state.is_empty() { if this.state.is_empty() {
self.as_mut().handle_request(req, cx)?; self.as_mut().handle_request(req, cx)?;
this = self.as_mut().project(); this = self.as_mut().project();
@ -587,60 +575,54 @@ where
this.messages.push_back(DispatcherMessage::Item(req)); this.messages.push_back(DispatcherMessage::Item(req));
} }
} }
Message::Chunk(Some(chunk)) => match this.payload { Message::Chunk(Some(chunk)) => {
Some(ref mut payload) => payload.feed_data(chunk), if let Some(ref mut payload) = this.payload {
None => { payload.feed_data(chunk);
} else {
error!( error!(
"Internal server error: unexpected payload chunk" "Internal server error: unexpected payload chunk"
); );
self.as_mut().response_error( this.flags.insert(Flags::READ_DISCONNECT);
this.messages.push_back(DispatcherMessage::Error(
Response::InternalServerError().finish().drop_body(), Response::InternalServerError().finish().drop_body(),
DispatchError::InternalError, ));
); *this.error = Some(DispatchError::InternalError);
this = self.project();
break; break;
} }
}, }
Message::Chunk(None) => match this.payload.take() { Message::Chunk(None) => {
Some(mut payload) => payload.feed_eof(), if let Some(mut payload) = this.payload.take() {
None => { payload.feed_eof();
} else {
error!("Internal server error: unexpected eof"); error!("Internal server error: unexpected eof");
self.as_mut().response_error( this.flags.insert(Flags::READ_DISCONNECT);
this.messages.push_back(DispatcherMessage::Error(
Response::InternalServerError().finish().drop_body(), Response::InternalServerError().finish().drop_body(),
DispatchError::InternalError, ));
); *this.error = Some(DispatchError::InternalError);
this = self.project();
break; break;
} }
}, }
} }
} }
Ok(None) => break, Ok(None) => break,
Err(ParseError::Io(e)) => { Err(ParseError::Io(e)) => {
self.as_mut().client_disconnected(e); self.as_mut().client_disconnected();
this = self.as_mut().project(); this = self.as_mut().project();
break; *this.error = Some(DispatchError::Io(e));
}
// big size requests overflow should be responded with 413
Err(ParseError::TooLarge) => {
if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted);
}
self.as_mut().response_error(
Response::PayloadTooLarge().finish().drop_body(),
ParseError::TooLarge,
);
this = self.project();
break; break;
} }
Err(e) => { Err(e) => {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted); payload.set_error(PayloadError::EncodingCorrupted);
} }
// Malformed requests should be responded with 400 // Malformed requests should be responded with 400
self.as_mut() this.messages.push_back(DispatcherMessage::Error(
.response_error(Response::BadRequest().finish().drop_body(), e); Response::BadRequest().finish().drop_body(),
this = self.project(); ));
this.flags.insert(Flags::READ_DISCONNECT);
*this.error = Some(e.into());
break; break;
} }
} }
@ -651,7 +633,6 @@ where
*this.ka_expire = expire; *this.ka_expire = expire;
} }
} }
Ok(updated) Ok(updated)
} }
@ -739,94 +720,6 @@ where
} }
Ok(()) Ok(())
} }
/// Returns true when io stream can be disconnected after write to it.
///
/// It covers these conditions:
///
/// - `Flags::READ_DISCONNECT` flag active.
/// - `std::io::ErrorKind::ConnectionReset` after partial read.
/// - all data read done.
#[inline(always)]
fn read_available(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
let this = self.project();
if this.flags.contains(Flags::READ_DISCONNECT) {
return Ok(true);
};
let mut io = Pin::new(this.io.as_mut().unwrap());
let mut read_some = false;
loop {
// grow buffer if necessary.
let remaining = this.read_buf.capacity() - this.read_buf.len();
if remaining < LW_BUFFER_SIZE {
this.read_buf.reserve(HW_BUFFER_SIZE - remaining);
}
match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) {
Poll::Pending => return Ok(false),
Poll::Ready(Ok(n)) => {
if n == 0 {
return Ok(true);
} else {
// Return early when read buf exceed decoder's max buffer size.
if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE {
// at this point it's not known io is still scheduled to
// be waked up. so force wake up dispatcher just in case.
// TODO: figure out the overhead.
cx.waker().wake_by_ref();
return Ok(false);
}
read_some = true;
}
}
Poll::Ready(Err(err)) => {
return if err.kind() == io::ErrorKind::WouldBlock {
Ok(false)
} else if err.kind() == io::ErrorKind::ConnectionReset && read_some {
Ok(true)
} else {
Err(DispatchError::Io(err))
}
}
}
}
}
/// call upgrade service with request.
fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future {
let this = self.project();
let mut parts = FramedParts::with_read_buf(
this.io.take().unwrap(),
mem::take(this.codec),
mem::take(this.read_buf),
);
parts.write_buf = mem::take(this.write_buf);
let framed = Framed::from_parts(parts);
this.flow.upgrade.as_ref().unwrap().call((req, framed))
}
/// response error handler.
fn response_error(
self: Pin<&mut Self>,
res: Response<()>,
err: impl Into<DispatchError>,
) {
let this = self.project();
// set flag to read disconnect so no new data is read.
this.flags.insert(Flags::READ_DISCONNECT);
// add response to message and send back to client.
this.messages.push_back(DispatcherMessage::Error(res));
// attach error for resolve dispatcher future with error.
*this.error = Some(err.into());
}
} }
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
@ -860,10 +753,9 @@ where
if inner.flags.contains(Flags::WRITE_DISCONNECT) { if inner.flags.contains(Flags::WRITE_DISCONNECT) {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else { } else {
// flush buffer. // flush buffer
inner.as_mut().poll_flush(cx)?; inner.as_mut().poll_flush(cx)?;
if !inner.write_buf.is_empty() { if !inner.write_buf.is_empty() {
// still have unfinished data. wait.
Poll::Pending Poll::Pending
} else { } else {
Pin::new(inner.project().io.as_mut().unwrap()) Pin::new(inner.project().io.as_mut().unwrap())
@ -872,46 +764,60 @@ where
} }
} }
} else { } else {
// read from io stream and fill read buffer. // read socket into a buf
let should_disconnect = inner.as_mut().read_available(cx)?; let should_disconnect =
if !inner.flags.contains(Flags::READ_DISCONNECT) {
let mut inner_p = inner.as_mut().project();
read_available(
cx,
inner_p.io.as_mut().unwrap(),
&mut inner_p.read_buf,
)?
} else {
None
};
inner.as_mut().poll_request(cx)?; inner.as_mut().poll_request(cx)?;
if let Some(true) = should_disconnect {
// io stream should to be closed. let inner_p = inner.as_mut().project();
if should_disconnect { inner_p.flags.insert(Flags::READ_DISCONNECT);
let inner = inner.as_mut().project(); if let Some(mut payload) = inner_p.payload.take() {
inner.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = inner.payload.take() {
payload.feed_eof(); payload.feed_eof();
} }
}; };
loop { loop {
// grow buffer if necessary. let inner_p = inner.as_mut().project();
{
let inner = inner.as_mut().project();
let remaining = let remaining =
inner.write_buf.capacity() - inner.write_buf.len(); inner_p.write_buf.capacity() - inner_p.write_buf.len();
if remaining < LW_BUFFER_SIZE { if remaining < LW_BUFFER_SIZE {
inner.write_buf.reserve(HW_BUFFER_SIZE - remaining); inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining);
}
} }
let result = inner.as_mut().poll_response(cx)?;
let drain = result == PollResponse::DrainWriteBuf;
// poll_response and populate write buffer. // switch to upgrade handler
// drain indicate if write buffer should be emptied before next run. if let PollResponse::Upgrade(req) = result {
let drain = match inner.as_mut().poll_response(cx)? { let inner_p = inner.as_mut().project();
PollResponse::DrainWriteBuf => true, let mut parts = FramedParts::with_read_buf(
PollResponse::DoNothing => false, inner_p.io.take().unwrap(),
// upgrade request and goes Upgrade variant of DispatcherState. mem::take(inner_p.codec),
PollResponse::Upgrade(req) => { mem::take(inner_p.read_buf),
let upgrade = inner.upgrade(req); );
parts.write_buf = mem::take(inner_p.write_buf);
let framed = Framed::from_parts(parts);
let upgrade = inner_p
.flow
.upgrade
.as_ref()
.unwrap()
.call((req, framed));
self.as_mut() self.as_mut()
.project() .project()
.inner .inner
.set(DispatcherState::Upgrade(upgrade)); .set(DispatcherState::Upgrade(upgrade));
return self.poll(cx); return self.poll(cx);
} }
};
// we didn't get WouldBlock from write operation, // we didn't get WouldBlock from write operation,
// so data get written to kernel completely (macOS) // so data get written to kernel completely (macOS)
@ -929,29 +835,28 @@ where
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
// check if still have unsolved future in InnerDispatcher state.
let is_empty = inner.state.is_empty(); let is_empty = inner.state.is_empty();
let inner = inner.as_mut().project(); let inner_p = inner.as_mut().project();
// read half is closed and we do not processing any responses // read half is closed and we do not processing any responses
if inner.flags.contains(Flags::READ_DISCONNECT) && is_empty { if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty {
inner.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);
} }
// keep-alive and stream errors // keep-alive and stream errors
if is_empty && inner.write_buf.is_empty() { if is_empty && inner_p.write_buf.is_empty() {
if let Some(err) = inner.error.take() { if let Some(err) = inner_p.error.take() {
Poll::Ready(Err(err)) Poll::Ready(Err(err))
} }
// disconnect if keep-alive is not enabled // disconnect if keep-alive is not enabled
else if inner.flags.contains(Flags::STARTED) else if inner_p.flags.contains(Flags::STARTED)
&& !inner.flags.intersects(Flags::KEEPALIVE) && !inner_p.flags.intersects(Flags::KEEPALIVE)
{ {
inner.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);
self.poll(cx) self.poll(cx)
} }
// disconnect if shutdown // disconnect if shutdown
else if inner.flags.contains(Flags::SHUTDOWN) { else if inner_p.flags.contains(Flags::SHUTDOWN) {
self.poll(cx) self.poll(cx)
} else { } else {
Poll::Pending Poll::Pending
@ -969,6 +874,61 @@ where
} }
} }
/// Returns either:
/// - `Ok(Some(true))` - data was read and done reading all data.
/// - `Ok(Some(false))` - data was read but there should be more to read.
/// - `Ok(None)` - no data was read but there should be more to read later.
/// - Unhandled Errors
fn read_available<T>(
cx: &mut Context<'_>,
io: &mut T,
buf: &mut BytesMut,
) -> Result<Option<bool>, io::Error>
where
T: AsyncRead + Unpin,
{
let mut read_some = false;
loop {
// If buf is full return but do not disconnect since
// there is more reading to be done
if buf.len() >= HW_BUFFER_SIZE {
return Ok(Some(false));
}
let remaining = buf.capacity() - buf.len();
if remaining < LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE - remaining);
}
match actix_codec::poll_read_buf(Pin::new(io), cx, buf) {
Poll::Pending => {
return if read_some { Ok(Some(false)) } else { Ok(None) };
}
Poll::Ready(Ok(n)) => {
if n == 0 {
return Ok(Some(true));
} else {
read_some = true;
}
}
Poll::Ready(Err(err)) => {
return if err.kind() == io::ErrorKind::WouldBlock {
if read_some {
Ok(Some(false))
} else {
Ok(None)
}
} else if err.kind() == io::ErrorKind::ConnectionReset && read_some {
Ok(Some(true))
} else {
Err(err)
}
}
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::str; use std::str;