start server worker in start mehtod

This commit is contained in:
Nikolay Kim 2019-11-12 11:44:30 +06:00
parent 4c1cefefe6
commit 0be859d440
5 changed files with 79 additions and 126 deletions

View File

@ -233,10 +233,7 @@ where
{
type Error = T::Error;
fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let len = self.buffer.len();
if len >= self.high_watermark {
return Poll::Pending;
@ -247,6 +244,7 @@ where
fn start_send(self: Pin<&mut Self>, item: <T as Encoder>::Item) -> Result<(), Self::Error> {
let this = unsafe { self.get_unchecked_mut() };
// Check the buffer capacity
let len = this.buffer.len();
if len < this.low_watermark {
@ -299,57 +297,6 @@ where
Poll::Ready(Ok(()))
}
/*
fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
// Check the buffer capacity
let len = self.buffer.len();
if len >= self.high_watermark {
return Ok(AsyncSink::NotReady(item));
}
if len < self.low_watermark {
self.buffer.reserve(self.high_watermark - len)
}
self.inner.encode(item, &mut self.buffer)?;
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
trace!("flushing framed transport");
while !self.buffer.is_empty() {
trace!("writing; remaining={}", self.buffer.len());
let n = try_ready!(self.inner.poll_write(&self.buffer));
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to \
write frame to transport",
)
.into());
}
// TODO: Add a way to `bytes` to do this w/o returning the drained
// data.
let _ = self.buffer.split_to(n);
}
// Try flushing the underlying IO
try_ready!(self.inner.poll_flush());
trace!("framed transport flushed");
Ok(Async::Ready(()))
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
try_ready!(self.poll_complete());
Ok(self.inner.shutdown()?)
}
*/
}
impl<T: Decoder> Decoder for FramedWrite2<T> {

View File

@ -185,6 +185,7 @@ impl Worker {
Arbiter::current().stop();
}
}
wrk.await
}
.boxed_local(),
);

View File

@ -1,14 +1,13 @@
use std::io::{self, Read};
use std::io::Read;
use std::sync::mpsc;
use std::{net, thread, time};
use actix_codec::{BytesCodec, Framed};
use actix_server::{Io, Server, ServerConfig};
use actix_service::{into_service, service_fn, service_fn_config, IntoService};
use actix_service::{service_fn, service_fn2, service_fn_config, IntoService};
use bytes::Bytes;
use futures::{Future, FutureExt, Sink, SinkExt};
use futures::{future::ok, SinkExt};
use net2::TcpBuilder;
use tokio::future::ok;
use tokio_net::tcp::TcpStream;
fn unused_addr() -> net::SocketAddr {
@ -95,75 +94,71 @@ fn test_listen() {
let _ = h.join();
}
// #[test]
// #[cfg(unix)]
// fn test_start() {
// let addr = unused_addr();
// let (tx, rx) = mpsc::channel();
#[test]
#[cfg(unix)]
fn test_start() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
// let h = thread::spawn(move || {
// let sys = actix_rt::System::new("test");
// let srv: Server = Server::build()
// .backlog(100)
// .bind("test", addr, move || {
// service_fn_config(move |cfg: &ServerConfig| {
// assert_eq!(cfg.local_addr(), addr);
let h = thread::spawn(move || {
let sys = actix_rt::System::new("test");
let srv: Server = Server::build()
.backlog(100)
.bind("test", addr, move || {
service_fn_config(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
// let srv = into_service(
// (|io: Io<TcpStream>| {
// let t = async {
// let mut f = Framed::new(io.into_parts().0, BytesCodec);
// f.send(Bytes::from_static(b"test")).await.unwrap();
// Ok::<_, ()>(())
// };
// //ok::<_, ()>(())
// t
// }),
// );
let srv = service_fn2(|io: Io<TcpStream>| {
async {
let mut f = Framed::new(io.into_parts().0, BytesCodec);
f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(())
}
});
// ok::<_, ()>(srv)
// })
// })
// .unwrap()
// .start();
ok::<_, ()>(srv)
})
})
.unwrap()
.start();
// let _ = tx.send((srv, actix_rt::System::current()));
// let _ = sys.run();
// });
// let (srv, sys) = rx.recv().unwrap();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
});
let (srv, sys) = rx.recv().unwrap();
// let mut buf = [1u8; 4];
// let mut conn = net::TcpStream::connect(addr).unwrap();
// let _ = conn.read_exact(&mut buf);
// assert_eq!(buf, b"test"[..]);
let mut buf = [1u8; 4];
let mut conn = net::TcpStream::connect(addr).unwrap();
let _ = conn.read_exact(&mut buf);
assert_eq!(buf, b"test"[..]);
// // pause
// let _ = srv.pause();
// thread::sleep(time::Duration::from_millis(200));
// let mut conn = net::TcpStream::connect(addr).unwrap();
// conn.set_read_timeout(Some(time::Duration::from_millis(100)))
// .unwrap();
// let res = conn.read_exact(&mut buf);
// assert!(res.is_err());
// pause
let _ = srv.pause();
thread::sleep(time::Duration::from_millis(200));
let mut conn = net::TcpStream::connect(addr).unwrap();
conn.set_read_timeout(Some(time::Duration::from_millis(100)))
.unwrap();
let res = conn.read_exact(&mut buf);
assert!(res.is_err());
// // resume
// let _ = srv.resume();
// thread::sleep(time::Duration::from_millis(100));
// assert!(net::TcpStream::connect(addr).is_ok());
// assert!(net::TcpStream::connect(addr).is_ok());
// assert!(net::TcpStream::connect(addr).is_ok());
// resume
let _ = srv.resume();
thread::sleep(time::Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok());
// let mut buf = [0u8; 4];
// let mut conn = net::TcpStream::connect(addr).unwrap();
// let _ = conn.read_exact(&mut buf);
// assert_eq!(buf, b"test"[..]);
let mut buf = [0u8; 4];
let mut conn = net::TcpStream::connect(addr).unwrap();
let _ = conn.read_exact(&mut buf);
assert_eq!(buf, b"test"[..]);
// // stop
// let _ = srv.stop(false);
// thread::sleep(time::Duration::from_millis(100));
// assert!(net::TcpStream::connect(addr).is_err());
// stop
let _ = srv.stop(false);
thread::sleep(time::Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err());
// thread::sleep(time::Duration::from_millis(100));
// let _ = sys.stop();
// let _ = h.join();
// }
thread::sleep(time::Duration::from_millis(100));
let _ = sys.stop();
let _ = h.join();
}

