From e439d0546b992687dc57ab4d7b3b7a1dc2e320a6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim <fafhrd91@gmail.com> Date: Wed, 3 Jan 2018 18:21:34 -0800 Subject: [PATCH] * fix force_close * shutdown io before exit * fix response creation with body from pool --- src/application.rs | 2 +- src/channel.rs | 4 +++- src/context.rs | 11 ---------- src/h1.rs | 53 +++++++++++++++++++++++++-------------------- src/h1writer.rs | 16 +++++++++----- src/h2writer.rs | 2 +- src/httpresponse.rs | 2 +- src/pipeline.rs | 2 +- src/router.rs | 2 ++ 9 files changed, 48 insertions(+), 46 deletions(-) diff --git a/src/application.rs b/src/application.rs index 8c284e71..1e4d8273 100644 --- a/src/application.rs +++ b/src/application.rs @@ -48,7 +48,7 @@ impl<S: 'static> PipelineHandler<S> for Inner<S> { if path.is_empty() { req.match_info_mut().add("tail", ""); } else { - req.match_info_mut().add("tail", path.trim_left_matches('/')); + req.match_info_mut().add("tail", path.split_at(1).1); } return handler.handle(req) } diff --git a/src/channel.rs b/src/channel.rs index 633a0595..d736fd20 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -79,7 +79,9 @@ impl<T, H> HttpChannel<T, H> } } -/*impl<T: 'static, A: 'static, H: 'static> Drop for HttpChannel<T, A, H> { +/*impl<T, H> Drop for HttpChannel<T, H> + where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static +{ fn drop(&mut self) { println!("Drop http channel"); } diff --git a/src/context.rs b/src/context.rs index 1cdb6b9b..dff9344a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -14,7 +14,6 @@ use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCel use body::{Body, Binary}; use error::{Error, Result}; use httprequest::HttpRequest; -use httpresponse::HttpResponse; pub trait ActorHttpContext: 'static { @@ -124,16 +123,6 @@ impl<A, S: 'static> HttpContext<A, S> where A: Actor<Context=Self> { self.act = Some(actor); self } - - pub fn with_actor(mut self, actor: A, mut resp: HttpResponse) -> Result<HttpResponse> { - if self.act.is_some() { - panic!("Actor is set already"); - } - self.act = Some(actor); - - resp.replace_body(Body::Actor(Box::new(self))); - Ok(resp) - } } impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> { diff --git a/src/h1.rs b/src/h1.rs index f49d0a2e..e4d2930b 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -97,11 +97,11 @@ impl<T, H> Http1<T, H> (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) } - fn poll_completed(&mut self) -> Result<bool, ()> { + fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> { // check stream state - match self.stream.poll_completed() { - Ok(Async::Ready(_)) => Ok(false), - Ok(Async::NotReady) => Ok(true), + match self.stream.poll_completed(shutdown) { + Ok(Async::Ready(_)) => Ok(true), + Ok(Async::NotReady) => Ok(false), Err(err) => { debug!("Error sending data: {}", err); Err(()) @@ -136,7 +136,7 @@ impl<T, H> Http1<T, H> if !io && !item.flags.contains(EntryFlags::EOF) { if item.flags.contains(EntryFlags::ERROR) { // check stream state - if let Ok(Async::NotReady) = self.stream.poll_completed() { + if let Ok(Async::NotReady) = self.stream.poll_completed(true) { return Ok(Async::NotReady) } return Err(()) @@ -147,12 +147,10 @@ impl<T, H> Http1<T, H> not_ready = false; // overide keep-alive state - if self.settings.keep_alive_enabled() { - if self.stream.keepalive() { - self.flags.insert(Flags::KEEPALIVE); - } else { - self.flags.remove(Flags::KEEPALIVE); - } + if self.stream.keepalive() { + self.flags.insert(Flags::KEEPALIVE); + } else { + self.flags.remove(Flags::KEEPALIVE); } self.stream.reset(); @@ -172,7 +170,7 @@ impl<T, H> Http1<T, H> item.flags.insert(EntryFlags::ERROR); // check stream state, we still can have valid data in buffer - if let Ok(Async::NotReady) = self.stream.poll_completed() { + if let Ok(Async::NotReady) = self.stream.poll_completed(true) { return Ok(Async::NotReady) } return Err(()) @@ -207,11 +205,14 @@ impl<T, H> Http1<T, H> // no keep-alive if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() { + let h2 = self.flags.contains(Flags::H2); + // check stream state - if self.poll_completed()? { + if !self.poll_completed(!h2)? { return Ok(Async::NotReady) } - if self.flags.contains(Flags::H2) { + + if h2 { return Ok(Async::Ready(Http1Result::Switch)) } else { return Ok(Async::Ready(Http1Result::Done)) @@ -284,7 +285,7 @@ impl<T, H> Http1<T, H> } } Ok(Async::NotReady) => { - // start keep-alive timer, this is also slow request timeout + // start keep-alive timer, this also is slow request timeout if self.tasks.is_empty() { if self.settings.keep_alive_enabled() { let keep_alive = self.settings.keep_alive(); @@ -300,17 +301,20 @@ impl<T, H> Http1<T, H> } } else { // check stream state - if self.poll_completed()? { + if !self.poll_completed(true)? { return Ok(Async::NotReady) } // keep-alive disable, drop connection return Ok(Async::Ready(Http1Result::Done)) } - } else { - // check stream state - self.poll_completed()?; - // keep-alive unset, rely on operating system + } else if !self.poll_completed(false)? || + self.flags.contains(Flags::KEEPALIVE) + { + // check stream state or + // if keep-alive unset, rely on operating system return Ok(Async::NotReady) + } else { + return Ok(Async::Ready(Http1Result::Done)) } } break @@ -320,12 +324,13 @@ impl<T, H> Http1<T, H> // check for parse error if self.tasks.is_empty() { + let h2 = self.flags.contains(Flags::H2); + // check stream state - if self.poll_completed()? { + if !self.poll_completed(!h2)? { return Ok(Async::NotReady) } - - if self.flags.contains(Flags::H2) { + if h2 { return Ok(Async::Ready(Http1Result::Switch)) } if self.flags.contains(Flags::ERROR) || self.keepalive_timer.is_none() { @@ -334,7 +339,7 @@ impl<T, H> Http1<T, H> } if not_ready { - self.poll_completed()?; + self.poll_completed(false)?; return Ok(Async::NotReady) } } diff --git a/src/h1writer.rs b/src/h1writer.rs index 5352e743..200ff052 100644 --- a/src/h1writer.rs +++ b/src/h1writer.rs @@ -33,7 +33,7 @@ pub trait Writer { fn write_eof(&mut self) -> Result<WriterState, io::Error>; - fn poll_completed(&mut self) -> Poll<(), io::Error>; + fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>; } bitflags! { @@ -94,7 +94,7 @@ impl<T: AsyncWrite> H1Writer<T> { while !buffer.is_empty() { match self.stream.write(buffer.as_ref()) { Ok(n) => { - buffer.split_to(n); + let _ = buffer.split_to(n); }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { if buffer.len() > MAX_WRITE_BUFFER_SIZE { @@ -112,7 +112,6 @@ impl<T: AsyncWrite> H1Writer<T> { impl<T: AsyncWrite> Writer for H1Writer<T> { - #[cfg_attr(feature = "cargo-clippy", allow(cast_lossless))] fn written(&self) -> u64 { self.written } @@ -218,7 +217,6 @@ impl<T: AsyncWrite> Writer for H1Writer<T> { self.encoder.write_eof()?; if !self.encoder.is_eof() { - // debug!("last payload item, but it is not EOF "); Err(io::Error::new(io::ErrorKind::Other, "Last payload item, but eof is not reached")) } else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { @@ -228,9 +226,15 @@ impl<T: AsyncWrite> Writer for H1Writer<T> { } } - fn poll_completed(&mut self) -> Poll<(), io::Error> { + fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> { match self.write_to_stream() { - Ok(WriterState::Done) => Ok(Async::Ready(())), + Ok(WriterState::Done) => { + if shutdown { + self.stream.shutdown() + } else { + Ok(Async::Ready(())) + } + }, Ok(WriterState::Pause) => Ok(Async::NotReady), Err(err) => Err(err) } diff --git a/src/h2writer.rs b/src/h2writer.rs index 9af11010..57c4bd35 100644 --- a/src/h2writer.rs +++ b/src/h2writer.rs @@ -213,7 +213,7 @@ impl Writer for H2Writer { } } - fn poll_completed(&mut self) -> Poll<(), io::Error> { + fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> { match self.write_to_stream() { Ok(WriterState::Done) => Ok(Async::Ready(())), Ok(WriterState::Pause) => Ok(Async::NotReady), diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 5d5e85fc..f0855301 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -674,7 +674,7 @@ impl Pool { POOL.with(|pool| { if let Some(mut resp) = pool.borrow_mut().0.pop_front() { resp.status = status; - resp.body = Body::Empty; + resp.body = body; resp } else { Box::new(InnerHttpResponse::new(status, body)) diff --git a/src/pipeline.rs b/src/pipeline.rs index ad26266f..44c50310 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -658,7 +658,7 @@ impl<S, H> ProcessResponse<S, H> { // flush io but only if we need to if self.running == RunningState::Paused || self.drain.is_some() { - match io.poll_completed() { + match io.poll_completed(false) { Ok(Async::Ready(_)) => { self.running.resume(); diff --git a/src/router.rs b/src/router.rs index ca678341..560f7de7 100644 --- a/src/router.rs +++ b/src/router.rs @@ -189,6 +189,8 @@ impl Pattern { } /// Extract pattern parameters from the text + // This method unsafe internally, assumption that Pattern instance lives + // longer than `req` pub fn update_match_info<S>(&self, req: &mut HttpRequest<S>, prefix: usize) { if !self.names.is_empty() { let text: &str = unsafe{ mem::transmute(&req.path()[prefix..]) };