From 18106170ce92eef9381436f10cb93c44f078e45f Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 17 May 2021 20:09:26 +0100 Subject: [PATCH] add streaming example --- actix-http/Cargo.toml | 1 + actix-http/examples/echo.rs | 6 ++-- actix-http/examples/echo2.rs | 6 ++-- actix-http/examples/hello-world.rs | 12 ++++---- actix-http/examples/streaming-error.rs | 40 ++++++++++++++++++++++++++ 5 files changed, 53 insertions(+), 12 deletions(-) create mode 100644 actix-http/examples/streaming-error.rs diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 1f7df39a6..a6f9d0caf 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -90,6 +90,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tls-openssl = { version = "0.10", package = "openssl" } tls-rustls = { version = "0.19", package = "rustls" } +async-stream = "0.3" [[example]] name = "ws" diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index 54a71a106..6cfe3a675 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -5,14 +5,13 @@ use actix_server::Server; use bytes::BytesMut; use futures_util::StreamExt as _; use http::header::HeaderValue; -use log::info; #[actix_rt::main] async fn main() -> io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); Server::build() - .bind("echo", "127.0.0.1:8080", || { + .bind("echo", ("127.0.0.1", 8080), || { HttpService::build() .client_timeout(1000) .client_disconnect(1000) @@ -22,7 +21,8 @@ async fn main() -> io::Result<()> { body.extend_from_slice(&item?); } - info!("request body: {:?}", body); + log::info!("request body: {:?}", body); + Ok::<_, Error>( Response::build(StatusCode::OK) .insert_header(( diff --git a/actix-http/examples/echo2.rs b/actix-http/examples/echo2.rs index 3974cf20b..db195d65b 100644 --- a/actix-http/examples/echo2.rs +++ b/actix-http/examples/echo2.rs @@ -5,7 +5,6 @@ use actix_http::{Error, HttpService, Request, Response}; use actix_server::Server; use bytes::BytesMut; use futures_util::StreamExt as _; -use log::info; async fn handle_request(mut req: Request) -> Result, Error> { let mut body = BytesMut::new(); @@ -13,7 +12,8 @@ async fn handle_request(mut req: Request) -> Result, Error> { body.extend_from_slice(&item?) } - info!("request body: {:?}", body); + log::info!("request body: {:?}", body); + Ok(Response::build(StatusCode::OK) .insert_header(("x-head", HeaderValue::from_static("dummy value!"))) .body(body)) @@ -24,7 +24,7 @@ async fn main() -> io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); Server::build() - .bind("echo", "127.0.0.1:8080", || { + .bind("echo", ("127.0.0.1", 8080), || { HttpService::build().finish(handle_request).tcp() })? .run() diff --git a/actix-http/examples/hello-world.rs b/actix-http/examples/hello-world.rs index d51de6f4e..732be377c 100644 --- a/actix-http/examples/hello-world.rs +++ b/actix-http/examples/hello-world.rs @@ -2,27 +2,27 @@ use std::io; use actix_http::{http::StatusCode, HttpService, Response}; use actix_server::Server; -use actix_utils::future; use http::header::HeaderValue; -use log::info; #[actix_rt::main] async fn main() -> io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); Server::build() - .bind("hello-world", "127.0.0.1:8080", || { + .bind("hello-world", ("127.0.0.1", 8080), || { HttpService::build() .client_timeout(1000) .client_disconnect(1000) - .finish(|_req| { - info!("{:?}", _req); + .finish(|req| async move { + log::info!("{:?}", req); + let mut res = Response::build(StatusCode::OK); res.insert_header(( "x-head", HeaderValue::from_static("dummy value!"), )); - future::ok::<_, ()>(res.body("Hello world!")) + + Ok::<_, ()>(res.body("Hello world!")) }) .tcp() })? diff --git a/actix-http/examples/streaming-error.rs b/actix-http/examples/streaming-error.rs new file mode 100644 index 000000000..c33e57a96 --- /dev/null +++ b/actix-http/examples/streaming-error.rs @@ -0,0 +1,40 @@ +//! Example showing response body (chunked) stream erroring. +//! +//! Test using `nc` or `curl`. +//! ```sh +//! $ curl -vN 127.0.0.1:8080 +//! $ echo 'GET / HTTP/1.1\n\n' | nc 127.0.0.1 8080 +//! ``` + +use std::{io, time::Duration}; + +use actix_http::{body::BodyStream, Error, HttpService, Response}; +use actix_server::Server; +use async_stream::stream; +use bytes::Bytes; + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + Server::build() + .bind("streaming-error", ("127.0.0.1", 8080), || { + HttpService::build() + .finish(|req| async move { + log::info!("{:?}", req); + let res = Response::ok(); + + Ok::<_, ()>(res.set_body(BodyStream::new(stream! { + yield Ok(Bytes::from("123")); + yield Ok(Bytes::from("456")); + + actix_rt::time::sleep(Duration::from_millis(1000)).await; + + yield Err(Error::from(())); + }))) + }) + .tcp() + })? + .run() + .await +}