Poll upgrade service from H1ServiceHandler too

This commit is contained in:
Rajasekharan Vengalil 2019-12-16 10:01:10 -08:00
parent 8741c094b4
commit 1853016922
3 changed files with 25 additions and 7 deletions

View File

@ -1,5 +1,11 @@
# Changes # Changes
## [1.0.xx] - 2019-12-xx
### Fixed
* Poll upgrade service's readiness from HTTP service handlers
## [1.0.0] - 2019-12-13 ## [1.0.0] - 2019-12-13
### Added ### Added

View File

@ -72,7 +72,7 @@ where
Request = (Request, Framed<TcpStream, Codec>), Request = (Request, Framed<TcpStream, Codec>),
Response = (), Response = (),
>, >,
U::Error: fmt::Display, U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
/// Create simple tcp stream service /// Create simple tcp stream service
@ -115,7 +115,7 @@ mod openssl {
Request = (Request, Framed<SslStream<TcpStream>, Codec>), Request = (Request, Framed<SslStream<TcpStream>, Codec>),
Response = (), Response = (),
>, >,
U::Error: fmt::Display, U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
/// Create openssl based service /// Create openssl based service
@ -255,7 +255,7 @@ where
X::Error: Into<Error>, X::Error: Into<Error>,
X::InitError: fmt::Debug, X::InitError: fmt::Debug,
U: ServiceFactory<Config = (), Request = (Request, Framed<T, Codec>), Response = ()>, U: ServiceFactory<Config = (), Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display, U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
type Config = (); type Config = ();
@ -412,7 +412,7 @@ where
X: Service<Request = Request, Response = Request>, X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>, X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>, U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display, U::Error: fmt::Display + Into<Error>,
{ {
type Request = (T, Option<net::SocketAddr>); type Request = (T, Option<net::SocketAddr>);
type Response = (); type Response = ();
@ -441,6 +441,19 @@ where
.is_ready() .is_ready()
&& ready; && ready;
let ready = if let Some(ref mut upg) = self.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready
} else {
ready
};
if ready { if ready {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else { } else {

View File

@ -8,11 +8,10 @@ use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::{fn_factory, Service}; use actix_service::{fn_factory, Service};
use actix_utils::framed::Dispatcher; use actix_utils::framed::Dispatcher;
use bitflags::_core::task::{Context, Poll};
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
use futures::task::{Context, Poll};
use futures::{Future, SinkExt, StreamExt}; use futures::{Future, SinkExt, StreamExt};
use futures_util::future::ok;
struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>); struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>);
@ -90,7 +89,7 @@ async fn test_simple() {
move || { move || {
let ws_service = ws_service.clone(); let ws_service = ws_service.clone();
HttpService::build() HttpService::build()
.upgrade(fn_factory(move || ok::<_, ()>(ws_service.clone()))) .upgrade(fn_factory(move || future::ok::<_, ()>(ws_service.clone())))
.finish(|_| future::ok::<_, ()>(Response::NotFound())) .finish(|_| future::ok::<_, ()>(Response::NotFound()))
.tcp() .tcp()
} }