diff --git a/Cargo.toml b/Cargo.toml index f8e43727..f032478a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "actix-codec", "actix-connect", - "actix-ioframe", "actix-rt", "actix-macros", "actix-service", @@ -19,7 +18,6 @@ members = [ [patch.crates-io] actix-codec = { path = "actix-codec" } actix-connect = { path = "actix-connect" } -actix-ioframe = { path = "actix-ioframe" } actix-rt = { path = "actix-rt" } actix-macros = { path = "actix-macros" } actix-server = { path = "actix-server" } diff --git a/actix-ioframe/CHANGES.md b/actix-ioframe/CHANGES.md deleted file mode 100644 index 1b332a52..00000000 --- a/actix-ioframe/CHANGES.md +++ /dev/null @@ -1,33 +0,0 @@ -# Changes - -## [0.5.0] - 2019-12-29 - -* Simplify state management - -* Allow to set custom output stream - -* Removed disconnect callback - -## [0.4.1] - 2019-12-11 - -* Disconnect callback accepts owned state - -## [0.4.0] - 2019-12-11 - -* Remove `E` param - -## [0.3.0-alpha.3] - 2019-12-07 - -* Migrate to tokio 0.2 - -## [0.3.0-alpha.2] - 2019-12-02 - -* Migrate to `std::future` - -## [0.1.1] - 2019-10-14 - -* Re-register task on every dispatcher poll. - -## [0.1.0] - 2019-09-25 - -* Initial release diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml deleted file mode 100644 index 0cf431a6..00000000 --- a/actix-ioframe/Cargo.toml +++ /dev/null @@ -1,33 +0,0 @@ -[package] -name = "actix-ioframe" -version = "0.5.0" -authors = ["Nikolay Kim "] -description = "Actix framed service" -keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-ioframe/" -categories = ["network-programming", "asynchronous"] -license = "MIT OR Apache-2.0" -edition = "2018" - -[lib] -name = "actix_ioframe" -path = "src/lib.rs" - -[dependencies] -actix-service = "1.0.1" -actix-codec = "0.2.0" -actix-utils = "1.0.4" -actix-rt = "1.0.0" -bytes = "0.5.3" -either = "1.5.3" -futures-sink = { version = "0.3.4", default-features = false } -futures-core = { version = "0.3.4", default-features = false } -pin-project = "0.4.17" -log = "0.4" - -[dev-dependencies] -actix-connect = "2.0.0-alpha.2" -actix-testing = "1.0.0" -futures-util = { version = "0.3.4", default-features = false } diff --git a/actix-ioframe/LICENSE-APACHE b/actix-ioframe/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-ioframe/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-ioframe/LICENSE-MIT b/actix-ioframe/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-ioframe/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-ioframe/README.md b/actix-ioframe/README.md new file mode 100644 index 00000000..f45c590d --- /dev/null +++ b/actix-ioframe/README.md @@ -0,0 +1,3 @@ +# actix-ioframe + +**This crate has been deprecated and removed.** diff --git a/actix-ioframe/src/connect.rs b/actix-ioframe/src/connect.rs deleted file mode 100644 index 4e2980d1..00000000 --- a/actix-ioframe/src/connect.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_utils::mpsc::Receiver; -use futures_core::stream::Stream; - -pub struct Connect -where - Codec: Encoder + Decoder, -{ - io: Io, - _t: PhantomData, -} - -impl Connect -where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, -{ - pub(crate) fn new(io: Io) -> Self { - Self { - io, - _t: PhantomData, - } - } - - pub fn codec( - self, - codec: Codec, - ) -> ConnectResult::Item>> { - ConnectResult { - state: (), - out: None, - framed: Framed::new(self.io, codec), - } - } -} - -#[pin_project::pin_project] -pub struct ConnectResult { - pub(crate) state: St, - pub(crate) out: Option, - #[pin] - pub(crate) framed: Framed, -} - -impl ConnectResult { - #[inline] - pub fn get_ref(&self) -> &Io { - self.framed.get_ref() - } - - #[inline] - pub fn get_mut(&mut self) -> &mut Io { - self.framed.get_mut() - } - - pub fn out(self, out: U) -> ConnectResult - where - U: Stream::Item> + Unpin, - { - ConnectResult { - state: self.state, - framed: self.framed, - out: Some(out), - } - } - - #[inline] - pub fn state(self, state: S) -> ConnectResult { - ConnectResult { - state, - framed: self.framed, - out: self.out, - } - } -} - -impl Stream for ConnectResult -where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, -{ - type Item = Result<::Item, ::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().framed.next_item(cx) - } -} - -impl futures_sink::Sink<::Item> - for ConnectResult -where - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, -{ - type Error = ::Error; - - fn poll_ready(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - if self.as_mut().project().framed.is_write_ready() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn start_send( - self: Pin<&mut Self>, - item: ::Item, - ) -> Result<(), Self::Error> { - self.project().framed.write(item) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().project().framed.flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().project().framed.close(cx) - } -} diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs deleted file mode 100644 index b7d5dd9f..00000000 --- a/actix-ioframe/src/dispatcher.rs +++ /dev/null @@ -1,248 +0,0 @@ -//! Framed dispatcher service and related utilities -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_service::Service; -use actix_utils::mpsc; -use futures_core::stream::Stream; -use pin_project::pin_project; -use log::debug; - -use crate::error::ServiceError; - -type Request = ::Item; -type Response = ::Item; - -/// FramedTransport - is a future that reads frames from Framed object -/// and pass then to the service. -#[pin_project] -pub(crate) struct Dispatcher -where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - service: S, - sink: Option, - state: FramedState, - #[pin] - framed: Framed, - rx: mpsc::Receiver::Item, S::Error>>, -} - -impl Dispatcher -where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - pub(crate) fn new(framed: Framed, service: S, sink: Option) -> Self { - Dispatcher { - sink, - service, - framed, - rx: mpsc::channel().1, - state: FramedState::Processing, - } - } -} - -enum FramedState { - Processing, - Error(ServiceError), - FramedError(ServiceError), - FlushAndStop, - Stopping, -} - -impl FramedState { - fn take_error(&mut self) -> ServiceError { - match std::mem::replace(self, FramedState::Processing) { - FramedState::Error(err) => err, - _ => panic!(), - } - } - - fn take_framed_error(&mut self) -> ServiceError { - match std::mem::replace(self, FramedState::Processing) { - FramedState::FramedError(err) => err, - _ => panic!(), - } - } -} - -impl Dispatcher -where - S: Service, Response = Option>>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool { - loop { - let this = self.as_mut().project(); - match this.service.poll_ready(cx) { - Poll::Ready(Ok(_)) => { - let item = match this.framed.next_item(cx) { - Poll::Ready(Some(Ok(el))) => el, - Poll::Ready(Some(Err(err))) => { - *this.state = FramedState::FramedError(ServiceError::Decoder(err)); - return true; - } - Poll::Pending => return false, - Poll::Ready(None) => { - log::trace!("Client disconnected"); - *this.state = FramedState::Stopping; - return true; - } - }; - - let tx = this.rx.sender(); - let fut = this.service.call(item); - actix_rt::spawn(async move { - let item = fut.await; - let item = match item { - Ok(Some(item)) => Ok(item), - Ok(None) => return, - Err(err) => Err(err), - }; - let _ = tx.send(item); - }); - } - Poll::Pending => return false, - Poll::Ready(Err(err)) => { - *this.state = FramedState::Error(ServiceError::Service(err)); - return true; - } - } - } - } - - /// write to framed object - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool { - loop { - let mut this = self.as_mut().project(); - while !this.framed.is_write_buf_full() { - match Pin::new(&mut this.rx).poll_next(cx) { - Poll::Ready(Some(Ok(msg))) => { - if let Err(err) = this.framed.as_mut().write(msg) { - *this.state = FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - continue; - } - Poll::Ready(Some(Err(err))) => { - *this.state = FramedState::Error(ServiceError::Service(err)); - return true; - } - Poll::Ready(None) | Poll::Pending => (), - } - - if this.sink.is_some() { - match Pin::new(this.sink.as_mut().unwrap()).poll_next(cx) { - Poll::Ready(Some(msg)) => { - if let Err(err) = this.framed.as_mut().write(msg) { - *this.state = - FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - continue; - } - Poll::Ready(None) => { - let _ = this.sink.take(); - *this.state = FramedState::FlushAndStop; - return true; - } - Poll::Pending => (), - } - } - break; - } - - if !this.framed.is_write_buf_empty() { - match this.framed.as_mut().flush(cx) { - Poll::Pending => break, - Poll::Ready(Ok(_)) => (), - Poll::Ready(Err(err)) => { - debug!("Error sending data: {:?}", err); - *this.state = FramedState::FramedError(ServiceError::Encoder(err)); - return true; - } - } - } else { - break; - } - } - false - } - - pub(crate) fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let mut this = self.as_mut().project(); - match this.state { - FramedState::Processing => loop { - let read = self.as_mut().poll_read(cx); - let write = self.as_mut().poll_write(cx); - if read || write { - continue; - } else { - return Poll::Pending; - } - }, - FramedState::Error(_) => { - // flush write buffer - if !this.framed.is_write_buf_empty() { - if let Poll::Pending = this.framed.flush(cx) { - return Poll::Pending; - } - } - Poll::Ready(Err(this.state.take_error())) - } - FramedState::FlushAndStop => { - // drain service responses - match Pin::new(this.rx).poll_next(cx) { - Poll::Ready(Some(Ok(msg))) => { - if this.framed.as_mut().write(msg).is_err() { - return Poll::Ready(Ok(())); - } - } - Poll::Ready(Some(Err(_))) => return Poll::Ready(Ok(())), - Poll::Ready(None) | Poll::Pending => (), - } - - // flush io - if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { - Poll::Ready(Err(err)) => { - debug!("Error sending data: {:?}", err); - } - Poll::Pending => { - return Poll::Pending; - } - Poll::Ready(_) => (), - } - }; - Poll::Ready(Ok(())) - } - FramedState::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())), - FramedState::Stopping => Poll::Ready(Ok(())), - } - } -} diff --git a/actix-ioframe/src/error.rs b/actix-ioframe/src/error.rs deleted file mode 100644 index 3eb0fed9..00000000 --- a/actix-ioframe/src/error.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::fmt; - -use actix_codec::{Decoder, Encoder}; - -/// Framed service errors -pub enum ServiceError { - /// Inner service error - Service(E), - /// Encoder parse error - Encoder(::Error), - /// Decoder parse error - Decoder(::Error), -} - -impl From for ServiceError { - fn from(err: E) -> Self { - ServiceError::Service(err) - } -} - -impl fmt::Debug for ServiceError -where - E: fmt::Debug, - ::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - ServiceError::Service(ref e) => write!(fmt, "ServiceError::Service({:?})", e), - ServiceError::Encoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e), - ServiceError::Decoder(ref e) => write!(fmt, "ServiceError::Encoder({:?})", e), - } - } -} - -impl fmt::Display for ServiceError -where - E: fmt::Display, - ::Error: fmt::Debug, - ::Error: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - ServiceError::Service(ref e) => write!(fmt, "{}", e), - ServiceError::Encoder(ref e) => write!(fmt, "{:?}", e), - ServiceError::Decoder(ref e) => write!(fmt, "{:?}", e), - } - } -} diff --git a/actix-ioframe/src/lib.rs b/actix-ioframe/src/lib.rs deleted file mode 100644 index 3f82a29f..00000000 --- a/actix-ioframe/src/lib.rs +++ /dev/null @@ -1,11 +0,0 @@ -// #![deny(rust_2018_idioms, warnings)] -#![allow(clippy::type_complexity, clippy::too_many_arguments)] - -mod connect; -mod dispatcher; -mod error; -mod service; - -pub use self::connect::{Connect, ConnectResult}; -pub use self::error::ServiceError; -pub use self::service::{Builder, FactoryBuilder}; diff --git a/actix-ioframe/src/service.rs b/actix-ioframe/src/service.rs deleted file mode 100644 index f3b5ab85..00000000 --- a/actix-ioframe/src/service.rs +++ /dev/null @@ -1,413 +0,0 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; -use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; -use either::Either; -use futures_core::{ready, stream::Stream}; - -use crate::connect::{Connect, ConnectResult}; -use crate::dispatcher::Dispatcher; -use crate::error::ServiceError; - -type RequestItem = ::Item; -type ResponseItem = Option<::Item>; - -/// Service builder - structure that follows the builder pattern -/// for building instances for framed services. -pub struct Builder { - connect: C, - _t: PhantomData<(St, Io, Codec, Out)>, -} - -impl Builder -where - C: Service, Response = ConnectResult>, - Io: AsyncRead + AsyncWrite, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - /// Construct framed handler service with specified connect service - pub fn new(connect: F) -> Builder - where - F: IntoService, - Io: AsyncRead + AsyncWrite, - C: Service, Response = ConnectResult>, - Codec: Decoder + Encoder, - Out: Stream::Item>, - { - Builder { - connect: connect.into_service(), - _t: PhantomData, - } - } - - /// Provide stream items handler service and construct service factory. - pub fn build(self, service: F) -> FramedServiceImpl - where - F: IntoServiceFactory, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - { - FramedServiceImpl { - connect: self.connect, - handler: Rc::new(service.into_factory()), - _t: PhantomData, - } - } -} - -/// Service builder - structure that follows the builder pattern -/// for building instances for framed services. -pub struct FactoryBuilder { - connect: C, - _t: PhantomData<(St, Io, Codec, Out)>, -} - -impl FactoryBuilder -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - Codec: Decoder + Encoder, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - /// Construct framed handler new service with specified connect service - pub fn new(connect: F) -> FactoryBuilder - where - F: IntoServiceFactory, - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - Codec: Decoder + Encoder, - Out: Stream::Item> + Unpin, - { - FactoryBuilder { - connect: connect.into_factory(), - _t: PhantomData, - } - } - - pub fn build(self, service: F) -> FramedService - where - F: IntoServiceFactory, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - { - FramedService { - connect: self.connect, - handler: Rc::new(service.into_factory()), - _t: PhantomData, - } - } -} - -pub struct FramedService { - connect: C, - handler: Rc, - _t: PhantomData<(St, Io, Codec, Out, Cfg)>, -} - -impl ServiceFactory - for FramedService -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Config = Cfg; - type Request = Io; - type Response = (); - type Error = ServiceError; - type InitError = C::InitError; - type Service = FramedServiceImpl; - type Future = FramedServiceResponse; - - fn new_service(&self, _: Cfg) -> Self::Future { - // create connect service and then create service impl - FramedServiceResponse { - fut: self.connect.new_service(()), - handler: self.handler.clone(), - } - } -} - -#[pin_project::pin_project] -pub struct FramedServiceResponse -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - #[pin] - fut: C::Future, - handler: Rc, -} - -impl Future for FramedServiceResponse -where - Io: AsyncRead + AsyncWrite, - C: ServiceFactory< - Config = (), - Request = Connect, - Response = ConnectResult, - >, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Output = Result, C::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let connect = ready!(this.fut.poll(cx))?; - - Poll::Ready(Ok(FramedServiceImpl { - connect, - handler: this.handler.clone(), - _t: PhantomData, - })) - } -} - -pub struct FramedServiceImpl { - connect: C, - handler: Rc, - _t: PhantomData<(St, Io, Codec, Out)>, -} - -impl Service for FramedServiceImpl -where - Io: AsyncRead + AsyncWrite, - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Codec: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Request = Io; - type Response = (); - type Error = ServiceError; - type Future = FramedServiceImplResponse; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.connect.poll_ready(cx).map_err(|e| e.into()) - } - - fn call(&mut self, req: Io) -> Self::Future { - FramedServiceImplResponse { - inner: FramedServiceImplResponseInner::Connect( - self.connect.call(Connect::new(req)), - self.handler.clone(), - ), - } - } -} - -#[pin_project::pin_project] -pub struct FramedServiceImplResponse -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - #[pin] - inner: FramedServiceImplResponseInner, -} - -impl Future for FramedServiceImplResponse -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - type Output = Result<(), ServiceError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - loop { - match this.inner.poll(cx) { - Either::Left(new) => { - this = self.as_mut().project(); - this.inner.set(new) - } - Either::Right(poll) => return poll, - }; - } - } -} - -#[pin_project::pin_project(project = FramedServiceImplResponseInnerProj)] -enum FramedServiceImplResponseInner -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - Connect(#[pin] C::Future, Rc), - Handler(#[pin] T::Future, Option>, Option), - Dispatcher(#[pin] Dispatcher), -} - -impl FramedServiceImplResponseInner -where - C: Service, Response = ConnectResult>, - T: ServiceFactory< - Config = St, - Request = RequestItem, - Response = ResponseItem, - Error = C::Error, - InitError = C::Error, - >, - ::Error: 'static, - ::Future: 'static, - Io: AsyncRead + AsyncWrite, - Codec: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, - Out: Stream::Item> + Unpin, -{ - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Either< - FramedServiceImplResponseInner, - Poll>>, - > { - match self.project() { - FramedServiceImplResponseInnerProj::Connect(fut, handler) => match fut.poll(cx) { - Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler( - handler.new_service(res.state), - Some(res.framed), - res.out, - )), - Poll::Pending => Either::Right(Poll::Pending), - Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), - }, - FramedServiceImplResponseInnerProj::Handler(fut, framed, out) => { - match fut.poll(cx) { - Poll::Ready(Ok(handler)) => { - Either::Left(FramedServiceImplResponseInner::Dispatcher( - Dispatcher::new(framed.take().unwrap(), handler, out.take()), - )) - } - Poll::Pending => Either::Right(Poll::Pending), - Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), - } - } - FramedServiceImplResponseInnerProj::Dispatcher(fut) => { - Either::Right(fut.poll(cx)) - } - } - } -} diff --git a/actix-ioframe/tests/test_server.rs b/actix-ioframe/tests/test_server.rs deleted file mode 100644 index 9d3775b3..00000000 --- a/actix-ioframe/tests/test_server.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::cell::Cell; -use std::rc::Rc; - -use actix_codec::BytesCodec; -use actix_service::{fn_factory_with_config, fn_service, IntoService, Service}; -use actix_testing::TestServer; -use actix_utils::mpsc; -use bytes::{Bytes, BytesMut}; -use futures_util::future::ok; - -use actix_ioframe::{Builder, Connect, FactoryBuilder}; - -#[derive(Clone)] -struct State(Option>); - -#[actix_rt::test] -async fn test_basic() { - let client_item = Rc::new(Cell::new(false)); - - let srv = TestServer::with(move || { - FactoryBuilder::new(fn_service(|conn: Connect<_, _>| { - ok(conn.codec(BytesCodec).state(State(None))) - })) - // echo - .build(fn_service(|t: BytesMut| ok(Some(t.freeze())))) - }); - - let item = client_item.clone(); - let mut client = Builder::new(fn_service(move |conn: Connect<_, _>| { - async move { - let (tx, rx) = mpsc::channel(); - let _ = tx.send(Bytes::from_static(b"Hello")); - Ok(conn.codec(BytesCodec).out(rx).state(State(Some(tx)))) - } - })) - .build(fn_factory_with_config(move |mut cfg: State| { - let item = item.clone(); - ok((move |t: BytesMut| { - assert_eq!(t.freeze(), Bytes::from_static(b"Hello")); - item.set(true); - // drop Sender, which will close connection - cfg.0.take(); - ok::<_, ()>(None) - }) - .into_service()) - })); - - let conn = actix_connect::default_connector() - .call(actix_connect::Connect::with(String::new(), srv.addr())) - .await - .unwrap(); - - client.call(conn.into_parts().0).await.unwrap(); - assert!(client_item.get()); -}