diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index e81d1402..95a24764 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -16,14 +16,10 @@ name = "actix_codec" path = "src/lib.rs" [dependencies] -actix-rt = { version = "2.0.0", default-features = false } -actix-service = "2.0.0-beta.5" - bitflags = "1.2.1" bytes = "1" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } -local-channel = "0.1.0" log = "0.4" pin-project-lite = "0.2" tokio = "1" diff --git a/actix-codec/src/dispatcher.rs b/actix-codec/src/dispatcher.rs deleted file mode 100644 index 6aea7e22..00000000 --- a/actix-codec/src/dispatcher.rs +++ /dev/null @@ -1,349 +0,0 @@ -//! Framed dispatcher service and related utilities. - -#![allow(type_alias_bounds)] - -use core::{ - fmt, - future::Future, - mem, - pin::Pin, - task::{Context, Poll}, -}; - -use actix_service::{IntoService, Service}; -use futures_core::stream::Stream; -use local_channel::mpsc; -use log::debug; -use pin_project_lite::pin_project; - -use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; - -/// Framed transport errors -pub enum DispatcherError + Decoder, I> { - /// Inner service error. - Service(E), - - /// Frame encoding error. - Encoder(>::Error), - - /// Frame decoding error. - Decoder(::Error), -} - -impl + Decoder, I> From for DispatcherError { - fn from(err: E) -> Self { - DispatcherError::Service(err) - } -} - -impl + Decoder, I> fmt::Debug for DispatcherError -where - E: fmt::Debug, - >::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - DispatcherError::Service(ref e) => write!(fmt, "DispatcherError::Service({:?})", e), - DispatcherError::Encoder(ref e) => write!(fmt, "DispatcherError::Encoder({:?})", e), - DispatcherError::Decoder(ref e) => write!(fmt, "DispatcherError::Decoder({:?})", e), - } - } -} - -impl + Decoder, I> fmt::Display for DispatcherError -where - E: fmt::Display, - >::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - DispatcherError::Service(ref e) => write!(fmt, "{}", e), - DispatcherError::Encoder(ref e) => write!(fmt, "{:?}", e), - DispatcherError::Decoder(ref e) => write!(fmt, "{:?}", e), - } - } -} - -/// Message type wrapper for signalling end of message stream. -pub enum Message { - /// Message item. - Item(T), - - /// Signal from service to flush all messages and stop processing. - Close, -} - -pin_project! { - /// A future that reads frames from a [`Framed`] object and passes them to a [`Service`]. - pub struct Dispatcher - where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead, - T: AsyncWrite, - U: Encoder, - U: Decoder, - I: 'static, - >::Error: fmt::Debug, - { - service: S, - state: State, - #[pin] - framed: Framed, - rx: mpsc::Receiver, S::Error>>, - tx: mpsc::Sender, S::Error>>, - } -} - -enum State -where - S: Service<::Item>, - U: Encoder + Decoder, -{ - Processing, - Error(DispatcherError), - FramedError(DispatcherError), - FlushAndStop, - Stopping, -} - -impl State -where - S: Service<::Item>, - U: Encoder + Decoder, -{ - fn take_error(&mut self) -> DispatcherError { - match mem::replace(self, State::Processing) { - State::Error(err) => err, - _ => panic!(), - } - } - - fn take_framed_error(&mut self) -> DispatcherError { - match mem::replace(self, State::Processing) { - State::FramedError(err) => err, - _ => panic!(), - } - } -} - -impl Dispatcher -where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - ::Error: fmt::Debug, - >::Error: fmt::Debug, -{ - /// Create new `Dispatcher`. - pub fn new(framed: Framed, service: F) -> Self - where - F: IntoService::Item>, - { - let (tx, rx) = mpsc::channel(); - Dispatcher { - framed, - rx, - tx, - service: service.into_service(), - state: State::Processing, - } - } - - /// Construct new `Dispatcher` instance with customer `mpsc::Receiver` - pub fn with_rx( - framed: Framed, - service: F, - rx: mpsc::Receiver, S::Error>>, - ) -> Self - where - F: IntoService::Item>, - { - let tx = rx.sender(); - Dispatcher { - framed, - rx, - tx, - service: service.into_service(), - state: State::Processing, - } - } - - /// Get sender handle. - pub fn tx(&self) -> mpsc::Sender, S::Error>> { - self.tx.clone() - } - - /// Get reference to a service wrapped by `Dispatcher` instance. - pub fn service(&self) -> &S { - &self.service - } - - /// Get mutable reference to a service wrapped by `Dispatcher` instance. - pub fn service_mut(&mut self) -> &mut S { - &mut self.service - } - - /// Get reference to a framed instance wrapped by `Dispatcher` instance. - pub fn framed(&self) -> &Framed { - &self.framed - } - - /// Get mutable reference to a framed instance wrapped by `Dispatcher` instance. - pub fn framed_mut(&mut self) -> &mut Framed { - &mut self.framed - } - - /// Read from framed object. - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool - where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - >::Error: fmt::Debug, - { - loop { - let this = self.as_mut().project(); - match this.service.poll_ready(cx) { - Poll::Ready(Ok(_)) => { - let item = match this.framed.next_item(cx) { - Poll::Ready(Some(Ok(el))) => el, - Poll::Ready(Some(Err(err))) => { - *this.state = State::FramedError(DispatcherError::Decoder(err)); - return true; - } - Poll::Pending => return false, - Poll::Ready(None) => { - *this.state = State::Stopping; - return true; - } - }; - - let tx = this.tx.clone(); - let fut = this.service.call(item); - actix_rt::spawn(async move { - let item = fut.await; - let _ = tx.send(item.map(Message::Item)); - }); - } - Poll::Pending => return false, - Poll::Ready(Err(err)) => { - *this.state = State::Error(DispatcherError::Service(err)); - return true; - } - } - } - } - - /// Write to framed object. - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool - where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - >::Error: fmt::Debug, - { - loop { - let mut this = self.as_mut().project(); - while !this.framed.is_write_buf_full() { - match Pin::new(&mut this.rx).poll_next(cx) { - Poll::Ready(Some(Ok(Message::Item(msg)))) => { - if let Err(err) = this.framed.as_mut().write(msg) { - *this.state = State::FramedError(DispatcherError::Encoder(err)); - return true; - } - } - Poll::Ready(Some(Ok(Message::Close))) => { - *this.state = State::FlushAndStop; - return true; - } - Poll::Ready(Some(Err(err))) => { - *this.state = State::Error(DispatcherError::Service(err)); - return true; - } - Poll::Ready(None) | Poll::Pending => break, - } - } - - if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { - Poll::Pending => break, - Poll::Ready(Ok(_)) => {} - Poll::Ready(Err(err)) => { - debug!("Error sending data: {:?}", err); - *this.state = State::FramedError(DispatcherError::Encoder(err)); - return true; - } - } - } else { - break; - } - } - - false - } -} - -impl Future for Dispatcher -where - S: Service<::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - I: 'static, - >::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - type Output = Result<(), DispatcherError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - let this = self.as_mut().project(); - - return match this.state { - State::Processing => { - if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) { - continue; - } else { - Poll::Pending - } - } - State::Error(_) => { - // flush write buffer - if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() { - return Poll::Pending; - } - Poll::Ready(Err(this.state.take_error())) - } - State::FlushAndStop => { - if !this.framed.is_write_buf_empty() { - this.framed.flush(cx).map(|res| { - if let Err(err) = res { - debug!("Error sending data: {:?}", err); - } - - Ok(()) - }) - } else { - Poll::Ready(Ok(())) - } - } - State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())), - State::Stopping => Poll::Ready(Ok(())), - }; - } - } -} diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index c3c36342..dec30ba6 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -13,7 +13,6 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] mod bcodec; -pub mod dispatcher; mod framed; pub use self::bcodec::BytesCodec; diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 6d763d79..620cbf51 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -22,7 +22,6 @@ path = "src/lib.rs" default = [] [dependencies] -actix-codec = "0.4.0-beta.1" actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.5" actix-utils = "3.0.0-beta.2" @@ -35,7 +34,9 @@ slab = "0.4" tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] +actix-codec = "0.4.0-beta.1" actix-rt = "2.0.0" + bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }