From 5271ba5ebd2520f8020655460868887a4ecf7d08 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 14 Nov 2019 14:55:48 +0600 Subject: [PATCH] migrate actix-ioframe to std::future --- actix-codec/src/framed.rs | 23 +- actix-ioframe/Cargo.toml | 14 +- actix-ioframe/src/connect.rs | 58 ++-- actix-ioframe/src/dispatcher.rs | 466 ++++++++++++++++------------- actix-ioframe/src/service.rs | 273 ++++++++++------- actix-ioframe/src/sink.rs | 18 +- actix-ioframe/tests/test_server.rs | 26 +- actix-service/src/apply.rs | 8 +- actix-utils/Cargo.toml | 2 +- actix-utils/src/framed.rs | 22 +- 10 files changed, 528 insertions(+), 382 deletions(-) diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 0346605e..7152b459 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -223,17 +223,22 @@ impl Framed { } } -impl Framed -where - T: AsyncRead + Unpin, - U: Decoder + Unpin, -{ - pub fn poll_next_item( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { +impl Framed { + pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> + where + T: AsyncRead + Unpin, + U: Decoder + Unpin, + { Pin::new(&mut self.inner).poll_next(cx) } + + pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> + where + T: AsyncWrite + Unpin, + U: Encoder + Unpin, + { + Pin::new(self.inner.get_mut()).poll_flush(cx) + } } impl Stream for Framed diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 09cbac6d..a7baf1e0 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -18,18 +18,20 @@ name = "actix_ioframe" path = "src/lib.rs" [dependencies] -actix-service = "0.4.1" -actix-codec = "0.1.2" +actix-service = "1.0.0-alpha.1" +actix-codec = "0.2.0-alpha.1" +actix-utils = "0.5.0-alpha.1" bytes = "0.4" either = "1.5.2" futures = "0.3.1" +pin-project = "0.4.5" tokio-executor = "=0.2.0-alpha.6" log = "0.4" [dev-dependencies] -actix-rt = "0.2.2" -actix-connect = "0.3.0" +actix-rt = "1.0.0-alpha.1" +actix-connect = "1.0.0-alpha.1" actix-testing = "0.2.0" actix-server-config = "0.2.0" -tokio-tcp = "0.1" -tokio-timer = "0.2" +tokio-net = "=0.2.0-alpha.6" +tokio-timer = "=0.3.0-alpha.6" diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs index d7f403c2..703c3b14 100644 --- a/actix-ioframe/src/connect.rs +++ b/actix-ioframe/src/connect.rs @@ -1,7 +1,11 @@ use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use futures::unsync::mpsc; +use actix_utils::mpsc; +use futures::Stream; +use pin_project::pin_project; use crate::dispatcher::FramedMessage; use crate::sink::Sink; @@ -13,7 +17,7 @@ pub struct Connect { impl Connect where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, { pub(crate) fn new(io: Io) -> Self { Self { @@ -24,9 +28,9 @@ where pub fn codec(self, codec: Codec) -> ConnectResult where - Codec: Encoder + Decoder, + Codec: Encoder + Decoder + Unpin, { - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = mpsc::channel(); let sink = Sink::new(tx); ConnectResult { @@ -38,10 +42,12 @@ where } } +#[pin_project] pub struct ConnectResult { pub(crate) state: St, + #[pin] pub(crate) framed: Framed, - pub(crate) rx: mpsc::UnboundedReceiver::Item>>, + pub(crate) rx: mpsc::Receiver::Item>>, pub(crate) sink: Sink<::Item>, } @@ -72,39 +78,41 @@ impl ConnectResult { } } -impl futures::Stream for ConnectResult +impl Stream for ConnectResult where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, { - type Item = ::Item; - type Error = ::Error; + type Item = Result<::Item, ::Error>; - fn poll(&mut self) -> futures::Poll, Self::Error> { - self.framed.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().framed.poll_next(cx) } } -impl futures::Sink for ConnectResult +impl futures::Sink<::Item> for ConnectResult where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, { - type SinkItem = ::Item; - type SinkError = ::Error; + type Error = ::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().framed.poll_ready(cx) + } fn start_send( - &mut self, - item: Self::SinkItem, - ) -> futures::StartSend { - self.framed.start_send(item) + self: Pin<&mut Self>, + item: ::Item, + ) -> Result<(), Self::Error> { + self.project().framed.start_send(item) } - fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { - self.framed.poll_complete() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().framed.poll_flush(cx) } - fn close(&mut self) -> futures::Poll<(), Self::SinkError> { - self.framed.close() + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().framed.poll_close(cx) } } diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index 05a7d05f..da9957b3 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -1,13 +1,17 @@ //! Framed dispatcher service and related utilities use std::collections::VecDeque; +use std::future::Future; use std::mem; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; -use futures::task::AtomicTask; -use futures::unsync::{mpsc, oneshot}; -use futures::{Async, Future, Poll, Sink as FutureSink, Stream}; +use actix_utils::task::LocalWaker; +use actix_utils::{mpsc, oneshot}; +use futures::future::ready; +use futures::{FutureExt, Sink as FutureSink, Stream}; use log::debug; use crate::cell::Cell; @@ -27,13 +31,14 @@ pub(crate) enum FramedMessage { /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. +#[pin_project::pin_project] pub(crate) struct FramedDispatcher where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, + T: AsyncRead + AsyncWrite + Unpin, + U: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -42,7 +47,7 @@ where state: State, dispatch_state: FramedState, framed: Framed, - rx: Option::Item>>>, + rx: Option::Item>>>, inner: Cell::Item, S::Error>>, disconnect: Option>, } @@ -52,8 +57,8 @@ where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -61,7 +66,7 @@ where framed: Framed, state: State, service: F, - rx: mpsc::UnboundedReceiver::Item>>, + rx: mpsc::Receiver::Item>>, sink: Sink<::Item>, disconnect: Option>, ) -> Self { @@ -75,13 +80,13 @@ where dispatch_state: FramedState::Processing, inner: Cell::new(FramedDispatcherInner { buf: VecDeque::new(), - task: AtomicTask::new(), + task: LocalWaker::new(), }), } } } -enum FramedState { +enum FramedState { Processing, Error(ServiceError), FramedError(ServiceError), @@ -89,7 +94,7 @@ enum FramedState { Stopping, } -impl FramedState { +impl FramedState { fn stop(&mut self, tx: Option>) { match self { FramedState::FlushAndStop(ref mut vec) => { @@ -115,149 +120,7 @@ impl FramedState { struct FramedDispatcherInner { buf: VecDeque>, - task: AtomicTask, -} - -impl FramedDispatcher -where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, -{ - fn disconnect(&mut self, error: bool) { - if let Some(ref disconnect) = self.disconnect { - (&*disconnect)(&mut *self.state.get_mut(), error); - } - } - - fn poll_read(&mut self) -> bool { - loop { - match self.service.poll_ready() { - Ok(Async::Ready(_)) => { - let item = match self.framed.poll() { - Ok(Async::Ready(Some(el))) => el, - Err(err) => { - self.dispatch_state = - FramedState::FramedError(ServiceError::Decoder(err)); - return true; - } - Ok(Async::NotReady) => return false, - Ok(Async::Ready(None)) => { - log::trace!("Client disconnected"); - self.dispatch_state = FramedState::Stopping; - return true; - } - }; - - let mut cell = self.inner.clone(); - tokio_current_thread::spawn( - self.service - .call(Item::new(self.state.clone(), self.sink.clone(), item)) - .then(move |item| { - let item = match item { - Ok(Some(item)) => Ok(item), - Ok(None) => return Ok(()), - Err(err) => Err(err), - }; - unsafe { - let inner = cell.get_mut(); - inner.buf.push_back(item); - inner.task.notify(); - } - Ok(()) - }), - ); - } - Ok(Async::NotReady) => return false, - Err(err) => { - self.dispatch_state = FramedState::Error(ServiceError::Service(err)); - return true; - } - } - } - } - - /// write to framed object - fn poll_write(&mut self) -> bool { - let inner = unsafe { self.inner.get_mut() }; - let mut rx_done = self.rx.is_none(); - let mut buf_empty = inner.buf.is_empty(); - loop { - while !self.framed.is_write_buf_full() { - if !buf_empty { - match inner.buf.pop_front().unwrap() { - Ok(msg) => { - if let Err(err) = self.framed.force_send(msg) { - self.dispatch_state = - FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - buf_empty = inner.buf.is_empty(); - } - Err(err) => { - self.dispatch_state = - FramedState::Error(ServiceError::Service(err)); - return true; - } - } - } - - if !rx_done && self.rx.is_some() { - match self.rx.as_mut().unwrap().poll() { - Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => { - if let Err(err) = self.framed.force_send(msg) { - self.dispatch_state = - FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - } - Ok(Async::Ready(Some(FramedMessage::Close))) => { - self.dispatch_state.stop(None); - return true; - } - Ok(Async::Ready(Some(FramedMessage::WaitClose(tx)))) => { - self.dispatch_state.stop(Some(tx)); - return true; - } - Ok(Async::Ready(None)) => { - rx_done = true; - let _ = self.rx.take(); - } - Ok(Async::NotReady) => rx_done = true, - Err(_e) => { - rx_done = true; - let _ = self.rx.take(); - } - } - } - - if rx_done && buf_empty { - break; - } - } - - if !self.framed.is_write_buf_empty() { - match self.framed.poll_complete() { - Ok(Async::NotReady) => break, - Err(err) => { - debug!("Error sending data: {:?}", err); - self.dispatch_state = - FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - Ok(Async::Ready(_)) => (), - } - } else { - break; - } - } - - false - } + task: LocalWaker, } impl Future for FramedDispatcher @@ -265,63 +128,266 @@ where S: Service, Response = Option>>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { - type Item = (); - type Error = ServiceError; + type Output = Result<(), ServiceError>; - fn poll(&mut self) -> Poll { - unsafe { self.inner.get_ref().task.register() }; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + unsafe { self.inner.get_ref().task.register(cx.waker()) }; - match mem::replace(&mut self.dispatch_state, FramedState::Processing) { - FramedState::Processing => { - if self.poll_read() || self.poll_write() { - self.poll() - } else { - Ok(Async::NotReady) - } + let this = self.project(); + poll( + cx, + this.service, + this.state, + this.sink, + this.framed, + this.dispatch_state, + this.rx, + this.inner, + this.disconnect, + ) + } +} + +fn poll( + cx: &mut Context, + srv: &mut S, + state: &mut State, + sink: &mut Sink<::Item>, + framed: &mut Framed, + dispatch_state: &mut FramedState, + rx: &mut Option::Item>>>, + inner: &mut Cell::Item, S::Error>>, + disconnect: &mut Option>, +) -> Poll>> +where + S: Service, Response = Option>>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + match mem::replace(dispatch_state, FramedState::Processing) { + FramedState::Processing => { + if poll_read(cx, srv, state, sink, framed, dispatch_state, inner) + || poll_write(cx, framed, dispatch_state, rx, inner) + { + poll( + cx, + srv, + state, + sink, + framed, + dispatch_state, + rx, + inner, + disconnect, + ) + } else { + Poll::Pending } - FramedState::Error(err) => { - if self.framed.is_write_buf_empty() - || (self.poll_write() || self.framed.is_write_buf_empty()) - { - self.disconnect(true); - Err(err) - } else { - self.dispatch_state = FramedState::Error(err); - Ok(Async::NotReady) + } + FramedState::Error(err) => { + if framed.is_write_buf_empty() + || (poll_write(cx, framed, dispatch_state, rx, inner) + || framed.is_write_buf_empty()) + { + if let Some(ref disconnect) = disconnect { + (&*disconnect)(&mut *state.get_mut(), true); } + Poll::Ready(Err(err)) + } else { + *dispatch_state = FramedState::Error(err); + Poll::Pending } - FramedState::FlushAndStop(mut vec) => { - if !self.framed.is_write_buf_empty() { - match self.framed.poll_complete() { - Err(err) => { - debug!("Error sending data: {:?}", err); - } - Ok(Async::NotReady) => { - self.dispatch_state = FramedState::FlushAndStop(vec); - return Ok(Async::NotReady); - } - Ok(Async::Ready(_)) => (), + } + FramedState::FlushAndStop(mut vec) => { + if !framed.is_write_buf_empty() { + match Pin::new(framed).poll_flush(cx) { + Poll::Ready(Err(err)) => { + debug!("Error sending data: {:?}", err); + } + Poll::Pending => { + *dispatch_state = FramedState::FlushAndStop(vec); + return Poll::Pending; + } + Poll::Ready(_) => (), + } + }; + for tx in vec.drain(..) { + let _ = tx.send(()); + } + if let Some(ref disconnect) = disconnect { + (&*disconnect)(&mut *state.get_mut(), false); + } + Poll::Ready(Ok(())) + } + FramedState::FramedError(err) => { + if let Some(ref disconnect) = disconnect { + (&*disconnect)(&mut *state.get_mut(), true); + } + Poll::Ready(Err(err)) + } + FramedState::Stopping => { + if let Some(ref disconnect) = disconnect { + (&*disconnect)(&mut *state.get_mut(), false); + } + Poll::Ready(Ok(())) + } + } +} + +fn poll_read( + cx: &mut Context, + srv: &mut S, + state: &mut State, + sink: &mut Sink<::Item>, + framed: &mut Framed, + dispatch_state: &mut FramedState, + inner: &mut Cell::Item, S::Error>>, +) -> bool +where + S: Service, Response = Option>>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + loop { + match srv.poll_ready(cx) { + Poll::Ready(Ok(_)) => { + let item = match framed.next_item(cx) { + Poll::Ready(Some(Ok(el))) => el, + Poll::Ready(Some(Err(err))) => { + *dispatch_state = FramedState::FramedError(ServiceError::Decoder(err)); + return true; + } + Poll::Pending => return false, + Poll::Ready(None) => { + log::trace!("Client disconnected"); + *dispatch_state = FramedState::Stopping; + return true; } }; - for tx in vec.drain(..) { - let _ = tx.send(()); - } - self.disconnect(false); - Ok(Async::Ready(())) + + let mut cell = inner.clone(); + tokio_executor::current_thread::spawn( + srv.call(Item::new(state.clone(), sink.clone(), item)) + .then(move |item| { + let item = match item { + Ok(Some(item)) => Ok(item), + Ok(None) => return ready(()), + Err(err) => Err(err), + }; + unsafe { + let inner = cell.get_mut(); + inner.buf.push_back(item); + inner.task.wake(); + } + ready(()) + }), + ); } - FramedState::FramedError(err) => { - self.disconnect(true); - Err(err) - } - FramedState::Stopping => { - self.disconnect(false); - Ok(Async::Ready(())) + Poll::Pending => return false, + Poll::Ready(Err(err)) => { + *dispatch_state = FramedState::Error(ServiceError::Service(err)); + return true; } } } } + +/// write to framed object +fn poll_write( + cx: &mut Context, + framed: &mut Framed, + dispatch_state: &mut FramedState, + rx: &mut Option::Item>>>, + inner: &mut Cell::Item, S::Error>>, +) -> bool +where + S: Service, Response = Option>>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + let inner = unsafe { inner.get_mut() }; + let mut rx_done = rx.is_none(); + let mut buf_empty = inner.buf.is_empty(); + loop { + while !framed.is_write_buf_full() { + if !buf_empty { + match inner.buf.pop_front().unwrap() { + Ok(msg) => { + if let Err(err) = framed.force_send(msg) { + *dispatch_state = + FramedState::FramedError(ServiceError::Encoder(err)); + return true; + } + buf_empty = inner.buf.is_empty(); + } + Err(err) => { + *dispatch_state = FramedState::Error(ServiceError::Service(err)); + return true; + } + } + } + + if !rx_done && rx.is_some() { + match Pin::new(rx.as_mut().unwrap()).poll_next(cx) { + Poll::Ready(Some(FramedMessage::Message(msg))) => { + if let Err(err) = framed.force_send(msg) { + *dispatch_state = + FramedState::FramedError(ServiceError::Encoder(err)); + return true; + } + } + Poll::Ready(Some(FramedMessage::Close)) => { + dispatch_state.stop(None); + return true; + } + Poll::Ready(Some(FramedMessage::WaitClose(tx))) => { + dispatch_state.stop(Some(tx)); + return true; + } + Poll::Ready(None) => { + rx_done = true; + let _ = rx.take(); + } + Poll::Pending => rx_done = true, + } + } + + if rx_done && buf_empty { + break; + } + } + + if !framed.is_write_buf_empty() { + match framed.flush(cx) { + Poll::Pending => break, + Poll::Ready(Err(err)) => { + debug!("Error sending data: {:?}", err); + *dispatch_state = FramedState::FramedError(ServiceError::Encoder(err)); + return true; + } + Poll::Ready(_) => (), + } + } else { + break; + } + } + + false +} diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs index 98dbd366..96cdab47 100644 --- a/actix-ioframe/src/service.rs +++ b/actix-ioframe/src/service.rs @@ -1,9 +1,14 @@ +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; -use actix_service::{IntoNewService, IntoService, NewService, Service}; -use futures::{Async, Future, Poll}; +use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; +use either::Either; +use futures::future::{FutureExt, LocalBoxFuture}; +use pin_project::{pin_project, project}; use crate::connect::{Connect, ConnectResult}; use crate::dispatcher::FramedDispatcher; @@ -27,9 +32,9 @@ impl Builder { pub fn service(self, connect: F) -> ServiceBuilder where F: IntoService, - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, - Codec: Decoder + Encoder, + Codec: Decoder + Encoder + Unpin, { ServiceBuilder { connect: connect.into_service(), @@ -41,19 +46,19 @@ impl Builder { /// Construct framed handler new service with specified connect service pub fn factory(self, connect: F) -> NewServiceBuilder where - F: IntoNewService, - Io: AsyncRead + AsyncWrite, - C: NewService< + F: IntoServiceFactory, + Io: AsyncRead + AsyncWrite + Unpin, + C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, C::Error: 'static, C::Future: 'static, - Codec: Decoder + Encoder, + Codec: Decoder + Encoder + Unpin, { NewServiceBuilder { - connect: connect.into_new_service(), + connect: connect.into_factory(), disconnect: None, _t: PhantomData, } @@ -69,10 +74,10 @@ pub struct ServiceBuilder { impl ServiceBuilder where St: 'static, - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, C::Error: 'static, - Codec: Decoder + Encoder, + Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -93,8 +98,8 @@ where service: F, ) -> impl Service> where - F: IntoNewService, - T: NewService< + F: IntoServiceFactory, + T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, @@ -104,7 +109,7 @@ where { FramedServiceImpl { connect: self.connect, - handler: Rc::new(service.into_new_service()), + handler: Rc::new(service.into_factory()), disconnect: self.disconnect.clone(), _t: PhantomData, } @@ -120,11 +125,15 @@ pub struct NewServiceBuilder { impl NewServiceBuilder where St: 'static, - Io: AsyncRead + AsyncWrite, - C: NewService, Response = ConnectResult>, + Io: AsyncRead + AsyncWrite + Unpin, + C: ServiceFactory< + Config = (), + Request = Connect, + Response = ConnectResult, + >, C::Error: 'static, C::Future: 'static, - Codec: Decoder + Encoder, + Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -142,15 +151,15 @@ where pub fn finish( self, service: F, - ) -> impl NewService< + ) -> impl ServiceFactory< Config = Cfg, Request = Io, Response = (), Error = ServiceError, > where - F: IntoNewService, - T: NewService< + F: IntoServiceFactory, + T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, @@ -160,7 +169,7 @@ where { FramedService { connect: self.connect, - handler: Rc::new(service.into_new_service()), + handler: Rc::new(service.into_factory()), disconnect: self.disconnect, _t: PhantomData, } @@ -174,21 +183,25 @@ pub(crate) struct FramedService { _t: PhantomData<(St, Io, Codec, Cfg)>, } -impl NewService for FramedService +impl ServiceFactory for FramedService where St: 'static, - Io: AsyncRead + AsyncWrite, - C: NewService, Response = ConnectResult>, + Io: AsyncRead + AsyncWrite + Unpin, + C: ServiceFactory< + Config = (), + Request = Connect, + Response = ConnectResult, + >, C::Error: 'static, C::Future: 'static, - T: NewService< + T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, > + 'static, - Codec: Decoder + Encoder, + Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -198,23 +211,24 @@ where type Error = ServiceError; type InitError = C::InitError; type Service = FramedServiceImpl; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: &Cfg) -> Self::Future { let handler = self.handler.clone(); let disconnect = self.disconnect.clone(); // create connect service and then create service impl - Box::new( - self.connect - .new_service(&()) - .map(move |connect| FramedServiceImpl { + self.connect + .new_service(&()) + .map(move |result| { + result.map(move |connect| FramedServiceImpl { connect, handler, disconnect, _t: PhantomData, - }), - ) + }) + }) + .boxed_local() } } @@ -227,18 +241,18 @@ pub struct FramedServiceImpl { impl Service for FramedServiceImpl where - Io: AsyncRead + AsyncWrite, + Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, C::Error: 'static, - T: NewService< + T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, - Codec: Decoder + Encoder, + <::Service as Service>::Future: 'static, + Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -247,8 +261,8 @@ where type Error = ServiceError; type Future = FramedServiceImplResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.connect.poll_ready().map_err(|e| e.into()) + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + self.connect.poll_ready(cx).map_err(|e| e.into()) } fn call(&mut self, req: Io) -> Self::Future { @@ -256,108 +270,155 @@ where inner: FramedServiceImplResponseInner::Connect( self.connect.call(Connect::new(req)), self.handler.clone(), + self.disconnect.clone(), ), - disconnect: self.disconnect.clone(), } } } +#[pin_project] pub struct FramedServiceImplResponse where C: Service, Response = ConnectResult>, C::Error: 'static, - T: NewService< + T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, + <::Service as Service>::Future: 'static, + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { inner: FramedServiceImplResponseInner, - disconnect: Option>, -} - -enum FramedServiceImplResponseInner -where - C: Service, Response = ConnectResult>, - C::Error: 'static, - T: NewService< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - <::Service as Service>::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, -{ - Connect(C::Future, Rc), - Handler(T::Future, Option>), - Dispatcher(FramedDispatcher), } impl Future for FramedServiceImplResponse where C: Service, Response = ConnectResult>, C::Error: 'static, - T: NewService< + T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, - <::Service as Service>::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, + <::Service as Service>::Future: 'static, + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { - type Item = (); - type Error = ServiceError; + type Output = Result<(), ServiceError>; - fn poll(&mut self) -> Poll { - match self.inner { - FramedServiceImplResponseInner::Connect(ref mut fut, ref handler) => { - match fut.poll()? { - Async::Ready(res) => { - self.inner = FramedServiceImplResponseInner::Handler( - handler.new_service(&res.state), - Some(res), - ); - self.poll() - } - Async::NotReady => Ok(Async::NotReady), - } - } - FramedServiceImplResponseInner::Handler(ref mut fut, ref mut res) => { - match fut.poll()? { - Async::Ready(handler) => { - let res = res.take().unwrap(); - self.inner = - FramedServiceImplResponseInner::Dispatcher(FramedDispatcher::new( - res.framed, - State::new(res.state), - handler, - res.rx, - res.sink, - self.disconnect.clone(), - )); - self.poll() - } - Async::NotReady => Ok(Async::NotReady), - } - } - FramedServiceImplResponseInner::Dispatcher(ref mut fut) => fut.poll(), + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + loop { + match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) { + Either::Left(new) => this.inner = new, + Either::Right(poll) => return poll, + }; + } + } +} + +#[pin_project] +enum FramedServiceImplResponseInner +where + C: Service, Response = ConnectResult>, + C::Error: 'static, + T: ServiceFactory< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + <::Service as Service>::Future: 'static, + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + Connect(#[pin] C::Future, Rc, Option>), + Handler( + #[pin] T::Future, + Option>, + Option>, + ), + Dispatcher(FramedDispatcher), +} + +impl FramedServiceImplResponseInner +where + C: Service, Response = ConnectResult>, + C::Error: 'static, + T: ServiceFactory< + Config = St, + Request = RequestItem, + Response = ResponseItem, + Error = C::Error, + InitError = C::Error, + >, + <::Service as Service>::Future: 'static, + Io: AsyncRead + AsyncWrite + Unpin, + Codec: Encoder + Decoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + #[project] + fn poll( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Either< + FramedServiceImplResponseInner, + Poll>>, + > { + #[project] + match self.project() { + FramedServiceImplResponseInner::Connect( + ref mut fut, + ref handler, + ref mut disconnect, + ) => match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler( + handler.new_service(&res.state), + Some(res), + disconnect.take(), + )), + Poll::Pending => Either::Right(Poll::Pending), + Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), + }, + FramedServiceImplResponseInner::Handler( + ref mut fut, + ref mut res, + ref mut disconnect, + ) => match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(handler)) => { + let res = res.take().unwrap(); + Either::Left(FramedServiceImplResponseInner::Dispatcher( + FramedDispatcher::new( + res.framed, + State::new(res.state), + handler, + res.rx, + res.sink, + disconnect.take(), + ), + )) + } + Poll::Pending => Either::Right(Poll::Pending), + Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), + }, + FramedServiceImplResponseInner::Dispatcher(ref mut fut) => { + Either::Right(Pin::new(fut).poll(cx)) + } } } } diff --git a/actix-ioframe/src/sink.rs b/actix-ioframe/src/sink.rs index 4ebf7909..b3ecc968 100644 --- a/actix-ioframe/src/sink.rs +++ b/actix-ioframe/src/sink.rs @@ -1,11 +1,11 @@ use std::fmt; -use futures::unsync::{mpsc, oneshot}; -use futures::Future; +use actix_utils::{mpsc, oneshot}; +use futures::future::{Future, FutureExt}; use crate::dispatcher::FramedMessage; -pub struct Sink(mpsc::UnboundedSender>); +pub struct Sink(mpsc::Sender>); impl Clone for Sink { fn clone(&self) -> Self { @@ -14,26 +14,26 @@ impl Clone for Sink { } impl Sink { - pub(crate) fn new(tx: mpsc::UnboundedSender>) -> Self { + pub(crate) fn new(tx: mpsc::Sender>) -> Self { Sink(tx) } /// Close connection pub fn close(&self) { - let _ = self.0.unbounded_send(FramedMessage::Close); + let _ = self.0.send(FramedMessage::Close); } /// Close connection - pub fn wait_close(&self) -> impl Future { + pub fn wait_close(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(FramedMessage::WaitClose(tx)); + let _ = self.0.send(FramedMessage::WaitClose(tx)); - rx.map_err(|_| ()) + rx.map(|_| ()) } /// Send item pub fn send(&self, item: T) { - let _ = self.0.unbounded_send(FramedMessage::Message(item)); + let _ = self.0.send(FramedMessage::Message(item)); } } diff --git a/actix-ioframe/tests/test_server.rs b/actix-ioframe/tests/test_server.rs index c197bb56..f0e0d697 100644 --- a/actix-ioframe/tests/test_server.rs +++ b/actix-ioframe/tests/test_server.rs @@ -4,11 +4,11 @@ use std::time::Duration; use actix_codec::BytesCodec; use actix_server_config::Io; -use actix_service::{new_apply_fn, Service}; +use actix_service::{apply_fn_factory, service_fn, Service}; use actix_testing::{self as test, TestServer}; -use futures::Future; -use tokio_tcp::TcpStream; -use tokio_timer::sleep; +use futures::future::ok; +use tokio_net::tcp::TcpStream; +use tokio_timer::delay_for; use actix_ioframe::{Builder, Connect}; @@ -22,13 +22,15 @@ fn test_disconnect() -> std::io::Result<()> { let srv = TestServer::with(move || { let disconnect1 = disconnect1.clone(); - new_apply_fn( + apply_fn_factory( Builder::new() - .factory(|conn: Connect<_>| Ok(conn.codec(BytesCodec).state(State))) + .factory(service_fn(|conn: Connect<_>| { + ok(conn.codec(BytesCodec).state(State)) + })) .disconnect(move |_, _| { disconnect1.store(true, Ordering::Relaxed); }) - .finish(|_t| Ok(None)), + .finish(service_fn(|_t| ok(None))), |io: Io, srv| srv.call(io.into_parts().0), ) }); @@ -37,9 +39,9 @@ fn test_disconnect() -> std::io::Result<()> { .service(|conn: Connect<_>| { let conn = conn.codec(BytesCodec).state(State); conn.sink().close(); - Ok(conn) + ok(conn) }) - .finish(|_t| Ok(None)); + .finish(service_fn(|_t| ok(None))); let conn = test::block_on( actix_connect::default_connector() @@ -48,11 +50,7 @@ fn test_disconnect() -> std::io::Result<()> { .unwrap(); test::block_on(client.call(conn.into_parts().0)).unwrap(); - let _ = test::block_on( - sleep(Duration::from_millis(100)) - .map(|_| ()) - .map_err(|_| ()), - ); + let _ = test::block_on(delay_for(Duration::from_millis(100))); assert!(disconnect.load(Ordering::Relaxed)); Ok(()) diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index d3e01a41..fd2dab83 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -24,7 +24,13 @@ where pub fn apply_fn_factory( service: U, f: F, -) -> impl ServiceFactory +) -> impl ServiceFactory< + Config = T::Config, + Request = In, + Response = Out, + Error = Err, + InitError = T::InitError, +> where T: ServiceFactory, F: FnMut(In, &mut T::Service) -> R + Clone, diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 67053e12..c57000f4 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -25,7 +25,7 @@ either = "1.5.2" futures = "0.3.1" pin-project = "0.4.5" tokio-timer = "0.3.0-alpha.6" -tokio-executor = "=0.2.0-alpha.6" +tokio-executor = { version="=0.2.0-alpha.6", features=["current-thread"] } log = "0.4" [dev-dependencies] diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index daa40bb8..0951c3db 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -1,4 +1,5 @@ //! Framed dispatcher service and related utilities +#![allow(type_alias_bounds)] use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -265,7 +266,7 @@ where loop { match srv.poll_ready(cx) { Poll::Ready(Ok(_)) => { - let item = match framed.poll_next_item(cx) { + let item = match framed.next_item(cx) { Poll::Ready(Some(Ok(el))) => el, Poll::Ready(Some(Err(err))) => { *state = @@ -365,16 +366,15 @@ where } if !framed.is_write_buf_empty() { - // match this.framed.poll_flush(cx) { - // Poll::Pending => break, - // Poll::Ready(Err(err)) => { - // debug!("Error sending data: {:?}", err); - // self.state = - // TransportState::FramedError(FramedTransportError::Encoder(err)); - // return true; - // } - // Poll::Ready(Ok(_)) => (), - // } + match framed.flush(cx) { + Poll::Pending => break, + Poll::Ready(Err(err)) => { + debug!("Error sending data: {:?}", err); + *state = TransportState::FramedError(FramedTransportError::Encoder(err)); + return true; + } + Poll::Ready(Ok(_)) => (), + } } else { break; }