diff --git a/Cargo.toml b/Cargo.toml index b8157421c..571a8be35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ time = "0.1" encoding = "0.2" language-tags = "0.2" lazy_static = "1.0" +crossbeam-channel = "0.2" url = { version="1.7", features=["query_encoding"] } cookie = { version="0.10", features=["percent-encode"] } brotli2 = { version="^0.3.2", optional = true } diff --git a/src/client/mod.rs b/src/client/mod.rs index b40fa2ece..d73826853 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -31,6 +31,7 @@ mod pipeline; mod request; mod response; mod writer; +pub mod sync; pub use self::body::{ClientBody, ClientBodyStream}; pub use self::connector::{ diff --git a/src/client/request.rs b/src/client/request.rs index bc8feb3e7..3a5824cef 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -250,6 +250,11 @@ impl ClientRequest { send } } + + /// Transforms request into synchronous. + pub fn sync(self) -> super::sync::ClientRequest { + super::sync::ClientRequest(self) + } } impl fmt::Debug for ClientRequest { diff --git a/src/client/response.rs b/src/client/response.rs index f76d058e5..4e2bc9989 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -51,6 +51,14 @@ impl ClientResponse { self.1 = Some(pl); } + pub(crate) fn into_parts(self) -> (Rc>, Option>) { + (self.0, self.1) + } + + pub(crate) fn from_parts(msg: ClientMessage, pipeline: Option>) -> Self { + ClientResponse(Rc::new(UnsafeCell::new(msg)), pipeline) + } + #[inline] fn as_ref(&self) -> &ClientMessage { unsafe { &*self.0.get() } diff --git a/src/client/sync.rs b/src/client/sync.rs new file mode 100644 index 000000000..3972e5fe4 --- /dev/null +++ b/src/client/sync.rs @@ -0,0 +1,263 @@ +//! Sync version of client's HTTP primitives +extern crate serde; +extern crate futures; +extern crate bytes; +extern crate tokio; +extern crate http; +extern crate encoding; +extern crate mime; +extern crate serde_urlencoded; +extern crate serde_json; +extern crate crossbeam_channel; + +use std::rc::Rc; +use std::io; +use std::thread; +use std::ops::{Deref, DerefMut}; + +use self::mime::Mime; +use self::encoding::all::UTF_8; +use self::encoding::label::encoding_from_whatwg_label; +use self::encoding::types::{DecoderTrap, Encoding}; +use self::encoding::EncodingRef; +use self::http::{header, HeaderMap}; +use self::bytes::Bytes; +use self::serde::de::DeserializeOwned; +use self::futures::{Future, Stream}; +use self::futures::sync::{oneshot}; + +use super::response::ClientMessage; +use super::pipeline::Pipeline; +use ::httpmessage::HttpMessage; +use ::error::{JsonPayloadError, PayloadError, UrlencodedError, ContentTypeError}; +use ::dev::{JsonBody, MessageBody, UrlEncoded}; +use super::SendRequestError; +mod async { + pub use super::super::request::ClientRequest; + pub use super::super::response::ClientResponse; +} + +/// An synchronous adaptor for body +pub trait SyncBody: Future { + /// Reads entire body synchronously. + /// + /// Note: You should not attempt to read the whole future on the same thread + /// as actix event loop. + fn collect(self) -> Result where Self: Sized { + self.wait() + } +} + +impl SyncBody for MessageBody + where T: HttpMessage + Stream + 'static {} + +impl SyncBody for UrlEncoded + where T: HttpMessage + Stream + 'static {} + +impl SyncBody for JsonBody + where T: HttpMessage + Stream + 'static {} + +/// An synchronous HTTP Client Request +pub struct ClientRequest(pub async::ClientRequest); + +impl Deref for ClientRequest { + type Target = async::ClientRequest; + + fn deref(&self) -> &async::ClientRequest { + &self.0 + } +} + +impl DerefMut for ClientRequest { + fn deref_mut(&mut self) -> &mut async::ClientRequest { + &mut self.0 + } +} + +/// An synchronous HTTP Client Response +pub struct ClientResponse { + sender: ClientSender, + message: ClientMessage, + pipeline: Option>, +} + +impl ClientResponse { + fn new(message: ClientMessage, pipeline: Option>, sender: ClientSender) -> Self { + Self { + sender, + message, + pipeline + } + } + + ///Transforms self into asynchronous response + pub fn into_async(self) -> async::ClientResponse { + async::ClientResponse::from_parts(self.message, self.pipeline) + } + + /// Retrieves headers. + pub fn headers(&self) -> &HeaderMap { + &self.message.headers + } + + /// Read the request content type. If request does not contain + /// *Content-Type* header, empty str get returned. + pub fn content_type(&self) -> &str { + if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + return content_type.split(';').next().unwrap().trim(); + } + } + "" + } + + /// Convert the request content type to a known mime type. + fn mime_type(&self) -> Result, ContentTypeError> { + if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + return match content_type.parse() { + Ok(mt) => Ok(Some(mt)), + Err(_) => Err(ContentTypeError::ParseError), + }; + } else { + return Err(ContentTypeError::ParseError); + } + } + Ok(None) + } + + /// Get content type encoding + /// + /// UTF-8 is used by default, If request charset is not set. + fn encoding(&self) -> Result { + if let Some(mime_type) = self.mime_type()? { + if let Some(charset) = mime_type.get_param("charset") { + if let Some(enc) = encoding_from_whatwg_label(charset.as_str()) { + Ok(enc) + } else { + Err(ContentTypeError::UnknownEncoding) + } + } else { + Ok(UTF_8) + } + } else { + Ok(UTF_8) + } + } + + ///Synchronously receive response body as raw bytes. + pub fn sync_body(self) -> Result { + let (sender, receiver) = oneshot::channel(); + self.sender.send((SyncJob::CollectBody(self.message, self.pipeline), sender)); + + match receiver.wait() { + Ok(rsp) => match rsp { + SyncJobResult::Body(result) => result, + _ => unreachable!() + }, + Err(_canceled) => panic!("worker thread panicked!"), + } + } + + ///Synchronously receive response body as url encoded form + pub fn sync_urlencoded(self) -> Result { + // check content type + if self.content_type().to_lowercase() != "application/x-www-form-urlencoded" { + return Err(UrlencodedError::ContentType); + } + let encoding = self.encoding().map_err(|_| UrlencodedError::ContentType)?; + + let body = self.sync_body().map_err(|error| UrlencodedError::from(error))?; + + let enc: *const Encoding = encoding as *const Encoding; + if enc == UTF_8 { + serde_urlencoded::from_bytes::(&body).map_err(|_| UrlencodedError::Parse) + } else { + let body = encoding .decode(&body, DecoderTrap::Strict) .map_err(|_| UrlencodedError::Parse)?; + serde_urlencoded::from_str::(&body).map_err(|_| UrlencodedError::Parse) + } + } + + ///Synchronously receive response body as json. + pub fn sync_json(self) -> Result { + let json = match self.mime_type() { + Ok(Some(mime)) => mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON), + _ => false + }; + + if !json { + return Err(JsonPayloadError::ContentType); + } + + let body = self.sync_body().map_err(|error| JsonPayloadError::from(error))?; + serde_json::from_slice::(&body).map_err(|error| JsonPayloadError::from(error)) + } +} + +enum SyncJob { + Request(async::ClientRequest), + CollectBody(ClientMessage, Option>), +} + +enum SyncJobResult { + Response(ClientMessage, Option>), + SendError(SendRequestError), + Body(Result), +} + +type ClientSender = crossbeam_channel::Sender<(SyncJob, oneshot::Sender)>; +///Synchronous HTTP Client +pub struct Client { + sender: ClientSender, + _worker: thread::JoinHandle<()> +} + +impl Client { + ///Creates new instance. + pub fn new() -> io::Result { + let (sender, receiver) = crossbeam_channel::unbounded(); + let worker = thread::Builder::new().name("actix-web-sync-worker".into()).spawn(move || { + while let Some((req, receiver)) = receiver.recv() { + let receiver: oneshot::Sender = receiver; + let _ = match req { + SyncJob::Request(req) => match req.send().wait() { + Ok(rsp) => { + let (msg, pipeline) = rsp.into_parts(); + //TODO: Consider Do we need Rc actually? + let msg = match Rc::try_unwrap(msg) { + Ok(msg) => msg.into_inner(), + Err(_) => panic!("Unable to unwrap") + }; + receiver.send(SyncJobResult::Response(msg, pipeline)) + }, + Err(error) => receiver.send(SyncJobResult::SendError(error)) + }, + SyncJob::CollectBody(message, pipeline) => { + let body = async::ClientResponse::from_parts(message, pipeline).body(); + receiver.send(SyncJobResult::Body(body.collect())) + }, + }; + } + })?; + + Ok(Self { + sender: sender, + _worker: worker + }) + } + + ///Sends HTTP request synchronously. + pub fn send(&self, request: async::ClientRequest) -> Result { + let (sender, receiver) = oneshot::channel(); + self.sender.send((SyncJob::Request(request), sender)); + + match receiver.wait() { + Ok(rsp) => match rsp { + SyncJobResult::Response(message, pipeline) => Ok(ClientResponse::new(message, pipeline, self.sender.clone())), + SyncJobResult::SendError(error) => Err(error), + _ => unreachable!() + }, + Err(_canceled) => panic!("worker thread panicked!"), + } + } +} diff --git a/tests/test_client.rs b/tests/test_client.rs index 3128bb942..91ec750fd 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -458,3 +458,18 @@ fn test_default_headers() { "\"" ))); } + +#[test] +fn test_sync_client() { + use actix_web::client::sync; + + actix::System::run(|| { + actix::Arbiter::system().do_send(actix::msgs::SystemExit(0)); + + let client = sync::Client::new().expect("To create sync client"); + let srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR))); + let request = srv.get().finish().expect("To create request"); + //TODO: seems to fail to spawn job + let response = client.send(request).expect("To get successful response"); + }); +}