Merge branch 'master' into doc/add-native-tls-example

This commit is contained in:
Nikolay Kim 2018-08-31 17:29:50 -07:00 committed by GitHub
commit b3d7929b42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 65 deletions

View File

@ -6,6 +6,11 @@
* Added the ability to pass a custom `TlsConnector`.
### Fixed
* Handle socket read disconnect
## [0.7.4] - 2018-08-23
### Added

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.7.4"
version = "0.7.5"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
readme = "README.md"

View File

@ -38,19 +38,9 @@ impl HttpResponseParser {
where
T: IoStream,
{
// if buf is empty parse_message will always return NotReady, let's avoid that
if buf.is_empty() {
match io.read_available(buf) {
Ok(Async::Ready((_, true))) => {
return Err(HttpResponseParserError::Disconnect)
}
Ok(Async::Ready((_, false))) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
}
}
loop {
// Don't call parser until we have data to parse.
if !buf.is_empty() {
match HttpResponseParser::parse_message(buf)
.map_err(HttpResponseParserError::Error)?
{
@ -62,11 +52,16 @@ impl HttpResponseParser {
if buf.capacity() >= MAX_BUFFER_SIZE {
return Err(HttpResponseParserError::Error(ParseError::TooLarge));
}
// Parser needs more data.
}
}
}
// Read some more data into the buffer for the parser.
match io.read_available(buf) {
Ok(Async::Ready((_, true))) => {
Ok(Async::Ready((false, true))) => {
return Err(HttpResponseParserError::Disconnect)
}
Ok(Async::Ready((_, false))) => (),
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
return Err(HttpResponseParserError::Error(err.into()))
@ -74,8 +69,6 @@ impl HttpResponseParser {
}
}
}
}
}
pub fn parse_payload<T>(
&mut self, io: &mut T, buf: &mut BytesMut,

View File

@ -22,13 +22,14 @@ use super::{HttpHandler, HttpHandlerTask, IoStream};
const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! {
struct Flags: u8 {
pub struct Flags: u8 {
const STARTED = 0b0000_0001;
const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000;
const DISCONNECTED = 0b0001_0000;
const POLLED = 0b0010_0000;
const READ_DISCONNECTED = 0b0001_0000;
const WRITE_DISCONNECTED = 0b0010_0000;
const POLLED = 0b0100_0000;
}
}
@ -93,7 +94,7 @@ where
buf: BytesMut, is_eof: bool,
) -> Self {
Http1 {
flags: Flags::KEEPALIVE | if is_eof { Flags::DISCONNECTED } else { Flags::empty() },
flags: if is_eof { Flags::READ_DISCONNECTED } else { Flags::KEEPALIVE },
stream: H1Writer::new(stream, Rc::clone(&settings)),
decoder: H1Decoder::new(),
payload: None,
@ -117,6 +118,10 @@ where
#[inline]
fn can_read(&self) -> bool {
if self.flags.intersects(Flags::ERROR | Flags::READ_DISCONNECTED) {
return false
}
if let Some(ref info) = self.payload {
info.need_read() == PayloadStatus::Read
} else {
@ -125,6 +130,8 @@ where
}
fn notify_disconnect(&mut self) {
self.flags.insert(Flags::WRITE_DISCONNECTED);
// notify all tasks
self.stream.disconnected();
for task in &mut self.tasks {
@ -163,11 +170,15 @@ where
// shutdown
if self.flags.contains(Flags::SHUTDOWN) {
if self.flags.intersects(
Flags::ERROR | Flags::READ_DISCONNECTED | Flags::WRITE_DISCONNECTED) {
return Ok(Async::Ready(()))
}
match self.stream.poll_completed(true) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
Err(err) => {
debug!("Error sending data: {}", err);
debug!("Error sendips ng data: {}", err);
return Err(());
}
}
@ -197,11 +208,9 @@ where
self.flags.insert(Flags::POLLED);
return;
}
// read io from socket
if !self.flags.intersects(Flags::ERROR)
&& self.tasks.len() < MAX_PIPELINED_MESSAGES
&& self.can_read()
{
if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES {
match self.stream.get_mut().read_available(&mut self.buf) {
Ok(Async::Ready((read_some, disconnected))) => {
if read_some {
@ -209,7 +218,7 @@ where
}
if disconnected {
// delay disconnect until all tasks have finished.
self.flags.insert(Flags::DISCONNECTED);
self.flags.insert(Flags::READ_DISCONNECTED);
if self.tasks.is_empty() {
self.client_disconnect();
}
@ -231,7 +240,9 @@ where
let mut idx = 0;
while idx < self.tasks.len() {
// only one task can do io operation in http/1
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF) {
if !io && !self.tasks[idx].flags.contains(EntryFlags::EOF)
&& !self.flags.contains(Flags::WRITE_DISCONNECTED)
{
// io is corrupted, send buffer
if self.tasks[idx].flags.contains(EntryFlags::ERROR) {
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
@ -295,7 +306,6 @@ where
}
// cleanup finished tasks
let max = self.tasks.len() >= MAX_PIPELINED_MESSAGES;
while !self.tasks.is_empty() {
if self.tasks[0]
.flags
@ -306,15 +316,13 @@ where
break;
}
}
// read more message
if max && self.tasks.len() >= MAX_PIPELINED_MESSAGES {
return Ok(Async::Ready(true));
}
// check stream state
if self.flags.contains(Flags::STARTED) {
match self.stream.poll_completed(false) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::NotReady) => {
return Ok(Async::NotReady)
},
Err(err) => {
debug!("Error sending data: {}", err);
self.notify_disconnect();
@ -332,8 +340,7 @@ where
// deal with keep-alive and steam eof (client-side write shutdown)
if self.tasks.is_empty() {
// handle stream eof
if self.flags.contains(Flags::DISCONNECTED) {
self.client_disconnect();
if self.flags.contains(Flags::READ_DISCONNECTED) {
return Ok(Async::Ready(false));
}
// no keep-alive
@ -451,7 +458,12 @@ where
break;
}
}
Ok(None) => break,
Ok(None) => {
if self.flags.contains(Flags::READ_DISCONNECTED) && self.tasks.is_empty() {
self.client_disconnect();
}
break
},
Err(e) => {
self.flags.insert(Flags::ERROR);
if let Some(mut payload) = self.payload.take() {

View File

@ -63,7 +63,9 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
self.flags = Flags::KEEPALIVE;
}
pub fn disconnected(&mut self) {}
pub fn disconnected(&mut self) {
self.flags.insert(Flags::DISCONNECTED);
}
pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
@ -268,10 +270,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
let pl: &[u8] = payload.as_ref();
let n = match Self::write_data(&mut self.stream, pl) {
Err(err) => {
if err.kind() == io::ErrorKind::WriteZero {
self.disconnected();
}
return Err(err);
}
Ok(val) => val,
@ -315,14 +314,15 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
if self.flags.contains(Flags::DISCONNECTED) {
return Err(io::Error::new(io::ErrorKind::Other, "disconnected"));
}
if !self.buffer.is_empty() {
let written = {
match Self::write_data(&mut self.stream, self.buffer.as_ref().as_ref()) {
Err(err) => {
if err.kind() == io::ErrorKind::WriteZero {
self.disconnected();
}
return Err(err);
}
Ok(val) => val,
@ -339,7 +339,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
self.stream.poll_flush()?;
self.stream.shutdown()
} else {
self.stream.poll_flush()
Ok(self.stream.poll_flush()?)
}
}
}

View File

@ -66,6 +66,16 @@ fn test_simple() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
#[test]
fn test_connection_close() {
let mut srv =
test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR)));
let request = srv.get().header("Connection", "close").finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
}
#[test]
fn test_with_query_parameter() {
let mut srv = test::TestServer::new(|app| {