From 3872d3ba5a3a1d2eacf41fffd20ee66d50ed44fd Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Sat, 6 Apr 2019 00:16:04 -0700
Subject: [PATCH] refactor h1 dispatcher

---
 actix-http/src/h1/codec.rs      |  19 +-
 actix-http/src/h1/decoder.rs    |   4 +-
 actix-http/src/h1/dispatcher.rs | 385 +++++++++++++++++++++-----------
 actix-http/src/h1/encoder.rs    |   6 -
 actix-http/src/response.rs      |  12 -
 actix-http/src/service.rs       |  11 +-
 actix-http/tests/test_server.rs |   4 +-
 tests/test_server.rs            |   2 +-
 8 files changed, 275 insertions(+), 168 deletions(-)

diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs
index 64731ac9..3834254a 100644
--- a/actix-http/src/h1/codec.rs
+++ b/actix-http/src/h1/codec.rs
@@ -9,7 +9,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCOD
 use http::{Method, StatusCode, Version};
 
 use super::decoder::{PayloadDecoder, PayloadItem, PayloadType};
-use super::{decoder, encoder, reserve_readbuf};
+use super::{decoder, encoder};
 use super::{Message, MessageType};
 use crate::body::BodySize;
 use crate::config::ServiceConfig;
@@ -31,7 +31,7 @@ const AVERAGE_HEADER_SIZE: usize = 30;
 
 /// HTTP/1 Codec
 pub struct Codec {
-    config: ServiceConfig,
+    pub(crate) config: ServiceConfig,
     decoder: decoder::MessageDecoder<Request>,
     payload: Option<PayloadDecoder>,
     version: Version,
@@ -78,16 +78,25 @@ impl Codec {
         }
     }
 
+    #[inline]
     /// Check if request is upgrade
     pub fn upgrade(&self) -> bool {
         self.ctype == ConnectionType::Upgrade
     }
 
+    #[inline]
     /// Check if last response is keep-alive
     pub fn keepalive(&self) -> bool {
         self.ctype == ConnectionType::KeepAlive
     }
 
