Merge branch 'master' into feat/awc_response_timeout

fakeshadow 2021-02-07 18:23:39 -08:00 committed by GitHub
commit 38eaada70b
3 changed files with 240 additions and 107 deletions

View File

@@ -4,6 +4,8 @@
### Added
* The method `Either<web::Json<T>, web::Form<T>>::into_inner()` which returns the inner type for
whichever variant was created. Also works for `Either<web::Form<T>, web::Json<T>>`. [#1894]
* Add `services!` macro to help register multiple services to `App`. [#1933]
* Enable registering a `Vec` of services of the same `HttpServiceFactory` type to `App` (see the sketch below). [#1933]
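A minimal sketch of the new `Vec` registration (illustrative paths and handlers, not from this commit):

```rust
use actix_web::{web, App};

// All elements have the same concrete type, so they can be registered as a Vec.
let app = App::new().service(vec![
    web::resource("/a").to(|| async { "a" }),
    web::resource("/b").to(|| async { "b" }),
]);
```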
### Changed
* Rework `Responder` trait to be sync and return `Response`/`HttpResponse` directly.
@@ -34,6 +36,7 @@
[#1869]: https://github.com/actix/actix-web/pull/1869
[#1905]: https://github.com/actix/actix-web/pull/1905
[#1906]: https://github.com/actix/actix-web/pull/1906
[#1933]: https://github.com/actix/actix-web/pull/1933
[#1957]: https://github.com/actix/actix-web/pull/1957

View File

@@ -734,6 +734,78 @@ where
}
Ok(())
}
/// Returns `true` when the io stream can be disconnected after writing to it.
///
/// It covers these conditions:
///
/// - `std::io::ErrorKind::ConnectionReset` after a partial read.
/// - all data has been read.
#[inline(always)]
fn read_available(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
let this = self.project();
if this.flags.contains(Flags::READ_DISCONNECT) {
return Ok(false);
};
let mut io = Pin::new(this.io.as_mut().unwrap());
let mut read_some = false;
loop {
// Grow buffer if necessary: when spare capacity drops below the low
// watermark, reserve up to the high watermark.
let remaining = this.read_buf.capacity() - this.read_buf.len();
if remaining < LW_BUFFER_SIZE {
this.read_buf.reserve(HW_BUFFER_SIZE - remaining);
}
match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) {
Poll::Pending => return Ok(false),
Poll::Ready(Ok(n)) => {
if n == 0 {
return Ok(true);
} else {
// Return early when the read buffer exceeds the decoder's max buffer size.
if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE {
// At this point it's not known whether the io is still scheduled
// to be woken up, so force-wake the dispatcher just in case.
// TODO: figure out the overhead.
cx.waker().wake_by_ref();
return Ok(false);
}
read_some = true;
}
}
Poll::Ready(Err(err)) => {
return if err.kind() == io::ErrorKind::WouldBlock {
Ok(false)
} else if err.kind() == io::ErrorKind::ConnectionReset && read_some {
Ok(true)
} else {
Err(DispatchError::Io(err))
}
}
}
}
}
/// Calls the upgrade service with the request.
fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future {
let this = self.project();
let mut parts = FramedParts::with_read_buf(
this.io.take().unwrap(),
mem::take(this.codec),
mem::take(this.read_buf),
);
parts.write_buf = mem::take(this.write_buf);
let framed = Framed::from_parts(parts);
this.flow.upgrade.as_ref().unwrap().call((req, framed))
}
}
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
@@ -778,60 +850,36 @@ where
}
}
} else {
// read socket into a buf
let should_disconnect =
if !inner.flags.contains(Flags::READ_DISCONNECT) {
let mut inner_p = inner.as_mut().project();
read_available(
cx,
inner_p.io.as_mut().unwrap(),
&mut inner_p.read_buf,
)?
} else {
None
};
// read from io stream and fill read buffer.
let should_disconnect = inner.as_mut().read_available(cx)?;
inner.as_mut().poll_request(cx)?;
if let Some(true) = should_disconnect {
let inner_p = inner.as_mut().project();
inner_p.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = inner_p.payload.take() {
// io stream should be closed.
if should_disconnect {
let inner = inner.as_mut().project();
inner.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = inner.payload.take() {
payload.feed_eof();
}
};
loop {
let inner_p = inner.as_mut().project();
let remaining =
inner_p.write_buf.capacity() - inner_p.write_buf.len();
if remaining < LW_BUFFER_SIZE {
inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining);
}
let result = inner.as_mut().poll_response(cx)?;
let drain = result == PollResponse::DrainWriteBuf;
// switch to upgrade handler
if let PollResponse::Upgrade(req) = result {
let inner_p = inner.as_mut().project();
let mut parts = FramedParts::with_read_buf(
inner_p.io.take().unwrap(),
mem::take(inner_p.codec),
mem::take(inner_p.read_buf),
);
parts.write_buf = mem::take(inner_p.write_buf);
let framed = Framed::from_parts(parts);
let upgrade = inner_p
.flow
.upgrade
.as_ref()
.unwrap()
.call((req, framed));
self.as_mut()
.project()
.inner
.set(DispatcherState::Upgrade(upgrade));
return self.poll(cx);
}
// Call poll_response to populate the write buffer.
// `drain` indicates whether the write buffer should be emptied before the next run.
let drain = match inner.as_mut().poll_response(cx)? {
PollResponse::DrainWriteBuf => true,
PollResponse::DoNothing => false,
// An upgrade request moves the dispatcher to the Upgrade variant of DispatcherState.
PollResponse::Upgrade(req) => {
let upgrade = inner.upgrade(req);
self.as_mut()
.project()
.inner
.set(DispatcherState::Upgrade(upgrade));
return self.poll(cx);
}
};
// we didn't get WouldBlock from the write operation,
// so the data was written to the kernel completely (macOS)
@@ -888,66 +936,6 @@ where
}
}
/// Returns either:
/// - `Ok(Some(true))` - data was read and done reading all data.
/// - `Ok(Some(false))` - data was read but there should be more to read.
/// - `Ok(None)` - no data was read but there should be more to read later.
/// - Unhandled Errors
fn read_available<T>(
cx: &mut Context<'_>,
io: &mut T,
buf: &mut BytesMut,
) -> Result<Option<bool>, io::Error>
where
T: AsyncRead + Unpin,
{
let mut read_some = false;
loop {
// reserve capacity for buffer
let remaining = buf.capacity() - buf.len();
if remaining < LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE - remaining);
}
match actix_codec::poll_read_buf(Pin::new(io), cx, buf) {
Poll::Pending => {
return if read_some { Ok(Some(false)) } else { Ok(None) };
}
Poll::Ready(Ok(n)) => {
if n == 0 {
return Ok(Some(true));
} else {
// If buf is full, return but do not disconnect since
// there is more reading to be done
if buf.len() >= super::decoder::MAX_BUFFER_SIZE {
// At this point it's not known whether the io is still scheduled
// to be woken up, so force-wake the dispatcher just in case.
// TODO: figure out the overhead.
cx.waker().wake_by_ref();
return Ok(Some(false));
}
read_some = true;
}
}
Poll::Ready(Err(err)) => {
return if err.kind() == io::ErrorKind::WouldBlock {
if read_some {
Ok(Some(false))
} else {
Ok(None)
}
} else if err.kind() == io::ErrorKind::ConnectionReset && read_some {
Ok(Some(true))
} else {
Err(err)
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::str;

View File

@@ -22,6 +22,13 @@ pub trait HttpServiceFactory {
fn register(self, config: &mut AppService);
}
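/// Allows a collection of services of the same type to be registered in one call.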
impl<T: HttpServiceFactory> HttpServiceFactory for Vec<T> {
fn register(self, config: &mut AppService) {
self.into_iter()
.for_each(|factory| factory.register(config));
}
}
pub(crate) trait AppServiceFactory {
fn register(&mut self, config: &mut AppService);
}
@@ -532,6 +539,65 @@ where
}
}
/// Macro for registering multiple services of different types with `App` at the same time.
///
/// Each service must implement the [`HttpServiceFactory`](self::HttpServiceFactory) trait.
///
/// The maximum number of services that can be grouped together is 12.
///
/// # Examples
///
/// ```
/// use actix_web::{services, web, App};
///
/// let services = services![
/// web::resource("/test2").to(|| async { "test2" }),
/// web::scope("/test3").route("/", web::get().to(|| async { "test3" }))
/// ];
///
/// let app = App::new().service(services);
///
/// // The services macro simply converts multiple services into a tuple,
/// // so the code below would also work without importing the macro.
/// let app = App::new().service((
/// web::resource("/test2").to(|| async { "test2" }),
/// web::scope("/test3").route("/", web::get().to(|| async { "test3" }))
/// ));
/// ```
#[macro_export]
macro_rules! services {
($($x:expr),+ $(,)?) => {
($($x,)+)
}
}
/// `HttpServiceFactory` trait implementations for tuples of services (up to 12 elements).
macro_rules! service_tuple ({ $(($n:tt, $T:ident)),+} => {
impl<$($T: HttpServiceFactory),+> HttpServiceFactory for ($($T,)+) {
fn register(self, config: &mut AppService) {
$(self.$n.register(config);)+
}
}
});
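// For illustration, `service_tuple!((0, A), (1, B))` roughly expands to:
//
// impl<A: HttpServiceFactory, B: HttpServiceFactory> HttpServiceFactory for (A, B) {
//     fn register(self, config: &mut AppService) {
//         self.0.register(config);
//         self.1.register(config);
//     }
// }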
#[rustfmt::skip]
mod m {
use super::*;
service_tuple!((0, A));
service_tuple!((0, A), (1, B));
service_tuple!((0, A), (1, B), (2, C));
service_tuple!((0, A), (1, B), (2, C), (3, D));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I), (9, J));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I), (9, J), (10, K));
service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I), (9, J), (10, K), (11, L));
}
#[cfg(test)]
mod tests {
use super::*;
@@ -606,4 +672,80 @@ mod tests {
assert!(s.contains("ServiceResponse"));
assert!(s.contains("x-test"));
}
#[actix_rt::test]
async fn test_services_macro() {
let scoped = services![
web::service("/scoped_test1").name("scoped_test1").finish(
|req: ServiceRequest| async {
Ok(req.into_response(HttpResponse::Ok().finish()))
}
),
web::resource("/scoped_test2").to(|| async { "test2" }),
];
let services = services![
web::service("/test1")
.name("test")
.finish(|req: ServiceRequest| async {
Ok(req.into_response(HttpResponse::Ok().finish()))
}),
web::resource("/test2").to(|| async { "test2" }),
web::scope("/test3").service(scoped)
];
let srv = init_service(App::new().service(services)).await;
let req = TestRequest::with_uri("/test1").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let req = TestRequest::with_uri("/test2").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let req = TestRequest::with_uri("/test3/scoped_test1").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let req = TestRequest::with_uri("/test3/scoped_test2").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
}
#[actix_rt::test]
async fn test_services_vec() {
let services = vec![
web::resource("/test1").to(|| async { "test1" }),
web::resource("/test2").to(|| async { "test2" }),
];
let scoped = vec![
web::resource("/scoped_test1").to(|| async { "test1" }),
web::resource("/scoped_test2").to(|| async { "test2" }),
];
let srv = init_service(
App::new()
.service(services)
.service(web::scope("/test3").service(scoped)),
)
.await;
let req = TestRequest::with_uri("/test1").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let req = TestRequest::with_uri("/test2").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let req = TestRequest::with_uri("/test3/scoped_test1").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
let req = TestRequest::with_uri("/test3/scoped_test2").to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
}
}