View File

@ -8,7 +8,7 @@ use pin_project::pin_project;
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Create `ServiceFactory` for function that can act as a Service
/// Create `ServiceFactory` for function that can act as a `Service`
pub fn service_fn<F, Fut, Req, Res, Err, Cfg>(
f: F,
) -> impl ServiceFactory<Config = Cfg, Request = Req, Response = Res, Error = Err, InitError = ()>
@ -20,6 +20,16 @@ where
NewServiceFn::new(f)
}
pub fn service_fn2<F, Fut, Req, Res, Err>(
f: F,
) -> impl Service<Request = Req, Response = Res, Error = Err>
where
F: FnMut(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
ServiceFn::new(f)
}
/// Create `ServiceFactory` for function that can produce services
pub fn service_fn_factory<S, F, Cfg, Fut, Err>(
f: F,
@ -88,7 +98,7 @@ where
impl<F, Fut, Req, Res, Err> Service for ServiceFn<F, Fut, Req, Res, Err>
where
F: FnMut(Req) -> Fut + Clone,
F: FnMut(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
type Request = Req;
@ -96,7 +106,7 @@ where
type Error = Err;
type Future = Fut;
fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
@ -107,7 +117,7 @@ where
impl<F, Fut, Req, Res, Err> IntoService<ServiceFn<F, Fut, Req, Res, Err>> for F
where
F: FnMut(Req) -> Fut + Clone,
F: FnMut(Req) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
fn into_service(self) -> ServiceFn<F, Fut, Req, Res, Err> {

View File

@ -22,7 +22,7 @@ mod transform_err;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::apply_cfg::{apply_cfg, apply_cfg_factory};
pub use self::fn_service::{service_fn, service_fn_config, service_fn_factory};
pub use self::fn_service::{service_fn, service_fn2, service_fn_config, service_fn_factory};
pub use self::into::{into_factory, into_service, ServiceFactoryMapper, ServiceMapper};
pub use self::map_config::{map_config, unit_config, MappedConfig};
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};