From 0be859d4409ca2370cd9f81cfc85b263730ddbf7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 12 Nov 2019 11:44:30 +0600 Subject: [PATCH] start server worker in start mehtod --- actix-codec/src/framed_write.rs | 57 +------------- actix-server/src/worker.rs | 1 + actix-server/tests/test_server.rs | 127 ++++++++++++++---------------- actix-service/src/fn_service.rs | 18 ++++- actix-service/src/lib.rs | 2 +- 5 files changed, 79 insertions(+), 126 deletions(-) diff --git a/actix-codec/src/framed_write.rs b/actix-codec/src/framed_write.rs index 542bb195..ce60d1eb 100644 --- a/actix-codec/src/framed_write.rs +++ b/actix-codec/src/framed_write.rs @@ -233,10 +233,7 @@ where { type Error = T::Error; - fn poll_ready( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { let len = self.buffer.len(); if len >= self.high_watermark { return Poll::Pending; @@ -247,6 +244,7 @@ where fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { let this = unsafe { self.get_unchecked_mut() }; + // Check the buffer capacity let len = this.buffer.len(); if len < this.low_watermark { @@ -299,57 +297,6 @@ where Poll::Ready(Ok(())) } - - /* - fn start_send(&mut self, item: T::Item) -> StartSend { - // Check the buffer capacity - let len = self.buffer.len(); - if len >= self.high_watermark { - return Ok(AsyncSink::NotReady(item)); - } - if len < self.low_watermark { - self.buffer.reserve(self.high_watermark - len) - } - - self.inner.encode(item, &mut self.buffer)?; - - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - trace!("flushing framed transport"); - - while !self.buffer.is_empty() { - trace!("writing; remaining={}", self.buffer.len()); - - let n = try_ready!(self.inner.poll_write(&self.buffer)); - - if n == 0 { - return Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to \ - write frame to transport", - ) - .into()); - } - - // TODO: Add a way to `bytes` to do this w/o returning the drained - // data. - let _ = self.buffer.split_to(n); - } - - // Try flushing the underlying IO - try_ready!(self.inner.poll_flush()); - - trace!("framed transport flushed"); - Ok(Async::Ready(())) - } - - fn close(&mut self) -> Poll<(), Self::SinkError> { - try_ready!(self.poll_complete()); - Ok(self.inner.shutdown()?) - } - */ } impl Decoder for FramedWrite2 { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a99064b6..a1ba6c78 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -185,6 +185,7 @@ impl Worker { Arbiter::current().stop(); } } + wrk.await } .boxed_local(), ); diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 91048a2c..7147a642 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,14 +1,13 @@ -use std::io::{self, Read}; +use std::io::Read; use std::sync::mpsc; use std::{net, thread, time}; use actix_codec::{BytesCodec, Framed}; use actix_server::{Io, Server, ServerConfig}; -use actix_service::{into_service, service_fn, service_fn_config, IntoService}; +use actix_service::{service_fn, service_fn2, service_fn_config, IntoService}; use bytes::Bytes; -use futures::{Future, FutureExt, Sink, SinkExt}; +use futures::{future::ok, SinkExt}; use net2::TcpBuilder; -use tokio::future::ok; use tokio_net::tcp::TcpStream; fn unused_addr() -> net::SocketAddr { @@ -95,75 +94,71 @@ fn test_listen() { let _ = h.join(); } -// #[test] -// #[cfg(unix)] -// fn test_start() { -// let addr = unused_addr(); -// let (tx, rx) = mpsc::channel(); +#[test] +#[cfg(unix)] +fn test_start() { + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); -// let h = thread::spawn(move || { -// let sys = actix_rt::System::new("test"); -// let srv: Server = Server::build() -// .backlog(100) -// .bind("test", addr, move || { -// service_fn_config(move |cfg: &ServerConfig| { -// assert_eq!(cfg.local_addr(), addr); + let h = thread::spawn(move || { + let sys = actix_rt::System::new("test"); + let srv: Server = Server::build() + .backlog(100) + .bind("test", addr, move || { + service_fn_config(move |cfg: &ServerConfig| { + assert_eq!(cfg.local_addr(), addr); -// let srv = into_service( -// (|io: Io| { -// let t = async { -// let mut f = Framed::new(io.into_parts().0, BytesCodec); -// f.send(Bytes::from_static(b"test")).await.unwrap(); -// Ok::<_, ()>(()) -// }; -// //ok::<_, ()>(()) -// t -// }), -// ); + let srv = service_fn2(|io: Io| { + async { + let mut f = Framed::new(io.into_parts().0, BytesCodec); + f.send(Bytes::from_static(b"test")).await.unwrap(); + Ok::<_, ()>(()) + } + }); -// ok::<_, ()>(srv) -// }) -// }) -// .unwrap() -// .start(); + ok::<_, ()>(srv) + }) + }) + .unwrap() + .start(); -// let _ = tx.send((srv, actix_rt::System::current())); -// let _ = sys.run(); -// }); -// let (srv, sys) = rx.recv().unwrap(); + let _ = tx.send((srv, actix_rt::System::current())); + let _ = sys.run(); + }); + let (srv, sys) = rx.recv().unwrap(); -// let mut buf = [1u8; 4]; -// let mut conn = net::TcpStream::connect(addr).unwrap(); -// let _ = conn.read_exact(&mut buf); -// assert_eq!(buf, b"test"[..]); + let mut buf = [1u8; 4]; + let mut conn = net::TcpStream::connect(addr).unwrap(); + let _ = conn.read_exact(&mut buf); + assert_eq!(buf, b"test"[..]); -// // pause -// let _ = srv.pause(); -// thread::sleep(time::Duration::from_millis(200)); -// let mut conn = net::TcpStream::connect(addr).unwrap(); -// conn.set_read_timeout(Some(time::Duration::from_millis(100))) -// .unwrap(); -// let res = conn.read_exact(&mut buf); -// assert!(res.is_err()); + // pause + let _ = srv.pause(); + thread::sleep(time::Duration::from_millis(200)); + let mut conn = net::TcpStream::connect(addr).unwrap(); + conn.set_read_timeout(Some(time::Duration::from_millis(100))) + .unwrap(); + let res = conn.read_exact(&mut buf); + assert!(res.is_err()); -// // resume -// let _ = srv.resume(); -// thread::sleep(time::Duration::from_millis(100)); -// assert!(net::TcpStream::connect(addr).is_ok()); -// assert!(net::TcpStream::connect(addr).is_ok()); -// assert!(net::TcpStream::connect(addr).is_ok()); + // resume + let _ = srv.resume(); + thread::sleep(time::Duration::from_millis(100)); + assert!(net::TcpStream::connect(addr).is_ok()); + assert!(net::TcpStream::connect(addr).is_ok()); + assert!(net::TcpStream::connect(addr).is_ok()); -// let mut buf = [0u8; 4]; -// let mut conn = net::TcpStream::connect(addr).unwrap(); -// let _ = conn.read_exact(&mut buf); -// assert_eq!(buf, b"test"[..]); + let mut buf = [0u8; 4]; + let mut conn = net::TcpStream::connect(addr).unwrap(); + let _ = conn.read_exact(&mut buf); + assert_eq!(buf, b"test"[..]); -// // stop -// let _ = srv.stop(false); -// thread::sleep(time::Duration::from_millis(100)); -// assert!(net::TcpStream::connect(addr).is_err()); + // stop + let _ = srv.stop(false); + thread::sleep(time::Duration::from_millis(100)); + assert!(net::TcpStream::connect(addr).is_err()); -// thread::sleep(time::Duration::from_millis(100)); -// let _ = sys.stop(); -// let _ = h.join(); -// } + thread::sleep(time::Duration::from_millis(100)); + let _ = sys.stop(); + let _ = h.join(); +} diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 3d4ca7a8..f7da4407 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -8,7 +8,7 @@ use pin_project::pin_project; use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; -/// Create `ServiceFactory` for function that can act as a Service +/// Create `ServiceFactory` for function that can act as a `Service` pub fn service_fn( f: F, ) -> impl ServiceFactory @@ -20,6 +20,16 @@ where NewServiceFn::new(f) } +pub fn service_fn2( + f: F, +) -> impl Service +where + F: FnMut(Req) -> Fut, + Fut: Future>, +{ + ServiceFn::new(f) +} + /// Create `ServiceFactory` for function that can produce services pub fn service_fn_factory( f: F, @@ -88,7 +98,7 @@ where impl Service for ServiceFn where - F: FnMut(Req) -> Fut + Clone, + F: FnMut(Req) -> Fut, Fut: Future>, { type Request = Req; @@ -96,7 +106,7 @@ where type Error = Err; type Future = Fut; - fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -107,7 +117,7 @@ where impl IntoService> for F where - F: FnMut(Req) -> Fut + Clone, + F: FnMut(Req) -> Fut, Fut: Future>, { fn into_service(self) -> ServiceFn { diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 19067480..c32f46a4 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -22,7 +22,7 @@ mod transform_err; pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::apply_cfg::{apply_cfg, apply_cfg_factory}; -pub use self::fn_service::{service_fn, service_fn_config, service_fn_factory}; +pub use self::fn_service::{service_fn, service_fn2, service_fn_config, service_fn_factory}; pub use self::into::{into_factory, into_service, ServiceFactoryMapper, ServiceMapper}; pub use self::map_config::{map_config, unit_config, MappedConfig}; pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};