+    #[inline]
+    /// Check if keep-alive enabled on server level
+    pub fn keepalive_enabled(&self) -> bool {
+        self.flags.contains(Flags::KEEPALIVE_ENABLED)
+    }
+
+    #[inline]
     /// Check last request's message type
     pub fn message_type(&self) -> MessageType {
         if self.flags.contains(Flags::STREAM) {
@@ -107,10 +116,7 @@ impl Decoder for Codec {
     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
         if self.payload.is_some() {
             Ok(match self.payload.as_mut().unwrap().decode(src)? {
-                Some(PayloadItem::Chunk(chunk)) => {
-                    reserve_readbuf(src);
-                    Some(Message::Chunk(Some(chunk)))
-                }
+                Some(PayloadItem::Chunk(chunk)) => Some(Message::Chunk(Some(chunk))),
                 Some(PayloadItem::Eof) => {
                     self.payload.take();
                     Some(Message::Chunk(None))
@@ -135,7 +141,6 @@ impl Decoder for Codec {
                     self.flags.insert(Flags::STREAM);
                 }
             }
-            reserve_readbuf(src);
             Ok(Some(Message::Item(req)))
         } else {
             Ok(None)
diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs
index 10652d62..88de9bc6 100644
--- a/actix-http/src/h1/decoder.rs
+++ b/actix-http/src/h1/decoder.rs
@@ -84,7 +84,9 @@ pub(crate) trait MessageType: Sized {
                         header::CONTENT_LENGTH => {
                             if let Ok(s) = value.to_str() {
                                 if let Ok(len) = s.parse::<u64>() {
-                                    content_length = Some(len);
+                                    if len != 0 {
+                                        content_length = Some(len);
+                                    }
                                 } else {
                                     debug!("illegal Content-Length: {:?}", s);
                                     return Err(ParseError::Header);
diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs
index e2306fde..61c284a9 100644
--- a/actix-http/src/h1/dispatcher.rs
+++ b/actix-http/src/h1/dispatcher.rs
@@ -1,20 +1,20 @@
 use std::collections::VecDeque;
-use std::mem;
 use std::time::Instant;
+use std::{fmt, io};
 
-use actix_codec::{AsyncRead, AsyncWrite, Framed};
+use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
 use actix_service::Service;
 use actix_utils::cloneable::CloneableService;
 use bitflags::bitflags;
-use futures::{Async, Future, Poll, Sink, Stream};
-use log::{debug, error, trace};
+use bytes::{BufMut, BytesMut};
+use futures::{Async, Future, Poll};
+use log::{error, trace};
 use tokio_timer::Delay;
 
 use crate::body::{Body, BodySize, MessageBody, ResponseBody};
 use crate::config::ServiceConfig;
 use crate::error::{DispatchError, Error};
 use crate::error::{ParseError, PayloadError};
-use crate::http::StatusCode;
 use crate::request::Request;
 use crate::response::Response;
 
@@ -22,17 +22,19 @@ use super::codec::Codec;
 use super::payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
 use super::{Message, MessageType};
 
+const LW_BUFFER_SIZE: usize = 4096;
+const HW_BUFFER_SIZE: usize = 32_768;
 const MAX_PIPELINED_MESSAGES: usize = 16;
 
 bitflags! {
     pub struct Flags: u8 {
         const STARTED            = 0b0000_0001;
-        const KEEPALIVE_ENABLED  = 0b0000_0010;
-        const KEEPALIVE          = 0b0000_0100;
-        const POLLED             = 0b0000_1000;
-        const SHUTDOWN           = 0b0010_0000;
-        const DISCONNECTED       = 0b0100_0000;
-        const DROPPING           = 0b1000_0000;
+        const KEEPALIVE          = 0b0000_0010;
+        const POLLED             = 0b0000_0100;
+        const SHUTDOWN           = 0b0000_1000;
+        const READ_DISCONNECT    = 0b0001_0000;
+        const WRITE_DISCONNECT   = 0b0010_0000;
+        const DROPPING           = 0b0100_0000;
     }
 }
 
@@ -59,9 +61,7 @@ where
     service: CloneableService<S>,
     expect: CloneableService<X>,
     flags: Flags,
-    framed: Framed<T, Codec>,
     error: Option<DispatchError>,
-    config: ServiceConfig,
 
     state: State<S, B, X>,
     payload: Option<PayloadSender>,
@@ -69,6 +69,11 @@ where
 
     ka_expire: Instant,
     ka_timer: Option<Delay>,
+
+    io: T,
+    read_buf: BytesMut,
+    write_buf: BytesMut,
+    codec: Codec,
 }
 
 enum DispatcherMessage {
@@ -101,6 +106,30 @@ where
             false
         }
     }
+
+    fn is_call(&self) -> bool {
+        if let State::ServiceCall(_) = self {
+            true
+        } else {
+            false
+        }
+    }
+}
+
+impl<S, B, X> fmt::Debug for State<S, B, X>
+where
+    S: Service<Request = Request>,
+    X: Service<Request = Request, Response = Request>,
+    B: MessageBody,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            State::None => write!(f, "State::None"),
+            State::ExpectCall(_) => write!(f, "State::ExceptCall"),
+            State::ServiceCall(_) => write!(f, "State::ServiceCall"),
+            State::SendPayload(_) => write!(f, "State::SendPayload"),
+        }
+    }
 }
 
 impl<T, S, B, X> Dispatcher<T, S, B, X>
@@ -121,8 +150,10 @@ where
         expect: CloneableService<X>,
     ) -> Self {
         Dispatcher::with_timeout(
-            Framed::new(stream, Codec::new(config.clone())),
+            stream,
+            Codec::new(config.clone()),
             config,
+            BytesMut::with_capacity(HW_BUFFER_SIZE),
             None,
             service,
             expect,
@@ -131,15 +162,17 @@ where
 
     /// Create http/1 dispatcher with slow request timeout.
     pub fn with_timeout(
-        framed: Framed<T, Codec>,
+        io: T,
+        codec: Codec,
         config: ServiceConfig,
+        read_buf: BytesMut,
         timeout: Option<Delay>,
         service: CloneableService<S>,
         expect: CloneableService<X>,
     ) -> Self {
         let keepalive = config.keep_alive_enabled();
         let flags = if keepalive {
-            Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED
+            Flags::KEEPALIVE
         } else {
             Flags::empty()
         };
@@ -155,7 +188,10 @@ where
 
         Dispatcher {
             inner: Some(InnerDispatcher {
-                framed,
+                io,
+                codec,
+                read_buf,
+                write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
                 payload: None,
                 state: State::None,
                 error: None,
@@ -163,7 +199,6 @@ where
                 service,
                 expect,
                 flags,
-                config,
                 ka_expire,
                 ka_timer,
             }),
@@ -182,11 +217,9 @@ where
     X::Error: Into<Error>,
 {
     fn can_read(&self) -> bool {
-        if self.flags.contains(Flags::DISCONNECTED) {
+        if self.flags.contains(Flags::READ_DISCONNECT) {
             return false;
-        }
-
-        if let Some(ref info) = self.payload {
+        } else if let Some(ref info) = self.payload {
             info.need_read() == PayloadStatus::Read
         } else {
             true
@@ -195,32 +228,52 @@ where
 
     // if checked is set to true, delay disconnect until all tasks have finished.
     fn client_disconnected(&mut self) {
-        self.flags.insert(Flags::DISCONNECTED);
+        self.flags
+            .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT);
         if let Some(mut payload) = self.payload.take() {
             payload.set_error(PayloadError::Incomplete(None));
         }
     }
 
     /// Flush stream
-    fn poll_flush(&mut self) -> Poll<bool, DispatchError> {
-        if !self.framed.is_write_buf_empty() {
-            match self.framed.poll_complete() {
-                Ok(Async::NotReady) => Ok(Async::NotReady),
-                Err(err) => {
-                    debug!("Error sending data: {}", err);
-                    Err(err.into())
-                }
-                Ok(Async::Ready(_)) => {
-                    // if payload is not consumed we can not use connection
-                    if self.payload.is_some() && self.state.is_empty() {
-                        return Err(DispatchError::PayloadIsNotConsumed);
-                    }
-                    Ok(Async::Ready(true))
-                }
-            }
-        } else {
-            Ok(Async::Ready(false))
+    ///
+    /// true - got whouldblock
+    /// false - didnt get whouldblock
+    fn poll_flush(&mut self) -> Result<bool, DispatchError> {
+        if self.write_buf.is_empty() {
+            return Ok(false);
         }
+
+        let len = self.write_buf.len();
+        let mut written = 0;
+        while written < len {
+            match self.io.write(&self.write_buf[written..]) {
+                Ok(0) => {
+                    return Err(DispatchError::Io(io::Error::new(
+                        io::ErrorKind::WriteZero,
+                        "",
+                    )));
+                }
+                Ok(n) => {
+                    written += n;
+                }
+                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+                    if written > 0 {
+                        let _ = self.write_buf.split_to(written);
+                    }
+                    return Ok(true);
+                }
+                Err(err) => return Err(DispatchError::Io(err)),
+            }
+        }
+        if written > 0 {
+            if written == self.write_buf.len() {
+                unsafe { self.write_buf.set_len(0) }
+            } else {
+                let _ = self.write_buf.split_to(written);
+            }
+        }
+        Ok(false)
     }
 
     fn send_response(
@@ -228,8 +281,8 @@ where
         message: Response<()>,
         body: ResponseBody<B>,
     ) -> Result<State<S, B, X>, DispatchError> {
-        self.framed
-            .force_send(Message::Item((message, body.length())))
+        self.codec
+            .encode(Message::Item((message, body.length())), &mut self.write_buf)
             .map_err(|err| {
                 if let Some(mut payload) = self.payload.take() {
                     payload.set_error(PayloadError::Incomplete(None));
@@ -237,113 +290,109 @@ where
                 DispatchError::Io(err)
             })?;
 
-        self.flags
-            .set(Flags::KEEPALIVE, self.framed.get_codec().keepalive());
+        self.flags.set(Flags::KEEPALIVE, self.codec.keepalive());
         match body.length() {
             BodySize::None | BodySize::Empty => Ok(State::None),
             _ => Ok(State::SendPayload(body)),
         }
     }
 
-    fn send_continue(&mut self) -> Result<(), DispatchError> {
-        self.framed
-            .force_send(Message::Item((
-                Response::empty(StatusCode::CONTINUE),
-                BodySize::Empty,
-            )))
-            .map_err(|err| DispatchError::Io(err))
+    fn send_continue(&mut self) {
+        self.write_buf
+            .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
     }
 
-    fn poll_response(&mut self) -> Result<(), DispatchError> {
-        let mut retry = self.can_read();
+    fn poll_response(&mut self) -> Result<bool, DispatchError> {
         loop {
-            let state = match mem::replace(&mut self.state, State::None) {
+            let state = match self.state {
                 State::None => match self.messages.pop_front() {
                     Some(DispatcherMessage::Item(req)) => {
                         Some(self.handle_request(req)?)
                     }
                     Some(DispatcherMessage::Error(res)) => {
-                        self.send_response(res, ResponseBody::Other(Body::Empty))?;
-                        None
+                        Some(self.send_response(res, ResponseBody::Other(Body::Empty))?)
                     }
                     None => None,
                 },
-                State::ExpectCall(mut fut) => match fut.poll() {
+                State::ExpectCall(ref mut fut) => match fut.poll() {
                     Ok(Async::Ready(req)) => {
-                        self.send_continue()?;
-                        Some(State::ServiceCall(self.service.call(req)))
-                    }
-                    Ok(Async::NotReady) => {
-                        self.state = State::ExpectCall(fut);
-                        None
+                        self.send_continue();
+                        self.state = State::ServiceCall(self.service.call(req));
+                        continue;
                     }
+                    Ok(Async::NotReady) => None,
                     Err(e) => {
-                        let e = e.into();
-                        let res: Response = e.into();
+                        let res: Response = e.into().into();
                         let (res, body) = res.replace_body(());
                         Some(self.send_response(res, body.into_body())?)
                     }
                 },
-                State::ServiceCall(mut fut) => match fut.poll() {
+                State::ServiceCall(ref mut fut) => match fut.poll() {
                     Ok(Async::Ready(res)) => {
                         let (res, body) = res.into().replace_body(());
-                        Some(self.send_response(res, body)?)
+                        self.state = self.send_response(res, body)?;
+                        continue;
                     }
-                    Ok(Async::NotReady) => {
-                        self.state = State::ServiceCall(fut);
-                        None
-                    }
-                    Err(_e) => {
-                        let res: Response = Response::InternalServerError().finish();
+                    Ok(Async::NotReady) => None,
+                    Err(e) => {
+                        let res: Response = e.into().into();
                         let (res, body) = res.replace_body(());
                         Some(self.send_response(res, body.into_body())?)
                     }
                 },
-                State::SendPayload(mut stream) => {
+                State::SendPayload(ref mut stream) => {
                     loop {
-                        if !self.framed.is_write_buf_full() {
+                        if self.write_buf.len() < HW_BUFFER_SIZE {
                             match stream
                                 .poll_next()
                                 .map_err(|_| DispatchError::Unknown)?
                             {
                                 Async::Ready(Some(item)) => {
-                                    self.framed
-                                        .force_send(Message::Chunk(Some(item)))?;
+                                    self.codec.encode(
+                                        Message::Chunk(Some(item)),
+                                        &mut self.write_buf,
+                                    )?;
                                     continue;
                                 }
                                 Async::Ready(None) => {
-                                    self.framed.force_send(Message::Chunk(None))?;
-                                }
-                                Async::NotReady => {
-                                    self.state = State::SendPayload(stream);
-                                    return Ok(());
+                                    self.codec.encode(
+                                        Message::Chunk(None),
+                                        &mut self.write_buf,
+                                    )?;
+                                    self.state = State::None;
                                 }
+                                Async::NotReady => return Ok(false),
                             }
                         } else {
-                            self.state = State::SendPayload(stream);
-                            return Ok(());
+                            return Ok(true);
                         }
                         break;
                     }
-                    None
+                    continue;
                 }
             };
 
-            match state {
-                Some(state) => self.state = state,
-                None => {
-                    // if read-backpressure is enabled and we consumed some data.
-                    // we may read more data and retry
-                    if !retry && self.can_read() && self.poll_request()? {
-                        retry = self.can_read();
+            // set new state
+            if let Some(state) = state {
+                self.state = state;
+                if !self.state.is_empty() {
+                    continue;
+                }
+            } else {
+                // if read-backpressure is enabled and we consumed some data.
+                // we may read more data and retry
+                if self.state.is_call() {
+                    if self.poll_request()? {
                         continue;
                     }
-                    break;
+                } else if !self.messages.is_empty() {
+                    continue;
                 }
             }
+            break;
         }
 
-        Ok(())
+        Ok(false)
     }
 
     fn handle_request(&mut self, req: Request) -> Result<State<S, B, X>, DispatchError> {
@@ -352,7 +401,7 @@ where
             let mut task = self.expect.call(req);
             match task.poll() {
                 Ok(Async::Ready(req)) => {
-                    self.send_continue()?;
+                    self.send_continue();
                     req
                 }
                 Ok(Async::NotReady) => return Ok(State::ExpectCall(task)),
@@ -375,8 +424,8 @@ where
                 self.send_response(res, body)
             }
             Ok(Async::NotReady) => Ok(State::ServiceCall(task)),
-            Err(_e) => {
-                let res: Response = Response::InternalServerError().finish();
+            Err(e) => {
+                let res: Response = e.into().into();
                 let (res, body) = res.replace_body(());
                 self.send_response(res, body.into_body())
             }
@@ -386,20 +435,20 @@ where
     /// Process one incoming requests
     pub(self) fn poll_request(&mut self) -> Result<bool, DispatchError> {
         // limit a mount of non processed requests
-        if self.messages.len() >= MAX_PIPELINED_MESSAGES {
+        if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read() {
             return Ok(false);
         }
 
         let mut updated = false;
         loop {
-            match self.framed.poll() {
-                Ok(Async::Ready(Some(msg))) => {
+            match self.codec.decode(&mut self.read_buf) {
+                Ok(Some(msg)) => {
                     updated = true;
                     self.flags.insert(Flags::STARTED);
 
                     match msg {
                         Message::Item(mut req) => {
-                            match self.framed.get_codec().message_type() {
+                            match self.codec.message_type() {
                                 MessageType::Payload | MessageType::Stream => {
                                     let (ps, pl) = Payload::create(false);
                                     let (req1, _) =
@@ -424,7 +473,7 @@ where
                                 error!(
                                     "Internal server error: unexpected payload chunk"
                                 );
-                                self.flags.insert(Flags::DISCONNECTED);
+                                self.flags.insert(Flags::READ_DISCONNECT);
                                 self.messages.push_back(DispatcherMessage::Error(
                                     Response::InternalServerError().finish().drop_body(),
                                 ));
@@ -437,7 +486,7 @@ where
                                 payload.feed_eof();
                             } else {
                                 error!("Internal server error: unexpected eof");
-                                self.flags.insert(Flags::DISCONNECTED);
+                                self.flags.insert(Flags::READ_DISCONNECT);
                                 self.messages.push_back(DispatcherMessage::Error(
                                     Response::InternalServerError().finish().drop_body(),
                                 ));
@@ -447,11 +496,7 @@ where
                         }
                     }
                 }
-                Ok(Async::Ready(None)) => {
-                    self.client_disconnected();
-                    break;
-                }
-                Ok(Async::NotReady) => break,
+                Ok(None) => break,
                 Err(ParseError::Io(e)) => {
                     self.client_disconnected();
                     self.error = Some(DispatchError::Io(e));
@@ -466,15 +511,15 @@ where
                     self.messages.push_back(DispatcherMessage::Error(
                         Response::BadRequest().finish().drop_body(),
                     ));
-                    self.flags.insert(Flags::DISCONNECTED);
+                    self.flags.insert(Flags::READ_DISCONNECT);
                     self.error = Some(e.into());
                     break;
                 }
             }
         }
 
-        if self.ka_timer.is_some() && updated {
-            if let Some(expire) = self.config.keep_alive_expire() {
+        if updated && self.ka_timer.is_some() {
+            if let Some(expire) = self.codec.config.keep_alive_expire() {
                 self.ka_expire = expire;
             }
         }
@@ -486,10 +531,10 @@ where
         if self.ka_timer.is_none() {
             // shutdown timeout
             if self.flags.contains(Flags::SHUTDOWN) {
-                if let Some(interval) = self.config.client_disconnect_timer() {
+                if let Some(interval) = self.codec.config.client_disconnect_timer() {
                     self.ka_timer = Some(Delay::new(interval));
                 } else {
-                    self.flags.insert(Flags::DISCONNECTED);
+                    self.flags.insert(Flags::READ_DISCONNECT);
                     return Ok(());
                 }
             } else {
@@ -507,13 +552,14 @@ where
                     return Err(DispatchError::DisconnectTimeout);
                 } else if self.ka_timer.as_mut().unwrap().deadline() >= self.ka_expire {
                     // check for any outstanding tasks
-                    if self.state.is_empty() && self.framed.is_write_buf_empty() {
+                    if self.state.is_empty() && self.write_buf.is_empty() {
                         if self.flags.contains(Flags::STARTED) {
                             trace!("Keep-alive timeout, close connection");
                             self.flags.insert(Flags::SHUTDOWN);
 
                             // start shutdown timer
-                            if let Some(deadline) = self.config.client_disconnect_timer()
+                            if let Some(deadline) =
+                                self.codec.config.client_disconnect_timer()
                             {
                                 if let Some(timer) = self.ka_timer.as_mut() {
                                     timer.reset(deadline);
@@ -521,7 +567,7 @@ where
                                 }
                             } else {
                                 // no shutdown timeout, drop socket
-                                self.flags.insert(Flags::DISCONNECTED);
+                                self.flags.insert(Flags::WRITE_DISCONNECT);
                                 return Ok(());
                             }
                         } else {
@@ -538,7 +584,8 @@ where
                             self.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
                             self.state = State::None;
                         }
-                    } else if let Some(deadline) = self.config.keep_alive_expire() {
+                    } else if let Some(deadline) = self.codec.config.keep_alive_expire()
+                    {
                         if let Some(timer) = self.ka_timer.as_mut() {
                             timer.reset(deadline);
                             let _ = timer.poll();
@@ -572,34 +619,60 @@ where
     #[inline]
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
         let inner = self.inner.as_mut().unwrap();
+        inner.poll_keepalive()?;
 
         if inner.flags.contains(Flags::SHUTDOWN) {
-            inner.poll_keepalive()?;
-            if inner.flags.contains(Flags::DISCONNECTED) {
+            if inner.flags.contains(Flags::WRITE_DISCONNECT) {
                 Ok(Async::Ready(()))
             } else {
-                // try_ready!(inner.poll_flush());
-                match inner.framed.get_mut().shutdown()? {
-                    Async::Ready(_) => Ok(Async::Ready(())),
-                    Async::NotReady => Ok(Async::NotReady),
+                // flush buffer
+                inner.poll_flush()?;
+                if !inner.write_buf.is_empty() {
+                    Ok(Async::NotReady)
+                } else {
+                    match inner.io.shutdown()? {
+                        Async::Ready(_) => Ok(Async::Ready(())),
+                        Async::NotReady => Ok(Async::NotReady),
+                    }
                 }
             }
         } else {
-            inner.poll_keepalive()?;
+            // read socket into a buf
+            if !inner.flags.contains(Flags::READ_DISCONNECT) {
+                if let Some(true) = read_available(&mut inner.io, &mut inner.read_buf)? {
+                    inner.flags.insert(Flags::READ_DISCONNECT)
+                }
+            }
+
             inner.poll_request()?;
             loop {
-                inner.poll_response()?;
-                if let Async::Ready(false) = inner.poll_flush()? {
+                if inner.write_buf.remaining_mut() < LW_BUFFER_SIZE {
+                    inner.write_buf.reserve(HW_BUFFER_SIZE);
+                }
+                let need_write = inner.poll_response()?;
+
+                // we didnt get WouldBlock from write operation,
+                // so data get written to kernel completely (OSX)
+                // and we have to write again otherwise response can get stuck
+                if inner.poll_flush()? || !need_write {
                     break;
                 }
             }
 
-            if inner.flags.contains(Flags::DISCONNECTED) {
+            // client is gone
+            if inner.flags.contains(Flags::WRITE_DISCONNECT) {
                 return Ok(Async::Ready(()));
             }
 
+            let is_empty = inner.state.is_empty();
+
+            // read half is closed and we do not processing any responses
+            if inner.flags.contains(Flags::READ_DISCONNECT) && is_empty {
+                inner.flags.insert(Flags::SHUTDOWN);
+            }
+
             // keep-alive and stream errors
-            if inner.state.is_empty() && inner.framed.is_write_buf_empty() {
+            if is_empty && inner.write_buf.is_empty() {
                 if let Some(err) = inner.error.take() {
                     Err(err)
                 }
@@ -623,13 +696,52 @@ where
     }
 }
 
+fn read_available<T>(io: &mut T, buf: &mut BytesMut) -> Result<Option<bool>, io::Error>
+where
+    T: io::Read,
+{
+    let mut read_some = false;
+    loop {
+        if buf.remaining_mut() < LW_BUFFER_SIZE {
+            buf.reserve(HW_BUFFER_SIZE);
+        }
+
+        let read = unsafe { io.read(buf.bytes_mut()) };
+        match read {
+            Ok(n) => {
+                if n == 0 {
+                    return Ok(Some(true));
+                } else {
+                    read_some = true;
+                    unsafe {
+                        buf.advance_mut(n);
+                    }
+                }
+            }
+            Err(e) => {
+                return if e.kind() == io::ErrorKind::WouldBlock {
+                    if read_some {
+                        Ok(Some(false))
+                    } else {
+                        Ok(None)
+                    }
+                } else if e.kind() == io::ErrorKind::ConnectionReset && read_some {
+                    Ok(Some(true))
+                } else {
+                    Err(e)
+                };
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::{cmp, io};
 
     use actix_codec::{AsyncRead, AsyncWrite};
     use actix_service::IntoService;
-    use bytes::{Buf, Bytes};
+    use bytes::{Buf, Bytes, BytesMut};
     use futures::future::{lazy, ok};
 
     use super::*;
@@ -638,6 +750,7 @@ mod tests {
 
     struct Buffer {
         buf: Bytes,
+        write_buf: BytesMut,
         err: Option<io::Error>,
     }
 
@@ -645,6 +758,7 @@ mod tests {
         fn new(data: &'static str) -> Buffer {
             Buffer {
                 buf: Bytes::from(data),
+                write_buf: BytesMut::new(),
                 err: None,
             }
         }
@@ -670,6 +784,7 @@ mod tests {
 
     impl io::Write for Buffer {
         fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+            self.write_buf.extend(buf);
             Ok(buf.len())
         }
         fn flush(&mut self) -> io::Result<()> {
@@ -699,15 +814,17 @@ mod tests {
                 ),
                 CloneableService::new(ExpectHandler),
             );
-            assert!(h1.poll().is_ok());
-            assert!(h1.poll().is_ok());
+            assert!(h1.poll().is_err());
             assert!(h1
                 .inner
                 .as_ref()
                 .unwrap()
                 .flags
-                .contains(Flags::DISCONNECTED));
-            // assert_eq!(h1.tasks.len(), 1);
+                .contains(Flags::READ_DISCONNECT));
+            assert_eq!(
+                &h1.inner.as_ref().unwrap().io.write_buf[..26],
+                b"HTTP/1.1 400 Bad Request\r\n"
+            );
             ok::<_, ()>(())
         }));
     }
diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs
index 382ebe5f..374b8bec 100644
--- a/actix-http/src/h1/encoder.rs
+++ b/actix-http/src/h1/encoder.rs
@@ -41,8 +41,6 @@ impl<T: MessageType> Default for MessageEncoder<T> {
 pub(crate) trait MessageType: Sized {
     fn status(&self) -> Option<StatusCode>;
 
-    // fn connection_type(&self) -> Option<ConnectionType>;
-
     fn headers(&self) -> &HeaderMap;
 
     fn chunked(&self) -> bool;
@@ -171,10 +169,6 @@ impl MessageType for Response<()> {
         self.head().chunked()
     }
 
-    //fn connection_type(&self) -> Option<ConnectionType> {
-    //    self.head().ctype
-    //}
-
     fn headers(&self) -> &HeaderMap {
         &self.head().headers
     }
diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs
index 0c8c2eef..c3fed133 100644
--- a/actix-http/src/response.rs
+++ b/actix-http/src/response.rs
@@ -51,18 +51,6 @@ impl Response<Body> {
         }
     }
 
-    #[inline]
-    pub(crate) fn empty(status: StatusCode) -> Response<()> {
-        let mut head: Message<ResponseHead> = Message::new();
-        head.status = status;
-
-        Response {
-            head,
-            body: ResponseBody::Body(()),
-            error: None,
-        }
-    }
-
     /// Constructs an error response
     #[inline]
     pub fn from_error(error: Error) -> Response {
diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs
index f259e302..57ab6ec2 100644
--- a/actix-http/src/service.rs
+++ b/actix-http/src/service.rs
@@ -1,7 +1,7 @@
 use std::marker::PhantomData;
 use std::{fmt, io};
 
-use actix_codec::{AsyncRead, AsyncWrite, Framed, FramedParts};
+use actix_codec::{AsyncRead, AsyncWrite};
 use actix_server_config::{Io as ServerIo, Protocol, ServerConfig as SrvConfig};
 use actix_service::{IntoNewService, NewService, Service};
 use actix_utils::cloneable::CloneableService;
@@ -375,13 +375,14 @@ where
                     self.state =
                         State::Handshake(Some((server::handshake(io), cfg, srv)));
                 } else {
-                    let framed = Framed::from_parts(FramedParts::with_read_buf(
+                    self.state = State::H1(h1::Dispatcher::with_timeout(
                         io,
                         h1::Codec::new(cfg.clone()),
+                        cfg,
                         buf,
-                    ));
-                    self.state = State::H1(h1::Dispatcher::with_timeout(
-                        framed, cfg, None, srv, expect,
+                        None,
+                        srv,
+                        expect,
                     ))
                 }
                 self.poll()
diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs
index d777d3d1..da41492f 100644
--- a/actix-http/tests/test_server.rs
+++ b/actix-http/tests/test_server.rs
@@ -911,11 +911,11 @@ fn test_h1_service_error() {
     });
 
     let response = srv.block_on(srv.get("/").send()).unwrap();
-    assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
+    assert_eq!(response.status(), http::StatusCode::BAD_REQUEST);
 
     // read response
     let bytes = srv.load_body(response).unwrap();
-    assert!(bytes.is_empty());
+    assert_eq!(bytes, Bytes::from_static(b"error"));
 }
 
 #[cfg(feature = "ssl")]
diff --git a/tests/test_server.rs b/tests/test_server.rs
index 7c91d4fe..76ce2c76 100644
--- a/tests/test_server.rs
+++ b/tests/test_server.rs
@@ -770,7 +770,7 @@ fn test_reading_deflate_encoding_large_random_ssl() {
                 awc::Connector::new()
                     .timeout(std::time::Duration::from_millis(500))
                     .ssl(builder.build())
-                    .service(),
+                    .finish(),
             )
             .finish()
     });