From c2751efa871119b1fe6318c66bfe563fd3930bb5 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Wed, 13 Dec 2017 21:38:47 -0800
Subject: [PATCH] refactor keep-alive; update guide

---
 build.rs             |  2 +-
 guide/src/SUMMARY.md |  2 +-
 guide/src/qs_3.md    | 43 +++++++++++++++++++++++
 guide/src/qs_6.md    | 37 --------------------
 src/channel.rs       |  5 +--
 src/h1.rs            | 41 ++++++++++++----------
 src/h2.rs            | 43 ++++++++++++++---------
 src/lib.rs           |  1 +
 src/server.rs        | 82 ++++++++++++++++++++++++++++++++++----------
 9 files changed, 161 insertions(+), 95 deletions(-)
 delete mode 100644 guide/src/qs_6.md

diff --git a/build.rs b/build.rs
index 3b916a95..081d2b50 100644
--- a/build.rs
+++ b/build.rs
@@ -15,10 +15,10 @@ fn main() {
               "guide/src/qs_1.md",
               "guide/src/qs_2.md",
               "guide/src/qs_3.md",
+              "guide/src/qs_3_5.md",
               "guide/src/qs_4.md",
               "guide/src/qs_4_5.md",
               "guide/src/qs_5.md",
-              "guide/src/qs_6.md",
               "guide/src/qs_7.md",
               "guide/src/qs_9.md",
               "guide/src/qs_10.md",
diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md
index a9befac8..e260000a 100644
--- a/guide/src/SUMMARY.md
+++ b/guide/src/SUMMARY.md
@@ -3,9 +3,9 @@
 [Quickstart](./qs_1.md)
 - [Getting Started](./qs_2.md)
 - [Application](./qs_3.md)
+- [Server](./qs_3_5.md)
 - [Handler](./qs_4.md)
 - [Errors](./qs_4_5.md)
-- [State](./qs_6.md)
 - [URL Dispatch](./qs_5.md)
 - [Request & Response](./qs_7.md)
 - [WebSockets](./qs_9.md)
diff --git a/guide/src/qs_3.md b/guide/src/qs_3.md
index 51e82d49..909ba292 100644
--- a/guide/src/qs_3.md
+++ b/guide/src/qs_3.md
@@ -56,3 +56,46 @@ fn main() {
 ```
 
 All `/app1` requests route to first application, `/app2` to second and then all other to third.
+
+## State
+
+Application state is shared with all routes and resources within same application.
+State could be accessed with `HttpRequest::state()` method as a read-only item
+but interior mutability pattern with `RefCell` could be used to archive state mutability.
+State could be accessed with `HttpContext::state()` in case of http actor. 
+State also available to route matching predicates and middlewares.
+
+Let's write simple application that uses shared state. We are going to store requests count
+in the state: 
+ 
+```rust
+# extern crate actix;
+# extern crate actix_web;
+# 
+use actix_web::*;
+use std::cell::Cell;
+
+// This struct represents state
+struct AppState {
+    counter: Cell<usize>,
+}
+
+fn index(req: HttpRequest<AppState>) -> String {
+    let count = req.state().counter.get() + 1; // <- get count
+    req.state().counter.set(count);            // <- store new count in state
+
+    format!("Request number: {}", count)       // <- response with count
+}
+
+fn main() {
+    Application::with_state(AppState{counter: Cell::new(0)})
+        .resource("/", |r| r.method(Method::GET).f(index))
+        .finish();
+}
+```
+
+Note on application state, http server accepts application factory rather than application
+instance. Http server construct application instance for each thread, so application state
+must be constructed multiple times. If you want to share state between different thread
+shared object should be used, like `Arc`. Application state does not need to be `Send` and `Sync`
+but application factory must be `Send` + `Sync`.
diff --git a/guide/src/qs_6.md b/guide/src/qs_6.md
deleted file mode 100644
index f7c88946..00000000
--- a/guide/src/qs_6.md
+++ /dev/null
@@ -1,37 +0,0 @@
-# Application state
-
-Application state is shared with all routes and resources within same application.
-State could be accessed with `HttpRequest::state()` method as a read-only item
-but interior mutability pattern with `RefCell` could be used to archive state mutability.
-State could be accessed with `HttpContext::state()` in case of http actor. 
-State also available to route matching predicates. State is not available
-to application middlewares, middlewares receives `HttpRequest<()>` object.
-
-Let's write simple application that uses shared state. We are going to store requests count
-in the state: 
- 
-```rust
-# extern crate actix;
-# extern crate actix_web;
-# 
-use actix_web::*;
-use std::cell::Cell;
-
-// This struct represents state
-struct AppState {
-    counter: Cell<usize>,
-}
-
-fn index(req: HttpRequest<AppState>) -> String {
-    let count = req.state().counter.get() + 1; // <- get count
-    req.state().counter.set(count);            // <- store new count in state
-
-    format!("Request number: {}", count)       // <- response with count
-}
-
-fn main() {
-    Application::with_state(AppState{counter: Cell::new(0)})
-        .resource("/", |r| r.method(Method::GET).f(index))
-        .finish();
-}
-```
diff --git a/src/channel.rs b/src/channel.rs
index bf7e24a9..6503e82f 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -11,7 +11,7 @@ use h2;
 use error::Error;
 use h1writer::Writer;
 use httprequest::HttpRequest;
-use server::ServerSettings;
+use server::{ServerSettings, WorkerSettings};
 
 /// Low level http request handler
 #[allow(unused_variables)]
@@ -67,7 +67,8 @@ pub struct HttpChannel<T, H>
 impl<T, H> HttpChannel<T, H>
     where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
 {
-    pub fn new(h: Rc<Vec<H>>, io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
+    pub(crate) fn new(h: Rc<WorkerSettings<H>>,
+               io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
     {
         if http2 {
             HttpChannel {
diff --git a/src/h1.rs b/src/h1.rs
index 3b47ca84..0f35f131 100644
--- a/src/h1.rs
+++ b/src/h1.rs
@@ -17,12 +17,12 @@ use pipeline::Pipeline;
 use encoding::PayloadType;
 use channel::{HttpHandler, HttpHandlerTask};
 use h1writer::H1Writer;
+use server::WorkerSettings;
 use httpcodes::HTTPNotFound;
 use httprequest::HttpRequest;
 use error::{ParseError, PayloadError, ResponseError};
 use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
 
-const KEEPALIVE_PERIOD: u64 = 15; // seconds
 const INIT_BUFFER_SIZE: usize = 8192;
 const MAX_BUFFER_SIZE: usize = 131_072;
 const MAX_HEADERS: usize = 100;
@@ -59,7 +59,7 @@ enum Item {
 
 pub(crate) struct Http1<T: AsyncWrite + 'static, H: 'static> {
     flags: Flags,
-    handlers: Rc<Vec<H>>,
+    settings: Rc<WorkerSettings<H>>,
     addr: Option<SocketAddr>,
     stream: H1Writer<T>,
     reader: Reader,
@@ -77,9 +77,9 @@ impl<T, H> Http1<T, H>
     where T: AsyncRead + AsyncWrite + 'static,
           H: HttpHandler + 'static
 {
-    pub fn new(h: Rc<Vec<H>>, stream: T, addr: Option<SocketAddr>) -> Self {
+    pub fn new(h: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>) -> Self {
         Http1{ flags: Flags::KEEPALIVE,
-               handlers: h,
+               settings: h,
                addr: addr,
                stream: H1Writer::new(stream),
                reader: Reader::new(),
@@ -88,8 +88,8 @@ impl<T, H> Http1<T, H>
                keepalive_timer: None }
     }
 
-    pub fn into_inner(self) -> (Rc<Vec<H>>, T, Option<SocketAddr>, Bytes) {
-        (self.handlers, self.stream.into_inner(), self.addr, self.read_buf.freeze())
+    pub fn into_inner(self) -> (Rc<WorkerSettings<H>>, T, Option<SocketAddr>, Bytes) {
+        (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze())
     }
 
     pub fn poll(&mut self) -> Poll<Http1Result, ()> {
@@ -198,7 +198,7 @@ impl<T, H> Http1<T, H>
 
                         // start request processing
                         let mut pipe = None;
-                        for h in self.handlers.iter() {
+                        for h in self.settings.handlers().iter() {
                             req = match h.handle(req) {
                                 Ok(t) => {
                                     pipe = Some(t);
@@ -249,19 +249,24 @@ impl<T, H> Http1<T, H>
                     Ok(Async::NotReady) => {
                         // start keep-alive timer, this is also slow request timeout
                         if self.tasks.is_empty() {
-                            if self.flags.contains(Flags::KEEPALIVE) {
-                                if self.keepalive_timer.is_none() {
-                                    trace!("Start keep-alive timer");
-                                    let mut to = Timeout::new(
-                                        Duration::new(KEEPALIVE_PERIOD, 0),
-                                        Arbiter::handle()).unwrap();
-                                    // register timeout
-                                    let _ = to.poll();
-                                    self.keepalive_timer = Some(to);
+                            if let Some(keep_alive) = self.settings.keep_alive() {
+                                if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
+                                    if self.keepalive_timer.is_none() {
+                                        trace!("Start keep-alive timer");
+                                        let mut to = Timeout::new(
+                                            Duration::new(keep_alive as u64, 0),
+                                            Arbiter::handle()).unwrap();
+                                        // register timeout
+                                        let _ = to.poll();
+                                        self.keepalive_timer = Some(to);
+                                    }
+                                } else {
+                                    // keep-alive disable, drop connection
+                                    return Ok(Async::Ready(Http1Result::Done))
                                 }
                             } else {
-                                // keep-alive disable, drop connection
-                                return Ok(Async::Ready(Http1Result::Done))
+                                // keep-alive unset, rely on operating system
+                                return Ok(Async::NotReady)
                             }
                         }
                         break
diff --git a/src/h2.rs b/src/h2.rs
index 62568162..87566277 100644
--- a/src/h2.rs
+++ b/src/h2.rs
@@ -16,6 +16,7 @@ use tokio_core::reactor::Timeout;
 
 use pipeline::Pipeline;
 use h2writer::H2Writer;
+use server::WorkerSettings;
 use channel::{HttpHandler, HttpHandlerTask};
 use error::PayloadError;
 use encoding::PayloadType;
@@ -23,8 +24,6 @@ use httpcodes::HTTPNotFound;
 use httprequest::HttpRequest;
 use payload::{Payload, PayloadWriter};
 
-const KEEPALIVE_PERIOD: u64 = 15; // seconds
-
 bitflags! {
     struct Flags: u8 {
         const DISCONNECTED = 0b0000_0010;
@@ -36,7 +35,7 @@ pub(crate) struct Http2<T, H>
     where T: AsyncRead + AsyncWrite + 'static, H: 'static
 {
     flags: Flags,
-    handlers: Rc<Vec<H>>,
+    settings: Rc<WorkerSettings<H>>,
     addr: Option<SocketAddr>,
     state: State<IoWrapper<T>>,
     tasks: VecDeque<Entry>,
@@ -53,14 +52,14 @@ impl<T, H> Http2<T, H>
     where T: AsyncRead + AsyncWrite + 'static,
           H: HttpHandler + 'static
 {
-    pub fn new(h: Rc<Vec<H>>, stream: T, addr: Option<SocketAddr>, buf: Bytes) -> Self
+    pub fn new(h: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes) -> Self
     {
         Http2{ flags: Flags::empty(),
-               handlers: h,
+               settings: h,
                addr: addr,
                tasks: VecDeque::new(),
                state: State::Handshake(
-                   Server::handshake(IoWrapper{unread: Some(buf), inner: stream})),
+                   Server::handshake(IoWrapper{unread: Some(buf), inner: io})),
                keepalive_timer: None,
         }
     }
@@ -151,18 +150,28 @@ impl<T, H> Http2<T, H>
                             self.keepalive_timer.take();
 
                             self.tasks.push_back(
-                                Entry::new(parts, body, resp, self.addr, &self.handlers));
+                                Entry::new(parts, body, resp, self.addr, &self.settings));
                         }
                         Ok(Async::NotReady) => {
                             // start keep-alive timer
-                            if self.tasks.is_empty() && self.keepalive_timer.is_none() {
-                                trace!("Start keep-alive timer");
-                                let mut timeout = Timeout::new(
-                                    Duration::new(KEEPALIVE_PERIOD, 0),
-                                    Arbiter::handle()).unwrap();
-                                // register timeout
-                                let _ = timeout.poll();
-                                self.keepalive_timer = Some(timeout);
+                            if self.tasks.is_empty() {
+                                if let Some(keep_alive) = self.settings.keep_alive() {
+                                    if keep_alive > 0 && self.keepalive_timer.is_none() {
+                                        trace!("Start keep-alive timer");
+                                        let mut timeout = Timeout::new(
+                                            Duration::new(keep_alive as u64, 0),
+                                            Arbiter::handle()).unwrap();
+                                        // register timeout
+                                        let _ = timeout.poll();
+                                        self.keepalive_timer = Some(timeout);
+                                    }
+                                } else {
+                                    // keep-alive disable, drop connection
+                                    return Ok(Async::Ready(()))
+                                }
+                            } else {
+                                // keep-alive unset, rely on operating system
+                                return Ok(Async::NotReady)
                             }
                         }
                         Err(err) => {
@@ -230,7 +239,7 @@ impl Entry {
               recv: RecvStream,
               resp: Respond<Bytes>,
               addr: Option<SocketAddr>,
-              handlers: &Rc<Vec<H>>) -> Entry
+              settings: &Rc<WorkerSettings<H>>) -> Entry
         where H: HttpHandler + 'static
     {
         // Payload and Content-Encoding
@@ -247,7 +256,7 @@ impl Entry {
 
         // start request processing
         let mut task = None;
-        for h in handlers.iter() {
+        for h in settings.handlers().iter() {
             req = match h.handle(req) {
                 Ok(t) => {
                     task = Some(t);
diff --git a/src/lib.rs b/src/lib.rs
index 8f05bc2c..7c05015e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -110,6 +110,7 @@ pub mod headers {
 //! Headers implementation
 
     pub use encoding::ContentEncoding;
+    pub use httpresponse::ConnectionType;
 
     pub use cookie::Cookie;
     pub use cookie::CookieBuilder;
diff --git a/src/server.rs b/src/server.rs
index 4e9c00e5..ee7ad90e 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -90,10 +90,11 @@ impl ServerSettings {
 pub struct HttpServer<T, A, H, U>
     where H: 'static
 {
-    h: Rc<Vec<H>>,
+    h: Option<Rc<WorkerSettings<H>>>,
     io: PhantomData<T>,
     addr: PhantomData<A>,
     threads: usize,
+    keep_alive: Option<u16>,
     factory: Arc<Fn() -> U + Send + Sync>,
     workers: Vec<SyncAddress<Worker<H>>>,
 }
@@ -124,10 +125,11 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
     pub fn new<F>(factory: F) -> Self
         where F: Sync + Send + 'static + Fn() -> U,
     {
-        HttpServer{ h: Rc::new(Vec::new()),
+        HttpServer{ h: None,
                     io: PhantomData,
                     addr: PhantomData,
                     threads: num_cpus::get(),
+                    keep_alive: None,
                     factory: Arc::new(factory),
                     workers: Vec::new(),
         }
@@ -141,6 +143,20 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
         self
     }
 
+    /// Set server keep-alive setting.
+    ///
+    /// By default keep alive is enabled.
+    ///
+    ///  - `Some(75)` - enable
+    ///
+    ///  - `Some(0)` - disable
+    ///
+    ///  - `None` - use `SO_KEEPALIVE` socket option
+    pub fn keep_alive(mut self, val: Option<u16>) -> Self {
+        self.keep_alive = val;
+        self
+    }
+
     /// Start listening for incomming connections from a stream.
     ///
     /// This method uses only one thread for handling incoming connections.
@@ -155,7 +171,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
         for app in &mut apps {
             app.server_settings(settings.clone());
         }
-        self.h = Rc::new(apps);
+        self.h = Some(Rc::new(WorkerSettings{h: apps, keep_alive: self.keep_alive}));
 
         // start server
         Ok(HttpServer::create(move |ctx| {
@@ -215,15 +231,16 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
     }
 
     fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType)
-                     -> Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>
+                     -> Vec<mpsc::UnboundedSender<IoStream<Socket>>>
     {
         // start workers
         let mut workers = Vec::new();
         for _ in 0..self.threads {
             let s = settings.clone();
-            let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>();
+            let (tx, rx) = mpsc::unbounded::<IoStream<Socket>>();
 
             let h = handler.clone();
+            let ka = self.keep_alive.clone();
             let factory = Arc::clone(&self.factory);
             let addr = Arbiter::start(move |ctx: &mut Context<_>| {
                 let mut apps: Vec<_> = (*factory)()
@@ -232,7 +249,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
                     app.server_settings(s.clone());
                 }
                 ctx.add_stream(rx);
-                Worker{h: Rc::new(apps), handler: h}
+                Worker::new(apps, h, ka)
             });
             workers.push(tx);
             self.workers.push(addr);
@@ -379,7 +396,7 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
               -> Response<Self, IoStream<T>>
     {
         Arbiter::handle().spawn(
-            HttpChannel::new(Rc::clone(&self.h), msg.io, msg.peer, msg.http2));
+            HttpChannel::new(Rc::clone(&self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2));
         Self::empty()
     }
 }
@@ -389,11 +406,33 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
 ///
 /// Worker accepts Socket objects via unbounded channel and start requests processing.
 struct Worker<H> {
-    h: Rc<Vec<H>>,
+    h: Rc<WorkerSettings<H>>,
     handler: StreamHandlerType,
 }
 
+pub(crate) struct WorkerSettings<H> {
+    h: Vec<H>,
+    keep_alive: Option<u16>,
+}
+
+impl<H> WorkerSettings<H> {
+    pub fn handlers(&self) -> &Vec<H> {
+        &self.h
+    }
+    pub fn keep_alive(&self) -> Option<u16> {
+        self.keep_alive
+    }
+}
+
 impl<H: 'static> Worker<H> {
+
+    fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u16>) -> Worker<H> {
+        Worker {
+            h: Rc::new(WorkerSettings{h: h, keep_alive: keep_alive}),
+            handler: handler,
+        }
+    }
+    
     fn update_time(&self, ctx: &mut Context<Self>) {
         utils::update_date();
         ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
@@ -408,15 +447,20 @@ impl<H: 'static> Actor for Worker<H> {
     }
 }
 
-impl<H> StreamHandler<IoStream<net::TcpStream>> for Worker<H>
+impl<H> StreamHandler<IoStream<Socket>> for Worker<H>
     where H: HttpHandler + 'static {}
 
-impl<H> Handler<IoStream<net::TcpStream>> for Worker<H>
+impl<H> Handler<IoStream<Socket>> for Worker<H>
     where H: HttpHandler + 'static,
 {
-    fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>)
-              -> Response<Self, IoStream<net::TcpStream>>
+    fn handle(&mut self, msg: IoStream<Socket>, _: &mut Context<Self>)
+              -> Response<Self, IoStream<Socket>>
     {
+        if let None = self.h.keep_alive {
+            if msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err() {
+                error!("Can not set socket keep-alive option");
+            }
+        }
         self.handler.handle(Rc::clone(&self.h), msg);
         Self::empty()
     }
@@ -432,10 +476,11 @@ enum StreamHandlerType {
 }
 
 impl StreamHandlerType {
-    fn handle<H: HttpHandler>(&mut self, h: Rc<Vec<H>>, msg: IoStream<net::TcpStream>) {
+
+    fn handle<H: HttpHandler>(&mut self, h: Rc<WorkerSettings<H>>, msg: IoStream<Socket>) {
         match *self {
             StreamHandlerType::Normal => {
-                let io = TcpStream::from_stream(msg.io, Arbiter::handle())
+                let io = TcpStream::from_stream(msg.io.into_tcp_stream(), Arbiter::handle())
                     .expect("failed to associate TCP stream");
 
                 Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
@@ -443,7 +488,7 @@ impl StreamHandlerType {
             #[cfg(feature="tls")]
             StreamHandlerType::Tls(ref acceptor) => {
                 let IoStream { io, peer, http2 } = msg;
-                let io = TcpStream::from_stream(io, Arbiter::handle())
+                let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle())
                     .expect("failed to associate TCP stream");
 
                 Arbiter::handle().spawn(
@@ -461,7 +506,7 @@ impl StreamHandlerType {
             #[cfg(feature="alpn")]
             StreamHandlerType::Alpn(ref acceptor) => {
                 let IoStream { io, peer, .. } = msg;
-                let io = TcpStream::from_stream(io, Arbiter::handle())
+                let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle())
                     .expect("failed to associate TCP stream");
 
                 Arbiter::handle().spawn(
@@ -488,7 +533,7 @@ impl StreamHandlerType {
 }
 
 fn start_accept_thread(sock: Socket, addr: net::SocketAddr,
-                       workers: Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>) {
+                       workers: Vec<mpsc::UnboundedSender<IoStream<Socket>>>) {
     // start acceptors thread
     let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
         let mut next = 0;
@@ -500,8 +545,7 @@ fn start_accept_thread(sock: Socket, addr: net::SocketAddr,
                     } else {
                         net::SocketAddr::V6(addr.as_inet6().unwrap())
                     };
-                    let msg = IoStream{
-                        io: socket.into_tcp_stream(), peer: Some(addr), http2: false};
+                    let msg = IoStream{io: socket, peer: Some(addr), http2: false};
                     workers[next].unbounded_send(msg).expect("worker thread died");
                     next = (next + 1) % workers.len();
                 }