diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 980ff5e4..044483f4 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -1,14 +1,17 @@ //! Framed dispatcher service and related utilities +use std::collections::VecDeque; use std::marker::PhantomData; use std::mem; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, FutureResult}; -use futures::unsync::mpsc; +use futures::task::AtomicTask; use futures::{Async, Future, Poll, Sink, Stream}; use log::debug; +use crate::cell::Cell; + type Request<U> = <U as Decoder>::Item; type Response<U> = <U as Encoder>::Item; @@ -19,13 +22,11 @@ pub struct FramedNewService<S, T, U> { impl<S, T, U> FramedNewService<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService<Request<U>, Response = Response<U>>, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Future: 'static, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Error: 'static, - <U as Encoder>::Item: 'static, - <U as Encoder>::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { pub fn new<F1: IntoNewService<S, Request<U>>>(factory: F1) -> Self { Self { @@ -49,13 +50,11 @@ where impl<S, T, U> NewService<Framed<T, U>> for FramedNewService<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService<Request<U>, Response = Response<U>> + Clone, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Future: 'static, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Error: 'static, - <U as Encoder>::Item: 'static, - <U as Encoder>::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { type Response = FramedTransport<S::Service, T, U>; type Error = S::InitError; @@ -90,13 +89,11 @@ where impl<S, T, U> Service<Framed<T, U>> for FramedService<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService<Request<U>, Response = Response<U>>, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Future: 'static, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Error: 'static, - <U as Encoder>::Item: 'static, - <U as Encoder>::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { type Response = FramedTransport<S::Service, T, U>; type Error = S::InitError; @@ -118,13 +115,11 @@ where #[doc(hidden)] pub struct FramedServiceResponseFuture<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService<Request<U>, Response = Response<U>>, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Future: 'static, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Error: 'static, - <U as Encoder>::Item: 'static, - <U as Encoder>::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { fut: S::Future, framed: Option<Framed<T, U>>, @@ -132,13 +127,11 @@ where impl<S, T, U> Future for FramedServiceResponseFuture<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, S: NewService<Request<U>, Response = Response<U>>, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Future: 'static, - <<S as NewService<Request<U>>>::Service as Service<Request<U>>>::Error: 'static, - <U as Encoder>::Item: 'static, - <U as Encoder>::Error: 'static, + S::Service: 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { type Item = FramedTransport<S::Service, T, U>; type Error = S::InitError; @@ -171,16 +164,13 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> { /// and pass then to the service. pub struct FramedTransport<S, T, U> where - S: Service<Request<U>, Response = Response<U>>, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, + S: Service<Request<U>, Response = Response<U>> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Encoder + Decoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { - service: S, - state: TransportState<S, U>, - framed: Framed<T, U>, - request: Option<Request<U>>, - write_rx: mpsc::Receiver<Result<Response<U>, S::Error>>, - write_tx: mpsc::Sender<Result<Response<U>, S::Error>>, + inner: Cell<FramedTransportInner<S, T, U>>, + inner2: Cell<FramedTransportInner<S, T, U>>, } enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> { @@ -190,74 +180,31 @@ enum TransportState<S: Service<Request<U>>, U: Encoder + Decoder> { Stopping, } -impl<S, T, U> FramedTransport<S, T, U> +struct FramedTransportInner<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - S: Service<Request<U>, Response = Response<U>>, - S::Future: 'static, - S::Error: 'static, - <U as Encoder>::Error: 'static, + S: Service<Request<U>, Response = Response<U>> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Encoder + Decoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { - pub fn new<F: IntoService<S, Request<U>>>(framed: Framed<T, U>, service: F) -> Self { - let (write_tx, write_rx) = mpsc::channel(16); - FramedTransport { - framed, - write_rx, - write_tx, - service: service.into_service(), - state: TransportState::Processing, - request: None, - } - } - - /// Get reference to a service wrapped by `FramedTransport` instance. - pub fn get_ref(&self) -> &S { - &self.service - } - - /// Get mutable reference to a service wrapped by `FramedTransport` - /// instance. - pub fn get_mut(&mut self) -> &mut S { - &mut self.service - } - - /// Get reference to a framed instance wrapped by `FramedTransport` - /// instance. - pub fn get_framed(&self) -> &Framed<T, U> { - &self.framed - } - - /// Get mutable reference to a framed instance wrapped by `FramedTransport` - /// instance. - pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> { - &mut self.framed - } + service: S, + state: TransportState<S, U>, + framed: Framed<T, U>, + buf: VecDeque<Result<Response<U>, S::Error>>, + task: AtomicTask, } -impl<S, T, U> FramedTransport<S, T, U> +impl<S, T, U> FramedTransportInner<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - S: Service<Request<U>, Response = Response<U>>, - S::Future: 'static, - S::Error: 'static, - <U as Encoder>::Item: 'static, - <U as Encoder>::Error: std::fmt::Debug + 'static, + S: Service<Request<U>, Response = Response<U>> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { - fn poll_service(&mut self) -> bool { - match self.service.poll_ready() { - Ok(Async::Ready(_)) => { - if let Some(item) = self.request.take() { - let sender = self.write_tx.clone(); - tokio_current_thread::spawn( - self.service - .call(item) - .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), - ); - } - - loop { + fn poll_service(&mut self, cell: &Cell<FramedTransportInner<S, T, U>>) -> bool { + loop { + match self.service.poll_ready() { + Ok(Async::Ready(_)) => loop { let item = match self.framed.poll() { Ok(Async::Ready(Some(el))) => el, Err(err) => { @@ -272,32 +219,21 @@ where } }; - match self.service.poll_ready() { - Ok(Async::Ready(_)) => { - let sender = self.write_tx.clone(); - tokio_current_thread::spawn( - self.service - .call(item) - .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), - ); - } - Ok(Async::NotReady) => { - self.request = Some(item); - return false; - } - Err(err) => { - self.state = - TransportState::Error(FramedTransportError::Service(err)); - return true; - } - } + self.task.register(); + let mut cell = cell.clone(); + tokio_current_thread::spawn(self.service.call(item).then(move |item| { + let inner = cell.get_mut(); + inner.buf.push_back(item); + inner.task.notify(); + Ok(()) + })); + }, + Ok(Async::NotReady) => return false, + Err(err) => { + self.state = TransportState::Error(FramedTransportError::Service(err)); + return true; } } - Ok(Async::NotReady) => false, - Err(err) => { - self.state = TransportState::Error(FramedTransportError::Service(err)); - true - } } } @@ -305,8 +241,8 @@ where fn poll_response(&mut self) -> bool { loop { while !self.framed.is_write_buf_full() { - match self.write_rx.poll() { - Ok(Async::Ready(Some(msg))) => match msg { + if let Some(msg) = self.buf.pop_front() { + match msg { Ok(msg) => { if let Err(err) = self.framed.force_send(msg) { self.state = TransportState::FramedError( @@ -320,10 +256,9 @@ where TransportState::Error(FramedTransportError::Service(err)); return true; } - }, - Ok(Async::NotReady) => break, - Err(_) => panic!("Bug in actix-net code"), - Ok(Async::Ready(None)) => panic!("Bug in actix-net code"), + } + } else { + break; } } @@ -347,35 +282,80 @@ where } } +impl<S, T, U> FramedTransport<S, T, U> +where + S: Service<Request<U>, Response = Response<U>> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, +{ + pub fn new<F: IntoService<S, Request<U>>>(framed: Framed<T, U>, service: F) -> Self { + let inner = Cell::new(FramedTransportInner { + framed, + service: service.into_service(), + state: TransportState::Processing, + buf: VecDeque::new(), + task: AtomicTask::new(), + }); + + FramedTransport { + inner2: inner.clone(), + inner, + } + } + + /// Get reference to a service wrapped by `FramedTransport` instance. + pub fn get_ref(&self) -> &S { + &self.inner.get_ref().service + } + + /// Get mutable reference to a service wrapped by `FramedTransport` + /// instance. + pub fn get_mut(&mut self) -> &mut S { + &mut self.inner.get_mut().service + } + + /// Get reference to a framed instance wrapped by `FramedTransport` + /// instance. + pub fn get_framed(&self) -> &Framed<T, U> { + &self.inner.get_ref().framed + } + + /// Get mutable reference to a framed instance wrapped by `FramedTransport` + /// instance. + pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> { + &mut self.inner.get_mut().framed + } +} + impl<S, T, U> Future for FramedTransport<S, T, U> where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - S: Service<Request<U>, Response = Response<U>>, - S::Future: 'static, - S::Error: 'static, - <U as Encoder>::Item: 'static, - <U as Encoder>::Error: std::fmt::Debug + 'static, + S: Service<Request<U>, Response = Response<U>> + 'static, + T: AsyncRead + AsyncWrite + 'static, + U: Decoder + Encoder + 'static, + <U as Encoder>::Error: std::fmt::Debug, { type Item = (); type Error = FramedTransportError<S::Error, U>; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - match mem::replace(&mut self.state, TransportState::Processing) { + let inner = self.inner.get_mut(); + + match mem::replace(&mut inner.state, TransportState::Processing) { TransportState::Processing => { - if self.poll_service() || self.poll_response() { + if inner.poll_service(&self.inner2) || inner.poll_response() { self.poll() } else { Ok(Async::NotReady) } } TransportState::Error(err) => { - if self.framed.is_write_buf_empty() - || (self.poll_response() || self.framed.is_write_buf_empty()) + if inner.framed.is_write_buf_empty() + || (inner.poll_response() || inner.framed.is_write_buf_empty()) { Err(err) } else { - self.state = TransportState::Error(err); + inner.state = TransportState::Error(err); Ok(Async::NotReady) } }