change behavior of default upgrade handler

This commit is contained in:
fakeshadow 2021-03-13 21:32:29 +08:00
parent 22dcc31193
commit f0987c4668
2 changed files with 29 additions and 28 deletions

View File

@ -2,7 +2,7 @@ use std::task::Poll;
use actix_codec::Framed; use actix_codec::Framed;
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use futures_core::future::LocalBoxFuture;
use crate::error::Error; use crate::error::Error;
use crate::h1::Codec; use crate::h1::Codec;
@ -16,7 +16,7 @@ impl<T> ServiceFactory<(Request, Framed<T, Codec>)> for UpgradeHandler {
type Config = (); type Config = ();
type Service = UpgradeHandler; type Service = UpgradeHandler;
type InitError = Error; type InitError = Error;
type Future = Ready<Result<Self::Service, Self::InitError>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
unimplemented!() unimplemented!()
@ -26,11 +26,11 @@ impl<T> ServiceFactory<(Request, Framed<T, Codec>)> for UpgradeHandler {
impl<T> Service<(Request, Framed<T, Codec>)> for UpgradeHandler { impl<T> Service<(Request, Framed<T, Codec>)> for UpgradeHandler {
type Response = (); type Response = ();
type Error = Error; type Error = Error;
type Future = Ready<Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!(); actix_service::always_ready!();
fn call(&self, _: (Request, Framed<T, Codec>)) -> Self::Future { fn call(&self, _: (Request, Framed<T, Codec>)) -> Self::Future {
ready(Ok(())) unimplemented!()
} }
} }

View File

@ -12,13 +12,6 @@ use actix_http::{
use actix_server::{Server, ServerBuilder}; use actix_server::{Server, ServerBuilder};
use actix_service::{map_config, IntoServiceFactory, Service, ServiceFactory}; use actix_service::{map_config, IntoServiceFactory, Service, ServiceFactory};
#[cfg(unix)]
use actix_http::Protocol;
#[cfg(unix)]
use actix_service::pipeline_factory;
#[cfg(unix)]
use futures_util::future::ok;
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
use actix_tls::accept::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; use actix_tls::accept::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
@ -489,7 +482,9 @@ where
#[cfg(unix)] #[cfg(unix)]
/// Start listening for unix domain (UDS) connections on existing listener. /// Start listening for unix domain (UDS) connections on existing listener.
pub fn listen_uds(mut self, lst: std::os::unix::net::UnixListener) -> io::Result<Self> { pub fn listen_uds(mut self, lst: std::os::unix::net::UnixListener) -> io::Result<Self> {
use actix_http::Protocol;
use actix_rt::net::UnixStream; use actix_rt::net::UnixStream;
use actix_service::pipeline_factory;
let cfg = self.config.clone(); let cfg = self.config.clone();
let factory = self.factory.clone(); let factory = self.factory.clone();
@ -511,19 +506,22 @@ where
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then({ pipeline_factory(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) })
let svc = HttpService::build() .and_then({
.keep_alive(c.keep_alive) let svc = HttpService::build()
.client_timeout(c.client_timeout); .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout);
let svc = if let Some(handler) = on_connect_fn.clone() { let svc = if let Some(handler) = on_connect_fn.clone() {
svc.on_connect_ext(move |io: &_, ext: _| (&*handler)(io as &dyn Any, ext)) svc.on_connect_ext(move |io: &_, ext: _| {
} else { (&*handler)(io as &dyn Any, ext)
svc })
}; } else {
svc
};
svc.finish(map_config(factory(), move |_| config.clone())) svc.finish(map_config(factory(), move |_| config.clone()))
}) })
})?; })?;
Ok(self) Ok(self)
} }
@ -534,7 +532,9 @@ where
where where
A: AsRef<std::path::Path>, A: AsRef<std::path::Path>,
{ {
use actix_http::Protocol;
use actix_rt::net::UnixStream; use actix_rt::net::UnixStream;
use actix_service::pipeline_factory;
let cfg = self.config.clone(); let cfg = self.config.clone();
let factory = self.factory.clone(); let factory = self.factory.clone();
@ -555,12 +555,13 @@ where
socket_addr, socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then( pipeline_factory(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) })
HttpService::build() .and_then(
.keep_alive(c.keep_alive) HttpService::build()
.client_timeout(c.client_timeout) .keep_alive(c.keep_alive)
.finish(map_config(factory(), move |_| config.clone())), .client_timeout(c.client_timeout)
) .finish(map_config(factory(), move |_| config.clone())),
)
}, },
)?; )?;
Ok(self) Ok(self)