use config in dispatcher directly

This commit is contained in:
Rob Ede 2022-01-27 02:18:12 +00:00
parent 79b16d8efa
commit a323a5afec
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
4 changed files with 32 additions and 12 deletions

View File

@ -144,6 +144,7 @@ pin_project! {
flags: Flags, flags: Flags,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
conn_data: Option<Rc<Extensions>>, conn_data: Option<Rc<Extensions>>,
config: ServiceConfig,
error: Option<DispatchError>, error: Option<DispatchError>,
#[pin] #[pin]
@ -252,6 +253,7 @@ where
flags, flags,
peer_addr, peer_addr,
conn_data: conn_data.0.map(Rc::new), conn_data: conn_data.0.map(Rc::new),
config: config.clone(),
error: None, error: None,
state: State::None, state: State::None,
@ -346,14 +348,17 @@ where
message: Response<()>, message: Response<()>,
body: &impl MessageBody, body: &impl MessageBody,
) -> Result<BodySize, DispatchError> { ) -> Result<BodySize, DispatchError> {
let size = body.size();
let this = self.project(); let this = self.project();
let size = body.size();
this.codec this.codec
.encode(Message::Item((message, size)), this.write_buf) .encode(Message::Item((message, size)), this.write_buf)
.map_err(|err| { .map_err(|err| {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
} }
DispatchError::Io(err) DispatchError::Io(err)
})?; })?;
@ -373,6 +378,7 @@ where
_ => State::SendPayload { body }, _ => State::SendPayload { body },
}; };
self.project().state.set(state); self.project().state.set(state);
Ok(()) Ok(())
} }
@ -773,7 +779,7 @@ where
} }
if updated && this.timer.is_some() { if updated && this.timer.is_some() {
if let Some(expire) = this.codec.config().keep_alive_deadline() { if let Some(expire) = this.config.keep_alive_deadline() {
*this.ka_deadline = expire; *this.ka_deadline = expire;
} }
} }
@ -789,7 +795,7 @@ where
None => { None => {
// conditionally go into shutdown timeout // conditionally go into shutdown timeout
if this.flags.contains(Flags::SHUTDOWN) { if this.flags.contains(Flags::SHUTDOWN) {
if let Some(deadline) = this.codec.config().client_disconnect_deadline() { if let Some(deadline) = this.config.client_disconnect_deadline() {
// write client disconnect time out and poll again to // write client disconnect time out and poll again to
// go into Some(timer) branch // go into Some(timer) branch
this.timer.set(Some(sleep_until(deadline))); this.timer.set(Some(sleep_until(deadline)));
@ -815,8 +821,7 @@ where
this.flags.insert(Flags::SHUTDOWN); this.flags.insert(Flags::SHUTDOWN);
// start shutdown timeout // start shutdown timeout
if let Some(deadline) = if let Some(deadline) = this.config.client_disconnect_deadline()
this.codec.config().client_disconnect_deadline()
{ {
timer.as_mut().reset(deadline); timer.as_mut().reset(deadline);
let _ = timer.poll(cx); let _ = timer.poll(cx);
@ -840,8 +845,7 @@ where
this = self.project(); this = self.project();
this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
} }
} else if let Some(deadline) = this.codec.config().keep_alive_deadline() } else if let Some(deadline) = this.config.keep_alive_deadline() {
{
// still have unfinished task. try to reset and register keep-alive // still have unfinished task. try to reset and register keep-alive
timer.as_mut().reset(deadline); timer.as_mut().reset(deadline);
let _ = timer.poll(cx); let _ = timer.poll(cx);
@ -1015,11 +1019,13 @@ where
}; };
loop { loop {
// poll_response and populate write buffer. // poll response to populate write buffer
// drain indicate if write buffer should be emptied before next run. // drain indicates whether write buffer should be emptied before next run
let drain = match inner.as_mut().poll_response(cx)? { let drain = match inner.as_mut().poll_response(cx)? {
PollResponse::DrainWriteBuf => true, PollResponse::DrainWriteBuf => true,
PollResponse::DoNothing => false, PollResponse::DoNothing => false,
// upgrade request and goes Upgrade variant of DispatcherState. // upgrade request and goes Upgrade variant of DispatcherState.
PollResponse::Upgrade(req) => { PollResponse::Upgrade(req) => {
let upgrade = inner.upgrade(req); let upgrade = inner.upgrade(req);

View File

@ -630,7 +630,7 @@ impl Removed {
/// Returns true if iterator contains no elements, without consuming it. /// Returns true if iterator contains no elements, without consuming it.
/// ///
/// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate /// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate
/// wether any items were actually replaced or removed, respectively. /// whether any items were actually replaced or removed, respectively.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
match self.inner { match self.inner {
// size hint lower bound of smallvec is the correct length // size hint lower bound of smallvec is the correct length

View File

@ -42,7 +42,7 @@ impl ResponseHead {
&mut self.headers &mut self.headers
} }
/// Sets the flag that controls wether to send headers formatted as Camel-Case. /// Sets the flag that controls whether to send headers formatted as Camel-Case.
/// ///
/// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase. /// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase.
#[inline] #[inline]

View File

@ -2,7 +2,7 @@ use std::{
convert::Infallible, convert::Infallible,
io::{Read, Write}, io::{Read, Write},
net, thread, net, thread,
time::Duration, time::{Duration, Instant},
}; };
use actix_http::{ use actix_http::{
@ -201,11 +201,14 @@ async fn test_slow_request() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.client_timeout(100) .client_timeout(100)
.keep_alive(2)
.finish(|_| ok::<_, Infallible>(Response::ok())) .finish(|_| ok::<_, Infallible>(Response::ok()))
.tcp() .tcp()
}) })
.await; .await;
let start = Instant::now();
let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n");
let mut data = String::new(); let mut data = String::new();
@ -216,6 +219,17 @@ async fn test_slow_request() {
data data
); );
let end = Instant::now();
let diff = end - start;
if diff < Duration::from_secs(1) {
// test success
} else if diff < Duration::from_secs(3) {
panic!("request seems to have wrongly timed-out according to keep-alive");
} else {
panic!("request took way too long to time out");
}
srv.stop().await; srv.stop().await;
} }