mirror of https://github.com/fafhrd91/actix-net
migrate actix-ioframe to std::future
This commit is contained in:
parent
a87591769c
commit
5271ba5ebd
|
@ -223,17 +223,22 @@ impl<T, U> Framed<T, U> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, U> Framed<T, U>
|
impl<T, U> Framed<T, U> {
|
||||||
where
|
pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<U::Item, U::Error>>>
|
||||||
T: AsyncRead + Unpin,
|
where
|
||||||
U: Decoder + Unpin,
|
T: AsyncRead + Unpin,
|
||||||
{
|
U: Decoder + Unpin,
|
||||||
pub fn poll_next_item(
|
{
|
||||||
&mut self,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
) -> Poll<Option<Result<U::Item, U::Error>>> {
|
|
||||||
Pin::new(&mut self.inner).poll_next(cx)
|
Pin::new(&mut self.inner).poll_next(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
|
||||||
|
where
|
||||||
|
T: AsyncWrite + Unpin,
|
||||||
|
U: Encoder + Unpin,
|
||||||
|
{
|
||||||
|
Pin::new(self.inner.get_mut()).poll_flush(cx)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, U> Stream for Framed<T, U>
|
impl<T, U> Stream for Framed<T, U>
|
||||||
|
|
|
@ -18,18 +18,20 @@ name = "actix_ioframe"
|
||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-service = "0.4.1"
|
actix-service = "1.0.0-alpha.1"
|
||||||
actix-codec = "0.1.2"
|
actix-codec = "0.2.0-alpha.1"
|
||||||
|
actix-utils = "0.5.0-alpha.1"
|
||||||
bytes = "0.4"
|
bytes = "0.4"
|
||||||
either = "1.5.2"
|
either = "1.5.2"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
|
pin-project = "0.4.5"
|
||||||
tokio-executor = "=0.2.0-alpha.6"
|
tokio-executor = "=0.2.0-alpha.6"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "0.2.2"
|
actix-rt = "1.0.0-alpha.1"
|
||||||
actix-connect = "0.3.0"
|
actix-connect = "1.0.0-alpha.1"
|
||||||
actix-testing = "0.2.0"
|
actix-testing = "0.2.0"
|
||||||
actix-server-config = "0.2.0"
|
actix-server-config = "0.2.0"
|
||||||
tokio-tcp = "0.1"
|
tokio-net = "=0.2.0-alpha.6"
|
||||||
tokio-timer = "0.2"
|
tokio-timer = "=0.3.0-alpha.6"
|
||||||
|
|
|
@ -1,7 +1,11 @@
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||||
use futures::unsync::mpsc;
|
use actix_utils::mpsc;
|
||||||
|
use futures::Stream;
|
||||||
|
use pin_project::pin_project;
|
||||||
|
|
||||||
use crate::dispatcher::FramedMessage;
|
use crate::dispatcher::FramedMessage;
|
||||||
use crate::sink::Sink;
|
use crate::sink::Sink;
|
||||||
|
@ -13,7 +17,7 @@ pub struct Connect<Io, St = (), Codec = ()> {
|
||||||
|
|
||||||
impl<Io> Connect<Io>
|
impl<Io> Connect<Io>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
{
|
{
|
||||||
pub(crate) fn new(io: Io) -> Self {
|
pub(crate) fn new(io: Io) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -24,9 +28,9 @@ where
|
||||||
|
|
||||||
pub fn codec<Codec>(self, codec: Codec) -> ConnectResult<Io, (), Codec>
|
pub fn codec<Codec>(self, codec: Codec) -> ConnectResult<Io, (), Codec>
|
||||||
where
|
where
|
||||||
Codec: Encoder + Decoder,
|
Codec: Encoder + Decoder + Unpin,
|
||||||
{
|
{
|
||||||
let (tx, rx) = mpsc::unbounded();
|
let (tx, rx) = mpsc::channel();
|
||||||
let sink = Sink::new(tx);
|
let sink = Sink::new(tx);
|
||||||
|
|
||||||
ConnectResult {
|
ConnectResult {
|
||||||
|
@ -38,10 +42,12 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[pin_project]
|
||||||
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
|
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder> {
|
||||||
pub(crate) state: St,
|
pub(crate) state: St,
|
||||||
|
#[pin]
|
||||||
pub(crate) framed: Framed<Io, Codec>,
|
pub(crate) framed: Framed<Io, Codec>,
|
||||||
pub(crate) rx: mpsc::UnboundedReceiver<FramedMessage<<Codec as Encoder>::Item>>,
|
pub(crate) rx: mpsc::Receiver<FramedMessage<<Codec as Encoder>::Item>>,
|
||||||
pub(crate) sink: Sink<<Codec as Encoder>::Item>,
|
pub(crate) sink: Sink<<Codec as Encoder>::Item>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,39 +78,41 @@ impl<Io, St, Codec: Encoder + Decoder> ConnectResult<Io, St, Codec> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io, St, Codec> futures::Stream for ConnectResult<Io, St, Codec>
|
impl<Io, St, Codec> Stream for ConnectResult<Io, St, Codec>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
Codec: Encoder + Decoder,
|
Codec: Encoder + Decoder + Unpin,
|
||||||
{
|
{
|
||||||
type Item = <Codec as Decoder>::Item;
|
type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;
|
||||||
type Error = <Codec as Decoder>::Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
self.framed.poll()
|
self.project().framed.poll_next(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io, St, Codec> futures::Sink for ConnectResult<Io, St, Codec>
|
impl<Io, St, Codec> futures::Sink<<Codec as Encoder>::Item> for ConnectResult<Io, St, Codec>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
Codec: Encoder + Decoder,
|
Codec: Encoder + Decoder + Unpin,
|
||||||
{
|
{
|
||||||
type SinkItem = <Codec as Encoder>::Item;
|
type Error = <Codec as Encoder>::Error;
|
||||||
type SinkError = <Codec as Encoder>::Error;
|
|
||||||
|
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.project().framed.poll_ready(cx)
|
||||||
|
}
|
||||||
|
|
||||||
fn start_send(
|
fn start_send(
|
||||||
&mut self,
|
self: Pin<&mut Self>,
|
||||||
item: Self::SinkItem,
|
item: <Codec as Encoder>::Item,
|
||||||
) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
|
) -> Result<(), Self::Error> {
|
||||||
self.framed.start_send(item)
|
self.project().framed.start_send(item)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
self.framed.poll_complete()
|
self.project().framed.poll_flush(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
|
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
self.framed.close()
|
self.project().framed.poll_close(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,17 @@
|
||||||
//! Framed dispatcher service and related utilities
|
//! Framed dispatcher service and related utilities
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
use std::future::Future;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
|
use std::pin::Pin;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||||
use actix_service::{IntoService, Service};
|
use actix_service::{IntoService, Service};
|
||||||
use futures::task::AtomicTask;
|
use actix_utils::task::LocalWaker;
|
||||||
use futures::unsync::{mpsc, oneshot};
|
use actix_utils::{mpsc, oneshot};
|
||||||
use futures::{Async, Future, Poll, Sink as FutureSink, Stream};
|
use futures::future::ready;
|
||||||
|
use futures::{FutureExt, Sink as FutureSink, Stream};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
|
||||||
use crate::cell::Cell;
|
use crate::cell::Cell;
|
||||||
|
@ -27,13 +31,14 @@ pub(crate) enum FramedMessage<T> {
|
||||||
|
|
||||||
/// FramedTransport - is a future that reads frames from Framed object
|
/// FramedTransport - is a future that reads frames from Framed object
|
||||||
/// and pass then to the service.
|
/// and pass then to the service.
|
||||||
|
#[pin_project::pin_project]
|
||||||
pub(crate) struct FramedDispatcher<St, S, T, U>
|
pub(crate) struct FramedDispatcher<St, S, T, U>
|
||||||
where
|
where
|
||||||
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
||||||
S::Error: 'static,
|
S::Error: 'static,
|
||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
U: Encoder + Decoder,
|
U: Encoder + Decoder + Unpin,
|
||||||
<U as Encoder>::Item: 'static,
|
<U as Encoder>::Item: 'static,
|
||||||
<U as Encoder>::Error: std::fmt::Debug,
|
<U as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
|
@ -42,7 +47,7 @@ where
|
||||||
state: State<St>,
|
state: State<St>,
|
||||||
dispatch_state: FramedState<S, U>,
|
dispatch_state: FramedState<S, U>,
|
||||||
framed: Framed<T, U>,
|
framed: Framed<T, U>,
|
||||||
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
|
rx: Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>,
|
||||||
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
|
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
|
||||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||||
}
|
}
|
||||||
|
@ -52,8 +57,8 @@ where
|
||||||
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
||||||
S::Error: 'static,
|
S::Error: 'static,
|
||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
U: Decoder + Encoder,
|
U: Decoder + Encoder + Unpin,
|
||||||
<U as Encoder>::Item: 'static,
|
<U as Encoder>::Item: 'static,
|
||||||
<U as Encoder>::Error: std::fmt::Debug,
|
<U as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
|
@ -61,7 +66,7 @@ where
|
||||||
framed: Framed<T, U>,
|
framed: Framed<T, U>,
|
||||||
state: State<St>,
|
state: State<St>,
|
||||||
service: F,
|
service: F,
|
||||||
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
|
rx: mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>,
|
||||||
sink: Sink<<U as Encoder>::Item>,
|
sink: Sink<<U as Encoder>::Item>,
|
||||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
@ -75,13 +80,13 @@ where
|
||||||
dispatch_state: FramedState::Processing,
|
dispatch_state: FramedState::Processing,
|
||||||
inner: Cell::new(FramedDispatcherInner {
|
inner: Cell::new(FramedDispatcherInner {
|
||||||
buf: VecDeque::new(),
|
buf: VecDeque::new(),
|
||||||
task: AtomicTask::new(),
|
task: LocalWaker::new(),
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum FramedState<S: Service, U: Encoder + Decoder> {
|
enum FramedState<S: Service, U: Encoder + Decoder + Unpin> {
|
||||||
Processing,
|
Processing,
|
||||||
Error(ServiceError<S::Error, U>),
|
Error(ServiceError<S::Error, U>),
|
||||||
FramedError(ServiceError<S::Error, U>),
|
FramedError(ServiceError<S::Error, U>),
|
||||||
|
@ -89,7 +94,7 @@ enum FramedState<S: Service, U: Encoder + Decoder> {
|
||||||
Stopping,
|
Stopping,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
|
impl<S: Service, U: Encoder + Decoder + Unpin> FramedState<S, U> {
|
||||||
fn stop(&mut self, tx: Option<oneshot::Sender<()>>) {
|
fn stop(&mut self, tx: Option<oneshot::Sender<()>>) {
|
||||||
match self {
|
match self {
|
||||||
FramedState::FlushAndStop(ref mut vec) => {
|
FramedState::FlushAndStop(ref mut vec) => {
|
||||||
|
@ -115,149 +120,7 @@ impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
|
||||||
|
|
||||||
struct FramedDispatcherInner<I, E> {
|
struct FramedDispatcherInner<I, E> {
|
||||||
buf: VecDeque<Result<I, E>>,
|
buf: VecDeque<Result<I, E>>,
|
||||||
task: AtomicTask,
|
task: LocalWaker,
|
||||||
}
|
|
||||||
|
|
||||||
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
|
|
||||||
where
|
|
||||||
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
|
||||||
S::Error: 'static,
|
|
||||||
S::Future: 'static,
|
|
||||||
T: AsyncRead + AsyncWrite,
|
|
||||||
U: Decoder + Encoder,
|
|
||||||
<U as Encoder>::Item: 'static,
|
|
||||||
<U as Encoder>::Error: std::fmt::Debug,
|
|
||||||
{
|
|
||||||
fn disconnect(&mut self, error: bool) {
|
|
||||||
if let Some(ref disconnect) = self.disconnect {
|
|
||||||
(&*disconnect)(&mut *self.state.get_mut(), error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_read(&mut self) -> bool {
|
|
||||||
loop {
|
|
||||||
match self.service.poll_ready() {
|
|
||||||
Ok(Async::Ready(_)) => {
|
|
||||||
let item = match self.framed.poll() {
|
|
||||||
Ok(Async::Ready(Some(el))) => el,
|
|
||||||
Err(err) => {
|
|
||||||
self.dispatch_state =
|
|
||||||
FramedState::FramedError(ServiceError::Decoder(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => return false,
|
|
||||||
Ok(Async::Ready(None)) => {
|
|
||||||
log::trace!("Client disconnected");
|
|
||||||
self.dispatch_state = FramedState::Stopping;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut cell = self.inner.clone();
|
|
||||||
tokio_current_thread::spawn(
|
|
||||||
self.service
|
|
||||||
.call(Item::new(self.state.clone(), self.sink.clone(), item))
|
|
||||||
.then(move |item| {
|
|
||||||
let item = match item {
|
|
||||||
Ok(Some(item)) => Ok(item),
|
|
||||||
Ok(None) => return Ok(()),
|
|
||||||
Err(err) => Err(err),
|
|
||||||
};
|
|
||||||
unsafe {
|
|
||||||
let inner = cell.get_mut();
|
|
||||||
inner.buf.push_back(item);
|
|
||||||
inner.task.notify();
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => return false,
|
|
||||||
Err(err) => {
|
|
||||||
self.dispatch_state = FramedState::Error(ServiceError::Service(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// write to framed object
|
|
||||||
fn poll_write(&mut self) -> bool {
|
|
||||||
let inner = unsafe { self.inner.get_mut() };
|
|
||||||
let mut rx_done = self.rx.is_none();
|
|
||||||
let mut buf_empty = inner.buf.is_empty();
|
|
||||||
loop {
|
|
||||||
while !self.framed.is_write_buf_full() {
|
|
||||||
if !buf_empty {
|
|
||||||
match inner.buf.pop_front().unwrap() {
|
|
||||||
Ok(msg) => {
|
|
||||||
if let Err(err) = self.framed.force_send(msg) {
|
|
||||||
self.dispatch_state =
|
|
||||||
FramedState::FramedError(ServiceError::Encoder(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
buf_empty = inner.buf.is_empty();
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
self.dispatch_state =
|
|
||||||
FramedState::Error(ServiceError::Service(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !rx_done && self.rx.is_some() {
|
|
||||||
match self.rx.as_mut().unwrap().poll() {
|
|
||||||
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
|
|
||||||
if let Err(err) = self.framed.force_send(msg) {
|
|
||||||
self.dispatch_state =
|
|
||||||
FramedState::FramedError(ServiceError::Encoder(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(FramedMessage::Close))) => {
|
|
||||||
self.dispatch_state.stop(None);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(FramedMessage::WaitClose(tx)))) => {
|
|
||||||
self.dispatch_state.stop(Some(tx));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(None)) => {
|
|
||||||
rx_done = true;
|
|
||||||
let _ = self.rx.take();
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => rx_done = true,
|
|
||||||
Err(_e) => {
|
|
||||||
rx_done = true;
|
|
||||||
let _ = self.rx.take();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if rx_done && buf_empty {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.framed.is_write_buf_empty() {
|
|
||||||
match self.framed.poll_complete() {
|
|
||||||
Ok(Async::NotReady) => break,
|
|
||||||
Err(err) => {
|
|
||||||
debug!("Error sending data: {:?}", err);
|
|
||||||
self.dispatch_state =
|
|
||||||
FramedState::FramedError(ServiceError::Encoder(err));
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(_)) => (),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<St, S, T, U> Future for FramedDispatcher<St, S, T, U>
|
impl<St, S, T, U> Future for FramedDispatcher<St, S, T, U>
|
||||||
|
@ -265,63 +128,266 @@ where
|
||||||
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
||||||
S::Error: 'static,
|
S::Error: 'static,
|
||||||
S::Future: 'static,
|
S::Future: 'static,
|
||||||
T: AsyncRead + AsyncWrite,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
U: Decoder + Encoder,
|
U: Decoder + Encoder + Unpin,
|
||||||
<U as Encoder>::Item: 'static,
|
<U as Encoder>::Item: 'static,
|
||||||
<U as Encoder>::Error: std::fmt::Debug,
|
<U as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Output = Result<(), ServiceError<S::Error, U>>;
|
||||||
type Error = ServiceError<S::Error, U>;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
unsafe { self.inner.get_ref().task.register() };
|
unsafe { self.inner.get_ref().task.register(cx.waker()) };
|
||||||
|
|
||||||
match mem::replace(&mut self.dispatch_state, FramedState::Processing) {
|
let this = self.project();
|
||||||
FramedState::Processing => {
|
poll(
|
||||||
if self.poll_read() || self.poll_write() {
|
cx,
|
||||||
self.poll()
|
this.service,
|
||||||
} else {
|
this.state,
|
||||||
Ok(Async::NotReady)
|
this.sink,
|
||||||
}
|
this.framed,
|
||||||
|
this.dispatch_state,
|
||||||
|
this.rx,
|
||||||
|
this.inner,
|
||||||
|
this.disconnect,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll<St, S, T, U>(
|
||||||
|
cx: &mut Context,
|
||||||
|
srv: &mut S,
|
||||||
|
state: &mut State<St>,
|
||||||
|
sink: &mut Sink<<U as Encoder>::Item>,
|
||||||
|
framed: &mut Framed<T, U>,
|
||||||
|
dispatch_state: &mut FramedState<S, U>,
|
||||||
|
rx: &mut Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>,
|
||||||
|
inner: &mut Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
|
||||||
|
disconnect: &mut Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||||
|
) -> Poll<Result<(), ServiceError<S::Error, U>>>
|
||||||
|
where
|
||||||
|
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
||||||
|
S::Error: 'static,
|
||||||
|
S::Future: 'static,
|
||||||
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
|
U: Decoder + Encoder + Unpin,
|
||||||
|
<U as Encoder>::Item: 'static,
|
||||||
|
<U as Encoder>::Error: std::fmt::Debug,
|
||||||
|
{
|
||||||
|
match mem::replace(dispatch_state, FramedState::Processing) {
|
||||||
|
FramedState::Processing => {
|
||||||
|
if poll_read(cx, srv, state, sink, framed, dispatch_state, inner)
|
||||||
|
|| poll_write(cx, framed, dispatch_state, rx, inner)
|
||||||
|
{
|
||||||
|
poll(
|
||||||
|
cx,
|
||||||
|
srv,
|
||||||
|
state,
|
||||||
|
sink,
|
||||||
|
framed,
|
||||||
|
dispatch_state,
|
||||||
|
rx,
|
||||||
|
inner,
|
||||||
|
disconnect,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
FramedState::Error(err) => {
|
}
|
||||||
if self.framed.is_write_buf_empty()
|
FramedState::Error(err) => {
|
||||||
|| (self.poll_write() || self.framed.is_write_buf_empty())
|
if framed.is_write_buf_empty()
|
||||||
{
|
|| (poll_write(cx, framed, dispatch_state, rx, inner)
|
||||||
self.disconnect(true);
|
|| framed.is_write_buf_empty())
|
||||||
Err(err)
|
{
|
||||||
} else {
|
if let Some(ref disconnect) = disconnect {
|
||||||
self.dispatch_state = FramedState::Error(err);
|
(&*disconnect)(&mut *state.get_mut(), true);
|
||||||
Ok(Async::NotReady)
|
|
||||||
}
|
}
|
||||||
|
Poll::Ready(Err(err))
|
||||||
|
} else {
|
||||||
|
*dispatch_state = FramedState::Error(err);
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
FramedState::FlushAndStop(mut vec) => {
|
}
|
||||||
if !self.framed.is_write_buf_empty() {
|
FramedState::FlushAndStop(mut vec) => {
|
||||||
match self.framed.poll_complete() {
|
if !framed.is_write_buf_empty() {
|
||||||
Err(err) => {
|
match Pin::new(framed).poll_flush(cx) {
|
||||||
debug!("Error sending data: {:?}", err);
|
Poll::Ready(Err(err)) => {
|
||||||
}
|
debug!("Error sending data: {:?}", err);
|
||||||
Ok(Async::NotReady) => {
|
}
|
||||||
self.dispatch_state = FramedState::FlushAndStop(vec);
|
Poll::Pending => {
|
||||||
return Ok(Async::NotReady);
|
*dispatch_state = FramedState::FlushAndStop(vec);
|
||||||
}
|
return Poll::Pending;
|
||||||
Ok(Async::Ready(_)) => (),
|
}
|
||||||
|
Poll::Ready(_) => (),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for tx in vec.drain(..) {
|
||||||
|
let _ = tx.send(());
|
||||||
|
}
|
||||||
|
if let Some(ref disconnect) = disconnect {
|
||||||
|
(&*disconnect)(&mut *state.get_mut(), false);
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
FramedState::FramedError(err) => {
|
||||||
|
if let Some(ref disconnect) = disconnect {
|
||||||
|
(&*disconnect)(&mut *state.get_mut(), true);
|
||||||
|
}
|
||||||
|
Poll::Ready(Err(err))
|
||||||
|
}
|
||||||
|
FramedState::Stopping => {
|
||||||
|
if let Some(ref disconnect) = disconnect {
|
||||||
|
(&*disconnect)(&mut *state.get_mut(), false);
|
||||||
|
}
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_read<St, S, T, U>(
|
||||||
|
cx: &mut Context,
|
||||||
|
srv: &mut S,
|
||||||
|
state: &mut State<St>,
|
||||||
|
sink: &mut Sink<<U as Encoder>::Item>,
|
||||||
|
framed: &mut Framed<T, U>,
|
||||||
|
dispatch_state: &mut FramedState<S, U>,
|
||||||
|
inner: &mut Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
|
||||||
|
) -> bool
|
||||||
|
where
|
||||||
|
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
||||||
|
S::Error: 'static,
|
||||||
|
S::Future: 'static,
|
||||||
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
|
U: Decoder + Encoder + Unpin,
|
||||||
|
<U as Encoder>::Item: 'static,
|
||||||
|
<U as Encoder>::Error: std::fmt::Debug,
|
||||||
|
{
|
||||||
|
loop {
|
||||||
|
match srv.poll_ready(cx) {
|
||||||
|
Poll::Ready(Ok(_)) => {
|
||||||
|
let item = match framed.next_item(cx) {
|
||||||
|
Poll::Ready(Some(Ok(el))) => el,
|
||||||
|
Poll::Ready(Some(Err(err))) => {
|
||||||
|
*dispatch_state = FramedState::FramedError(ServiceError::Decoder(err));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Poll::Pending => return false,
|
||||||
|
Poll::Ready(None) => {
|
||||||
|
log::trace!("Client disconnected");
|
||||||
|
*dispatch_state = FramedState::Stopping;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
for tx in vec.drain(..) {
|
|
||||||
let _ = tx.send(());
|
let mut cell = inner.clone();
|
||||||
}
|
tokio_executor::current_thread::spawn(
|
||||||
self.disconnect(false);
|
srv.call(Item::new(state.clone(), sink.clone(), item))
|
||||||
Ok(Async::Ready(()))
|
.then(move |item| {
|
||||||
|
let item = match item {
|
||||||
|
Ok(Some(item)) => Ok(item),
|
||||||
|
Ok(None) => return ready(()),
|
||||||
|
Err(err) => Err(err),
|
||||||
|
};
|
||||||
|
unsafe {
|
||||||
|
let inner = cell.get_mut();
|
||||||
|
inner.buf.push_back(item);
|
||||||
|
inner.task.wake();
|
||||||
|
}
|
||||||
|
ready(())
|
||||||
|
}),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
FramedState::FramedError(err) => {
|
Poll::Pending => return false,
|
||||||
self.disconnect(true);
|
Poll::Ready(Err(err)) => {
|
||||||
Err(err)
|
*dispatch_state = FramedState::Error(ServiceError::Service(err));
|
||||||
}
|
return true;
|
||||||
FramedState::Stopping => {
|
|
||||||
self.disconnect(false);
|
|
||||||
Ok(Async::Ready(()))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// write to framed object
|
||||||
|
fn poll_write<St, S, T, U>(
|
||||||
|
cx: &mut Context,
|
||||||
|
framed: &mut Framed<T, U>,
|
||||||
|
dispatch_state: &mut FramedState<S, U>,
|
||||||
|
rx: &mut Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>,
|
||||||
|
inner: &mut Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
|
||||||
|
) -> bool
|
||||||
|
where
|
||||||
|
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
|
||||||
|
S::Error: 'static,
|
||||||
|
S::Future: 'static,
|
||||||
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
|
U: Decoder + Encoder + Unpin,
|
||||||
|
<U as Encoder>::Item: 'static,
|
||||||
|
<U as Encoder>::Error: std::fmt::Debug,
|
||||||
|
{
|
||||||
|
let inner = unsafe { inner.get_mut() };
|
||||||
|
let mut rx_done = rx.is_none();
|
||||||
|
let mut buf_empty = inner.buf.is_empty();
|
||||||
|
loop {
|
||||||
|
while !framed.is_write_buf_full() {
|
||||||
|
if !buf_empty {
|
||||||
|
match inner.buf.pop_front().unwrap() {
|
||||||
|
Ok(msg) => {
|
||||||
|
if let Err(err) = framed.force_send(msg) {
|
||||||
|
*dispatch_state =
|
||||||
|
FramedState::FramedError(ServiceError::Encoder(err));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
buf_empty = inner.buf.is_empty();
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
*dispatch_state = FramedState::Error(ServiceError::Service(err));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rx_done && rx.is_some() {
|
||||||
|
match Pin::new(rx.as_mut().unwrap()).poll_next(cx) {
|
||||||
|
Poll::Ready(Some(FramedMessage::Message(msg))) => {
|
||||||
|
if let Err(err) = framed.force_send(msg) {
|
||||||
|
*dispatch_state =
|
||||||
|
FramedState::FramedError(ServiceError::Encoder(err));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Poll::Ready(Some(FramedMessage::Close)) => {
|
||||||
|
dispatch_state.stop(None);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Poll::Ready(Some(FramedMessage::WaitClose(tx))) => {
|
||||||
|
dispatch_state.stop(Some(tx));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Poll::Ready(None) => {
|
||||||
|
rx_done = true;
|
||||||
|
let _ = rx.take();
|
||||||
|
}
|
||||||
|
Poll::Pending => rx_done = true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if rx_done && buf_empty {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !framed.is_write_buf_empty() {
|
||||||
|
match framed.flush(cx) {
|
||||||
|
Poll::Pending => break,
|
||||||
|
Poll::Ready(Err(err)) => {
|
||||||
|
debug!("Error sending data: {:?}", err);
|
||||||
|
*dispatch_state = FramedState::FramedError(ServiceError::Encoder(err));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Poll::Ready(_) => (),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
|
@ -1,9 +1,14 @@
|
||||||
|
use std::future::Future;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
use std::pin::Pin;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
|
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder};
|
||||||
use actix_service::{IntoNewService, IntoService, NewService, Service};
|
use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory};
|
||||||
use futures::{Async, Future, Poll};
|
use either::Either;
|
||||||
|
use futures::future::{FutureExt, LocalBoxFuture};
|
||||||
|
use pin_project::{pin_project, project};
|
||||||
|
|
||||||
use crate::connect::{Connect, ConnectResult};
|
use crate::connect::{Connect, ConnectResult};
|
||||||
use crate::dispatcher::FramedDispatcher;
|
use crate::dispatcher::FramedDispatcher;
|
||||||
|
@ -27,9 +32,9 @@ impl<St, Codec> Builder<St, Codec> {
|
||||||
pub fn service<Io, C, F>(self, connect: F) -> ServiceBuilder<St, C, Io, Codec>
|
pub fn service<Io, C, F>(self, connect: F) -> ServiceBuilder<St, C, Io, Codec>
|
||||||
where
|
where
|
||||||
F: IntoService<C>,
|
F: IntoService<C>,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
||||||
Codec: Decoder + Encoder,
|
Codec: Decoder + Encoder + Unpin,
|
||||||
{
|
{
|
||||||
ServiceBuilder {
|
ServiceBuilder {
|
||||||
connect: connect.into_service(),
|
connect: connect.into_service(),
|
||||||
|
@ -41,19 +46,19 @@ impl<St, Codec> Builder<St, Codec> {
|
||||||
/// Construct framed handler new service with specified connect service
|
/// Construct framed handler new service with specified connect service
|
||||||
pub fn factory<Io, C, F>(self, connect: F) -> NewServiceBuilder<St, C, Io, Codec>
|
pub fn factory<Io, C, F>(self, connect: F) -> NewServiceBuilder<St, C, Io, Codec>
|
||||||
where
|
where
|
||||||
F: IntoNewService<C>,
|
F: IntoServiceFactory<C>,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
C: NewService<
|
C: ServiceFactory<
|
||||||
Config = (),
|
Config = (),
|
||||||
Request = Connect<Io>,
|
Request = Connect<Io>,
|
||||||
Response = ConnectResult<Io, St, Codec>,
|
Response = ConnectResult<Io, St, Codec>,
|
||||||
>,
|
>,
|
||||||
C::Error: 'static,
|
C::Error: 'static,
|
||||||
C::Future: 'static,
|
C::Future: 'static,
|
||||||
Codec: Decoder + Encoder,
|
Codec: Decoder + Encoder + Unpin,
|
||||||
{
|
{
|
||||||
NewServiceBuilder {
|
NewServiceBuilder {
|
||||||
connect: connect.into_new_service(),
|
connect: connect.into_factory(),
|
||||||
disconnect: None,
|
disconnect: None,
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}
|
}
|
||||||
|
@ -69,10 +74,10 @@ pub struct ServiceBuilder<St, C, Io, Codec> {
|
||||||
impl<St, C, Io, Codec> ServiceBuilder<St, C, Io, Codec>
|
impl<St, C, Io, Codec> ServiceBuilder<St, C, Io, Codec>
|
||||||
where
|
where
|
||||||
St: 'static,
|
St: 'static,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
||||||
C::Error: 'static,
|
C::Error: 'static,
|
||||||
Codec: Decoder + Encoder,
|
Codec: Decoder + Encoder + Unpin,
|
||||||
<Codec as Encoder>::Item: 'static,
|
<Codec as Encoder>::Item: 'static,
|
||||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
|
@ -93,8 +98,8 @@ where
|
||||||
service: F,
|
service: F,
|
||||||
) -> impl Service<Request = Io, Response = (), Error = ServiceError<C::Error, Codec>>
|
) -> impl Service<Request = Io, Response = (), Error = ServiceError<C::Error, Codec>>
|
||||||
where
|
where
|
||||||
F: IntoNewService<T>,
|
F: IntoServiceFactory<T>,
|
||||||
T: NewService<
|
T: ServiceFactory<
|
||||||
Config = St,
|
Config = St,
|
||||||
Request = RequestItem<St, Codec>,
|
Request = RequestItem<St, Codec>,
|
||||||
Response = ResponseItem<Codec>,
|
Response = ResponseItem<Codec>,
|
||||||
|
@ -104,7 +109,7 @@ where
|
||||||
{
|
{
|
||||||
FramedServiceImpl {
|
FramedServiceImpl {
|
||||||
connect: self.connect,
|
connect: self.connect,
|
||||||
handler: Rc::new(service.into_new_service()),
|
handler: Rc::new(service.into_factory()),
|
||||||
disconnect: self.disconnect.clone(),
|
disconnect: self.disconnect.clone(),
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}
|
}
|
||||||
|
@ -120,11 +125,15 @@ pub struct NewServiceBuilder<St, C, Io, Codec> {
|
||||||
impl<St, C, Io, Codec> NewServiceBuilder<St, C, Io, Codec>
|
impl<St, C, Io, Codec> NewServiceBuilder<St, C, Io, Codec>
|
||||||
where
|
where
|
||||||
St: 'static,
|
St: 'static,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
C: NewService<Config = (), Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
C: ServiceFactory<
|
||||||
|
Config = (),
|
||||||
|
Request = Connect<Io>,
|
||||||
|
Response = ConnectResult<Io, St, Codec>,
|
||||||
|
>,
|
||||||
C::Error: 'static,
|
C::Error: 'static,
|
||||||
C::Future: 'static,
|
C::Future: 'static,
|
||||||
Codec: Decoder + Encoder,
|
Codec: Decoder + Encoder + Unpin,
|
||||||
<Codec as Encoder>::Item: 'static,
|
<Codec as Encoder>::Item: 'static,
|
||||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
|
@ -142,15 +151,15 @@ where
|
||||||
pub fn finish<F, T, Cfg>(
|
pub fn finish<F, T, Cfg>(
|
||||||
self,
|
self,
|
||||||
service: F,
|
service: F,
|
||||||
) -> impl NewService<
|
) -> impl ServiceFactory<
|
||||||
Config = Cfg,
|
Config = Cfg,
|
||||||
Request = Io,
|
Request = Io,
|
||||||
Response = (),
|
Response = (),
|
||||||
Error = ServiceError<C::Error, Codec>,
|
Error = ServiceError<C::Error, Codec>,
|
||||||
>
|
>
|
||||||
where
|
where
|
||||||
F: IntoNewService<T>,
|
F: IntoServiceFactory<T>,
|
||||||
T: NewService<
|
T: ServiceFactory<
|
||||||
Config = St,
|
Config = St,
|
||||||
Request = RequestItem<St, Codec>,
|
Request = RequestItem<St, Codec>,
|
||||||
Response = ResponseItem<Codec>,
|
Response = ResponseItem<Codec>,
|
||||||
|
@ -160,7 +169,7 @@ where
|
||||||
{
|
{
|
||||||
FramedService {
|
FramedService {
|
||||||
connect: self.connect,
|
connect: self.connect,
|
||||||
handler: Rc::new(service.into_new_service()),
|
handler: Rc::new(service.into_factory()),
|
||||||
disconnect: self.disconnect,
|
disconnect: self.disconnect,
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}
|
}
|
||||||
|
@ -174,21 +183,25 @@ pub(crate) struct FramedService<St, C, T, Io, Codec, Cfg> {
|
||||||
_t: PhantomData<(St, Io, Codec, Cfg)>,
|
_t: PhantomData<(St, Io, Codec, Cfg)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<St, C, T, Io, Codec, Cfg> NewService for FramedService<St, C, T, Io, Codec, Cfg>
|
impl<St, C, T, Io, Codec, Cfg> ServiceFactory for FramedService<St, C, T, Io, Codec, Cfg>
|
||||||
where
|
where
|
||||||
St: 'static,
|
St: 'static,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
C: NewService<Config = (), Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
C: ServiceFactory<
|
||||||
|
Config = (),
|
||||||
|
Request = Connect<Io>,
|
||||||
|
Response = ConnectResult<Io, St, Codec>,
|
||||||
|
>,
|
||||||
C::Error: 'static,
|
C::Error: 'static,
|
||||||
C::Future: 'static,
|
C::Future: 'static,
|
||||||
T: NewService<
|
T: ServiceFactory<
|
||||||
Config = St,
|
Config = St,
|
||||||
Request = RequestItem<St, Codec>,
|
Request = RequestItem<St, Codec>,
|
||||||
Response = ResponseItem<Codec>,
|
Response = ResponseItem<Codec>,
|
||||||
Error = C::Error,
|
Error = C::Error,
|
||||||
InitError = C::Error,
|
InitError = C::Error,
|
||||||
> + 'static,
|
> + 'static,
|
||||||
Codec: Decoder + Encoder,
|
Codec: Decoder + Encoder + Unpin,
|
||||||
<Codec as Encoder>::Item: 'static,
|
<Codec as Encoder>::Item: 'static,
|
||||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
|
@ -198,23 +211,24 @@ where
|
||||||
type Error = ServiceError<C::Error, Codec>;
|
type Error = ServiceError<C::Error, Codec>;
|
||||||
type InitError = C::InitError;
|
type InitError = C::InitError;
|
||||||
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>;
|
type Service = FramedServiceImpl<St, C::Service, T, Io, Codec>;
|
||||||
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||||
|
|
||||||
fn new_service(&self, _: &Cfg) -> Self::Future {
|
fn new_service(&self, _: &Cfg) -> Self::Future {
|
||||||
let handler = self.handler.clone();
|
let handler = self.handler.clone();
|
||||||
let disconnect = self.disconnect.clone();
|
let disconnect = self.disconnect.clone();
|
||||||
|
|
||||||
// create connect service and then create service impl
|
// create connect service and then create service impl
|
||||||
Box::new(
|
self.connect
|
||||||
self.connect
|
.new_service(&())
|
||||||
.new_service(&())
|
.map(move |result| {
|
||||||
.map(move |connect| FramedServiceImpl {
|
result.map(move |connect| FramedServiceImpl {
|
||||||
connect,
|
connect,
|
||||||
handler,
|
handler,
|
||||||
disconnect,
|
disconnect,
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}),
|
})
|
||||||
)
|
})
|
||||||
|
.boxed_local()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,18 +241,18 @@ pub struct FramedServiceImpl<St, C, T, Io, Codec> {
|
||||||
|
|
||||||
impl<St, C, T, Io, Codec> Service for FramedServiceImpl<St, C, T, Io, Codec>
|
impl<St, C, T, Io, Codec> Service for FramedServiceImpl<St, C, T, Io, Codec>
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
||||||
C::Error: 'static,
|
C::Error: 'static,
|
||||||
T: NewService<
|
T: ServiceFactory<
|
||||||
Config = St,
|
Config = St,
|
||||||
Request = RequestItem<St, Codec>,
|
Request = RequestItem<St, Codec>,
|
||||||
Response = ResponseItem<Codec>,
|
Response = ResponseItem<Codec>,
|
||||||
Error = C::Error,
|
Error = C::Error,
|
||||||
InitError = C::Error,
|
InitError = C::Error,
|
||||||
>,
|
>,
|
||||||
<<T as NewService>::Service as Service>::Future: 'static,
|
<<T as ServiceFactory>::Service as Service>::Future: 'static,
|
||||||
Codec: Decoder + Encoder,
|
Codec: Decoder + Encoder + Unpin,
|
||||||
<Codec as Encoder>::Item: 'static,
|
<Codec as Encoder>::Item: 'static,
|
||||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
|
@ -247,8 +261,8 @@ where
|
||||||
type Error = ServiceError<C::Error, Codec>;
|
type Error = ServiceError<C::Error, Codec>;
|
||||||
type Future = FramedServiceImplResponse<St, Io, Codec, C, T>;
|
type Future = FramedServiceImplResponse<St, Io, Codec, C, T>;
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||||
self.connect.poll_ready().map_err(|e| e.into())
|
self.connect.poll_ready(cx).map_err(|e| e.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: Io) -> Self::Future {
|
fn call(&mut self, req: Io) -> Self::Future {
|
||||||
|
@ -256,108 +270,155 @@ where
|
||||||
inner: FramedServiceImplResponseInner::Connect(
|
inner: FramedServiceImplResponseInner::Connect(
|
||||||
self.connect.call(Connect::new(req)),
|
self.connect.call(Connect::new(req)),
|
||||||
self.handler.clone(),
|
self.handler.clone(),
|
||||||
|
self.disconnect.clone(),
|
||||||
),
|
),
|
||||||
disconnect: self.disconnect.clone(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[pin_project]
|
||||||
pub struct FramedServiceImplResponse<St, Io, Codec, C, T>
|
pub struct FramedServiceImplResponse<St, Io, Codec, C, T>
|
||||||
where
|
where
|
||||||
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
||||||
C::Error: 'static,
|
C::Error: 'static,
|
||||||
T: NewService<
|
T: ServiceFactory<
|
||||||
Config = St,
|
Config = St,
|
||||||
Request = RequestItem<St, Codec>,
|
Request = RequestItem<St, Codec>,
|
||||||
Response = ResponseItem<Codec>,
|
Response = ResponseItem<Codec>,
|
||||||
Error = C::Error,
|
Error = C::Error,
|
||||||
InitError = C::Error,
|
InitError = C::Error,
|
||||||
>,
|
>,
|
||||||
<<T as NewService>::Service as Service>::Future: 'static,
|
<<T as ServiceFactory>::Service as Service>::Future: 'static,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
Codec: Encoder + Decoder,
|
Codec: Encoder + Decoder + Unpin,
|
||||||
<Codec as Encoder>::Item: 'static,
|
<Codec as Encoder>::Item: 'static,
|
||||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
|
inner: FramedServiceImplResponseInner<St, Io, Codec, C, T>,
|
||||||
disconnect: Option<Rc<dyn Fn(&mut St, bool)>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
|
|
||||||
where
|
|
||||||
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
|
||||||
C::Error: 'static,
|
|
||||||
T: NewService<
|
|
||||||
Config = St,
|
|
||||||
Request = RequestItem<St, Codec>,
|
|
||||||
Response = ResponseItem<Codec>,
|
|
||||||
Error = C::Error,
|
|
||||||
InitError = C::Error,
|
|
||||||
>,
|
|
||||||
<<T as NewService>::Service as Service>::Future: 'static,
|
|
||||||
Io: AsyncRead + AsyncWrite,
|
|
||||||
Codec: Encoder + Decoder,
|
|
||||||
<Codec as Encoder>::Item: 'static,
|
|
||||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
|
||||||
{
|
|
||||||
Connect(C::Future, Rc<T>),
|
|
||||||
Handler(T::Future, Option<ConnectResult<Io, St, Codec>>),
|
|
||||||
Dispatcher(FramedDispatcher<St, T::Service, Io, Codec>),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<St, Io, Codec, C, T> Future for FramedServiceImplResponse<St, Io, Codec, C, T>
|
impl<St, Io, Codec, C, T> Future for FramedServiceImplResponse<St, Io, Codec, C, T>
|
||||||
where
|
where
|
||||||
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
||||||
C::Error: 'static,
|
C::Error: 'static,
|
||||||
T: NewService<
|
T: ServiceFactory<
|
||||||
Config = St,
|
Config = St,
|
||||||
Request = RequestItem<St, Codec>,
|
Request = RequestItem<St, Codec>,
|
||||||
Response = ResponseItem<Codec>,
|
Response = ResponseItem<Codec>,
|
||||||
Error = C::Error,
|
Error = C::Error,
|
||||||
InitError = C::Error,
|
InitError = C::Error,
|
||||||
>,
|
>,
|
||||||
<<T as NewService>::Service as Service>::Future: 'static,
|
<<T as ServiceFactory>::Service as Service>::Future: 'static,
|
||||||
Io: AsyncRead + AsyncWrite,
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
Codec: Encoder + Decoder,
|
Codec: Encoder + Decoder + Unpin,
|
||||||
<Codec as Encoder>::Item: 'static,
|
<Codec as Encoder>::Item: 'static,
|
||||||
<Codec as Encoder>::Error: std::fmt::Debug,
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Output = Result<(), ServiceError<C::Error, Codec>>;
|
||||||
type Error = ServiceError<C::Error, Codec>;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
match self.inner {
|
let this = self.get_mut();
|
||||||
FramedServiceImplResponseInner::Connect(ref mut fut, ref handler) => {
|
|
||||||
match fut.poll()? {
|
loop {
|
||||||
Async::Ready(res) => {
|
match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) {
|
||||||
self.inner = FramedServiceImplResponseInner::Handler(
|
Either::Left(new) => this.inner = new,
|
||||||
handler.new_service(&res.state),
|
Either::Right(poll) => return poll,
|
||||||
Some(res),
|
};
|
||||||
);
|
}
|
||||||
self.poll()
|
}
|
||||||
}
|
}
|
||||||
Async::NotReady => Ok(Async::NotReady),
|
|
||||||
}
|
#[pin_project]
|
||||||
}
|
enum FramedServiceImplResponseInner<St, Io, Codec, C, T>
|
||||||
FramedServiceImplResponseInner::Handler(ref mut fut, ref mut res) => {
|
where
|
||||||
match fut.poll()? {
|
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
||||||
Async::Ready(handler) => {
|
C::Error: 'static,
|
||||||
let res = res.take().unwrap();
|
T: ServiceFactory<
|
||||||
self.inner =
|
Config = St,
|
||||||
FramedServiceImplResponseInner::Dispatcher(FramedDispatcher::new(
|
Request = RequestItem<St, Codec>,
|
||||||
res.framed,
|
Response = ResponseItem<Codec>,
|
||||||
State::new(res.state),
|
Error = C::Error,
|
||||||
handler,
|
InitError = C::Error,
|
||||||
res.rx,
|
>,
|
||||||
res.sink,
|
<<T as ServiceFactory>::Service as Service>::Future: 'static,
|
||||||
self.disconnect.clone(),
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
));
|
Codec: Encoder + Decoder + Unpin,
|
||||||
self.poll()
|
<Codec as Encoder>::Item: 'static,
|
||||||
}
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
Async::NotReady => Ok(Async::NotReady),
|
{
|
||||||
}
|
Connect(#[pin] C::Future, Rc<T>, Option<Rc<dyn Fn(&mut St, bool)>>),
|
||||||
}
|
Handler(
|
||||||
FramedServiceImplResponseInner::Dispatcher(ref mut fut) => fut.poll(),
|
#[pin] T::Future,
|
||||||
|
Option<ConnectResult<Io, St, Codec>>,
|
||||||
|
Option<Rc<dyn Fn(&mut St, bool)>>,
|
||||||
|
),
|
||||||
|
Dispatcher(FramedDispatcher<St, T::Service, Io, Codec>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St, Io, Codec, C, T> FramedServiceImplResponseInner<St, Io, Codec, C, T>
|
||||||
|
where
|
||||||
|
C: Service<Request = Connect<Io>, Response = ConnectResult<Io, St, Codec>>,
|
||||||
|
C::Error: 'static,
|
||||||
|
T: ServiceFactory<
|
||||||
|
Config = St,
|
||||||
|
Request = RequestItem<St, Codec>,
|
||||||
|
Response = ResponseItem<Codec>,
|
||||||
|
Error = C::Error,
|
||||||
|
InitError = C::Error,
|
||||||
|
>,
|
||||||
|
<<T as ServiceFactory>::Service as Service>::Future: 'static,
|
||||||
|
Io: AsyncRead + AsyncWrite + Unpin,
|
||||||
|
Codec: Encoder + Decoder + Unpin,
|
||||||
|
<Codec as Encoder>::Item: 'static,
|
||||||
|
<Codec as Encoder>::Error: std::fmt::Debug,
|
||||||
|
{
|
||||||
|
#[project]
|
||||||
|
fn poll(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context,
|
||||||
|
) -> Either<
|
||||||
|
FramedServiceImplResponseInner<St, Io, Codec, C, T>,
|
||||||
|
Poll<Result<(), ServiceError<C::Error, Codec>>>,
|
||||||
|
> {
|
||||||
|
#[project]
|
||||||
|
match self.project() {
|
||||||
|
FramedServiceImplResponseInner::Connect(
|
||||||
|
ref mut fut,
|
||||||
|
ref handler,
|
||||||
|
ref mut disconnect,
|
||||||
|
) => match Pin::new(fut).poll(cx) {
|
||||||
|
Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler(
|
||||||
|
handler.new_service(&res.state),
|
||||||
|
Some(res),
|
||||||
|
disconnect.take(),
|
||||||
|
)),
|
||||||
|
Poll::Pending => Either::Right(Poll::Pending),
|
||||||
|
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
|
||||||
|
},
|
||||||
|
FramedServiceImplResponseInner::Handler(
|
||||||
|
ref mut fut,
|
||||||
|
ref mut res,
|
||||||
|
ref mut disconnect,
|
||||||
|
) => match Pin::new(fut).poll(cx) {
|
||||||
|
Poll::Ready(Ok(handler)) => {
|
||||||
|
let res = res.take().unwrap();
|
||||||
|
Either::Left(FramedServiceImplResponseInner::Dispatcher(
|
||||||
|
FramedDispatcher::new(
|
||||||
|
res.framed,
|
||||||
|
State::new(res.state),
|
||||||
|
handler,
|
||||||
|
res.rx,
|
||||||
|
res.sink,
|
||||||
|
disconnect.take(),
|
||||||
|
),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Poll::Pending => Either::Right(Poll::Pending),
|
||||||
|
Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))),
|
||||||
|
},
|
||||||
|
FramedServiceImplResponseInner::Dispatcher(ref mut fut) => {
|
||||||
|
Either::Right(Pin::new(fut).poll(cx))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
use futures::unsync::{mpsc, oneshot};
|
use actix_utils::{mpsc, oneshot};
|
||||||
use futures::Future;
|
use futures::future::{Future, FutureExt};
|
||||||
|
|
||||||
use crate::dispatcher::FramedMessage;
|
use crate::dispatcher::FramedMessage;
|
||||||
|
|
||||||
pub struct Sink<T>(mpsc::UnboundedSender<FramedMessage<T>>);
|
pub struct Sink<T>(mpsc::Sender<FramedMessage<T>>);
|
||||||
|
|
||||||
impl<T> Clone for Sink<T> {
|
impl<T> Clone for Sink<T> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
|
@ -14,26 +14,26 @@ impl<T> Clone for Sink<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Sink<T> {
|
impl<T> Sink<T> {
|
||||||
pub(crate) fn new(tx: mpsc::UnboundedSender<FramedMessage<T>>) -> Self {
|
pub(crate) fn new(tx: mpsc::Sender<FramedMessage<T>>) -> Self {
|
||||||
Sink(tx)
|
Sink(tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Close connection
|
/// Close connection
|
||||||
pub fn close(&self) {
|
pub fn close(&self) {
|
||||||
let _ = self.0.unbounded_send(FramedMessage::Close);
|
let _ = self.0.send(FramedMessage::Close);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Close connection
|
/// Close connection
|
||||||
pub fn wait_close(&self) -> impl Future<Item = (), Error = ()> {
|
pub fn wait_close(&self) -> impl Future<Output = ()> {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
let _ = self.0.unbounded_send(FramedMessage::WaitClose(tx));
|
let _ = self.0.send(FramedMessage::WaitClose(tx));
|
||||||
|
|
||||||
rx.map_err(|_| ())
|
rx.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send item
|
/// Send item
|
||||||
pub fn send(&self, item: T) {
|
pub fn send(&self, item: T) {
|
||||||
let _ = self.0.unbounded_send(FramedMessage::Message(item));
|
let _ = self.0.send(FramedMessage::Message(item));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,11 +4,11 @@ use std::time::Duration;
|
||||||
|
|
||||||
use actix_codec::BytesCodec;
|
use actix_codec::BytesCodec;
|
||||||
use actix_server_config::Io;
|
use actix_server_config::Io;
|
||||||
use actix_service::{new_apply_fn, Service};
|
use actix_service::{apply_fn_factory, service_fn, Service};
|
||||||
use actix_testing::{self as test, TestServer};
|
use actix_testing::{self as test, TestServer};
|
||||||
use futures::Future;
|
use futures::future::ok;
|
||||||
use tokio_tcp::TcpStream;
|
use tokio_net::tcp::TcpStream;
|
||||||
use tokio_timer::sleep;
|
use tokio_timer::delay_for;
|
||||||
|
|
||||||
use actix_ioframe::{Builder, Connect};
|
use actix_ioframe::{Builder, Connect};
|
||||||
|
|
||||||
|
@ -22,13 +22,15 @@ fn test_disconnect() -> std::io::Result<()> {
|
||||||
let srv = TestServer::with(move || {
|
let srv = TestServer::with(move || {
|
||||||
let disconnect1 = disconnect1.clone();
|
let disconnect1 = disconnect1.clone();
|
||||||
|
|
||||||
new_apply_fn(
|
apply_fn_factory(
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.factory(|conn: Connect<_>| Ok(conn.codec(BytesCodec).state(State)))
|
.factory(service_fn(|conn: Connect<_>| {
|
||||||
|
ok(conn.codec(BytesCodec).state(State))
|
||||||
|
}))
|
||||||
.disconnect(move |_, _| {
|
.disconnect(move |_, _| {
|
||||||
disconnect1.store(true, Ordering::Relaxed);
|
disconnect1.store(true, Ordering::Relaxed);
|
||||||
})
|
})
|
||||||
.finish(|_t| Ok(None)),
|
.finish(service_fn(|_t| ok(None))),
|
||||||
|io: Io<TcpStream>, srv| srv.call(io.into_parts().0),
|
|io: Io<TcpStream>, srv| srv.call(io.into_parts().0),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
@ -37,9 +39,9 @@ fn test_disconnect() -> std::io::Result<()> {
|
||||||
.service(|conn: Connect<_>| {
|
.service(|conn: Connect<_>| {
|
||||||
let conn = conn.codec(BytesCodec).state(State);
|
let conn = conn.codec(BytesCodec).state(State);
|
||||||
conn.sink().close();
|
conn.sink().close();
|
||||||
Ok(conn)
|
ok(conn)
|
||||||
})
|
})
|
||||||
.finish(|_t| Ok(None));
|
.finish(service_fn(|_t| ok(None)));
|
||||||
|
|
||||||
let conn = test::block_on(
|
let conn = test::block_on(
|
||||||
actix_connect::default_connector()
|
actix_connect::default_connector()
|
||||||
|
@ -48,11 +50,7 @@ fn test_disconnect() -> std::io::Result<()> {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
test::block_on(client.call(conn.into_parts().0)).unwrap();
|
test::block_on(client.call(conn.into_parts().0)).unwrap();
|
||||||
let _ = test::block_on(
|
let _ = test::block_on(delay_for(Duration::from_millis(100)));
|
||||||
sleep(Duration::from_millis(100))
|
|
||||||
.map(|_| ())
|
|
||||||
.map_err(|_| ()),
|
|
||||||
);
|
|
||||||
assert!(disconnect.load(Ordering::Relaxed));
|
assert!(disconnect.load(Ordering::Relaxed));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -24,7 +24,13 @@ where
|
||||||
pub fn apply_fn_factory<T, F, R, In, Out, Err, U>(
|
pub fn apply_fn_factory<T, F, R, In, Out, Err, U>(
|
||||||
service: U,
|
service: U,
|
||||||
f: F,
|
f: F,
|
||||||
) -> impl ServiceFactory<Request = In, Response = Out, Error = Err>
|
) -> impl ServiceFactory<
|
||||||
|
Config = T::Config,
|
||||||
|
Request = In,
|
||||||
|
Response = Out,
|
||||||
|
Error = Err,
|
||||||
|
InitError = T::InitError,
|
||||||
|
>
|
||||||
where
|
where
|
||||||
T: ServiceFactory<Error = Err>,
|
T: ServiceFactory<Error = Err>,
|
||||||
F: FnMut(In, &mut T::Service) -> R + Clone,
|
F: FnMut(In, &mut T::Service) -> R + Clone,
|
||||||
|
|
|
@ -25,7 +25,7 @@ either = "1.5.2"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
pin-project = "0.4.5"
|
pin-project = "0.4.5"
|
||||||
tokio-timer = "0.3.0-alpha.6"
|
tokio-timer = "0.3.0-alpha.6"
|
||||||
tokio-executor = "=0.2.0-alpha.6"
|
tokio-executor = { version="=0.2.0-alpha.6", features=["current-thread"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
//! Framed dispatcher service and related utilities
|
//! Framed dispatcher service and related utilities
|
||||||
|
#![allow(type_alias_bounds)]
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
@ -265,7 +266,7 @@ where
|
||||||
loop {
|
loop {
|
||||||
match srv.poll_ready(cx) {
|
match srv.poll_ready(cx) {
|
||||||
Poll::Ready(Ok(_)) => {
|
Poll::Ready(Ok(_)) => {
|
||||||
let item = match framed.poll_next_item(cx) {
|
let item = match framed.next_item(cx) {
|
||||||
Poll::Ready(Some(Ok(el))) => el,
|
Poll::Ready(Some(Ok(el))) => el,
|
||||||
Poll::Ready(Some(Err(err))) => {
|
Poll::Ready(Some(Err(err))) => {
|
||||||
*state =
|
*state =
|
||||||
|
@ -365,16 +366,15 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
if !framed.is_write_buf_empty() {
|
if !framed.is_write_buf_empty() {
|
||||||
// match this.framed.poll_flush(cx) {
|
match framed.flush(cx) {
|
||||||
// Poll::Pending => break,
|
Poll::Pending => break,
|
||||||
// Poll::Ready(Err(err)) => {
|
Poll::Ready(Err(err)) => {
|
||||||
// debug!("Error sending data: {:?}", err);
|
debug!("Error sending data: {:?}", err);
|
||||||
// self.state =
|
*state = TransportState::FramedError(FramedTransportError::Encoder(err));
|
||||||
// TransportState::FramedError(FramedTransportError::Encoder(err));
|
return true;
|
||||||
// return true;
|
}
|
||||||
// }
|
Poll::Ready(Ok(_)) => (),
|
||||||
// Poll::Ready(Ok(_)) => (),
|
}
|
||||||
// }
|
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue