use std::{fmt, mem}; use std::rc::Rc; use std::cell::RefCell; use futures::{Async, Future, Poll}; use futures::task::{Task as FutureTask, current as current_task}; use body::{Body, BodyStream, Binary}; use context::Frame; use h1writer::{Writer, WriterState}; use error::{Error, UnexpectedTaskFrame}; use pipeline::MiddlewaresResponse; use httprequest::HttpRequest; use httpresponse::HttpResponse; #[derive(PartialEq, Debug)] enum TaskRunningState { Paused, Running, Done, } impl TaskRunningState { fn is_done(&self) -> bool { *self == TaskRunningState::Done } fn pause(&mut self) { if *self != TaskRunningState::Done { *self = TaskRunningState::Paused } } fn resume(&mut self) { if *self != TaskRunningState::Done { *self = TaskRunningState::Running } } } enum ResponseState { Reading, Ready(HttpResponse), Middlewares(MiddlewaresResponse), Prepared(Option), } enum IOState { Response, Payload(BodyStream), Context, Done, } enum TaskStream { None, Context(Box), Response(Box>), } impl IOState { fn is_done(&self) -> bool { match *self { IOState::Done => true, _ => false } } } impl ResponseState { fn is_reading(&self) -> bool { match *self { ResponseState::Reading => true, _ => false } } } impl fmt::Debug for ResponseState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { ResponseState::Reading => write!(f, "ResponseState::Reading"), ResponseState::Ready(_) => write!(f, "ResponseState::Ready"), ResponseState::Middlewares(_) => write!(f, "ResponseState::Middlewares"), ResponseState::Prepared(_) => write!(f, "ResponseState::Prepared"), } } } impl fmt::Debug for IOState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { IOState::Response => write!(f, "IOState::Response"), IOState::Payload(_) => write!(f, "IOState::Payload"), IOState::Context => write!(f, "IOState::Context"), IOState::Done => write!(f, "IOState::Done"), } } } pub(crate) trait IoContext: 'static { fn disconnected(&mut self); fn poll(&mut self) -> Poll, Error>; } /// Future that resolves when all buffered data get sent #[doc(hidden)] #[derive(Debug)] pub struct DrainFut { drained: bool, task: Option, } impl Default for DrainFut { fn default() -> DrainFut { DrainFut { drained: false, task: None, } } } impl DrainFut { fn set(&mut self) { self.drained = true; if let Some(task) = self.task.take() { task.notify() } } } impl Future for DrainFut { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { if self.drained { Ok(Async::Ready(())) } else { self.task = Some(current_task()); Ok(Async::NotReady) } } } pub struct Task { running: TaskRunningState, response: ResponseState, iostate: IOState, stream: TaskStream, drain: Vec>>, middlewares: Option, } #[doc(hidden)] impl Default for Task { fn default() -> Task { Task { running: TaskRunningState::Running, response: ResponseState::Reading, iostate: IOState::Response, drain: Vec::new(), stream: TaskStream::None, middlewares: None } } } impl Task { pub(crate) fn from_response>(response: R) -> Task { Task { running: TaskRunningState::Running, response: ResponseState::Ready(response.into()), iostate: IOState::Response, drain: Vec::new(), stream: TaskStream::None, middlewares: None } } pub(crate) fn from_error>(err: E) -> Task { Task::from_response(err.into()) } pub fn reply>(&mut self, response: R) { let state = &mut self.response; match *state { ResponseState::Reading => *state = ResponseState::Ready(response.into()), _ => panic!("Internal task state is broken"), } } pub fn error>(&mut self, err: E) { self.reply(err.into()) } pub(crate) fn context(&mut self, ctx: Box) { self.stream = TaskStream::Context(ctx); } pub fn async(&mut self, fut: F) where F: Future + 'static { self.stream = TaskStream::Response(Box::new(fut)); } pub(crate) fn response(&mut self) -> HttpResponse { match self.response { ResponseState::Prepared(ref mut state) => state.take().unwrap(), _ => panic!("Internal state is broken"), } } pub(crate) fn set_middlewares(&mut self, middlewares: MiddlewaresResponse) { self.middlewares = Some(middlewares) } pub(crate) fn disconnected(&mut self) { if let TaskStream::Context(ref mut ctx) = self.stream { ctx.disconnected(); } } pub(crate) fn poll_io(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll where T: Writer { trace!("POLL-IO frames resp: {:?}, io: {:?}, running: {:?}", self.response, self.iostate, self.running); if self.iostate.is_done() { // response is completed return Ok(Async::Ready(self.running.is_done())); } else if self.drain.is_empty() && self.running != TaskRunningState::Paused { // if task is paused, write buffer is probably full loop { let result = match mem::replace(&mut self.iostate, IOState::Done) { IOState::Response => { match self.poll_response(req) { Ok(Async::Ready(mut resp)) => { let result = io.start(req, &mut resp)?; match resp.replace_body(Body::Empty) { Body::Streaming(stream) | Body::Upgrade(stream) => self.iostate = IOState::Payload(stream), Body::StreamingContext | Body::UpgradeContext => self.iostate = IOState::Context, _ => (), } self.response = ResponseState::Prepared(Some(resp)); result }, Ok(Async::NotReady) => { self.iostate = IOState::Response; return Ok(Async::NotReady) } Err(err) => { let mut resp = err.into(); let result = io.start(req, &mut resp)?; match resp.replace_body(Body::Empty) { Body::Streaming(stream) | Body::Upgrade(stream) => self.iostate = IOState::Payload(stream), _ => (), } self.response = ResponseState::Prepared(Some(resp)); result } } }, IOState::Payload(mut body) => { // always poll stream if self.running == TaskRunningState::Running { match self.poll()? { Async::Ready(_) => self.running = TaskRunningState::Done, Async::NotReady => (), } } match body.poll() { Ok(Async::Ready(None)) => { self.iostate = IOState::Done; io.write_eof()?; break }, Ok(Async::Ready(Some(chunk))) => { self.iostate = IOState::Payload(body); io.write(chunk.as_ref())? } Ok(Async::NotReady) => { self.iostate = IOState::Payload(body); break }, Err(err) => return Err(err), } } IOState::Context => { match self.poll_context() { Ok(Async::Ready(None)) => { self.iostate = IOState::Done; self.running = TaskRunningState::Done; io.write_eof()?; break }, Ok(Async::Ready(Some(chunk))) => { self.iostate = IOState::Context; io.write(chunk.as_ref())? } Ok(Async::NotReady) => { self.iostate = IOState::Context; break } Err(err) => return Err(err), } } IOState::Done => break, }; match result { WriterState::Pause => { self.running.pause(); break } WriterState::Done => self.running.resume(), } } } // flush io match io.poll_complete() { Ok(Async::Ready(_)) => self.running.resume(), Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { debug!("Error sending data: {}", err); return Err(err.into()) } } // drain futures if !self.drain.is_empty() { for fut in &mut self.drain { fut.borrow_mut().set() } self.drain.clear(); } // response is completed if self.iostate.is_done() { if let ResponseState::Prepared(Some(ref mut resp)) = self.response { resp.set_response_size(io.written()) } Ok(Async::Ready(self.running.is_done())) } else { Ok(Async::NotReady) } } pub(crate) fn poll_response(&mut self, req: &mut HttpRequest) -> Poll { loop { let state = mem::replace(&mut self.response, ResponseState::Prepared(None)); match state { ResponseState::Ready(response) => { // run middlewares if let Some(mut middlewares) = self.middlewares.take() { match middlewares.response(req, response) { Ok(Some(response)) => return Ok(Async::Ready(response)), Ok(None) => { // middlewares need to run some futures self.response = ResponseState::Middlewares(middlewares); continue } Err(err) => return Err(err), } } else { return Ok(Async::Ready(response)) } } ResponseState::Middlewares(mut middlewares) => { // process middlewares match middlewares.poll(req) { Ok(Async::NotReady) => { self.response = ResponseState::Middlewares(middlewares); return Ok(Async::NotReady) }, Ok(Async::Ready(response)) => return Ok(Async::Ready(response)), Err(err) => return Err(err), } } _ => (), } self.response = state; match mem::replace(&mut self.stream, TaskStream::None) { TaskStream::None => return Ok(Async::NotReady), TaskStream::Context(mut context) => { loop { match context.poll() { Ok(Async::Ready(Some(frame))) => { match frame { Frame::Message(msg) => { if !self.response.is_reading() { error!("Unexpected message frame {:?}", msg); return Err(UnexpectedTaskFrame.into()) } self.stream = TaskStream::Context(context); self.response = ResponseState::Ready(msg); break }, Frame::Payload(_) => (), Frame::Drain(fut) => { self.drain.push(fut); self.stream = TaskStream::Context(context); break } } }, Ok(Async::Ready(None)) => { error!("Unexpected eof"); return Err(UnexpectedTaskFrame.into()) }, Ok(Async::NotReady) => { self.stream = TaskStream::Context(context); return Ok(Async::NotReady) }, Err(err) => return Err(err), } } }, TaskStream::Response(mut fut) => { match fut.poll() { Ok(Async::NotReady) => { self.stream = TaskStream::Response(fut); return Ok(Async::NotReady); }, Ok(Async::Ready(response)) => { self.response = ResponseState::Ready(response); } Err(err) => return Err(err) } } } } } pub(crate) fn poll(&mut self) -> Poll<(), Error> { match self.stream { TaskStream::None | TaskStream::Response(_) => Ok(Async::Ready(())), TaskStream::Context(ref mut context) => { loop { match context.poll() { Ok(Async::Ready(Some(_))) => (), Ok(Async::Ready(None)) => return Ok(Async::Ready(())), Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => return Err(err), } } }, } } fn poll_context(&mut self) -> Poll, Error> { match self.stream { TaskStream::None | TaskStream::Response(_) => Err(UnexpectedTaskFrame.into()), TaskStream::Context(ref mut context) => { match context.poll() { Ok(Async::Ready(Some(frame))) => { match frame { Frame::Message(msg) => { error!("Unexpected message frame {:?}", msg); Err(UnexpectedTaskFrame.into()) }, Frame::Payload(payload) => { Ok(Async::Ready(payload)) }, Frame::Drain(fut) => { self.drain.push(fut); Ok(Async::NotReady) } } }, Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => Ok(Async::NotReady), Err(err) => Err(err), } }, } } }