Compare commits

...

8 Commits

Author SHA1 Message Date
Joel Wurtz d038ef59f5
Merge adf3a06805 into 98d7d0b46b 2025-08-29 22:48:11 +01:00
Rob Ede 98d7d0b46b
chore(actix-files): prepare release 0.6.7 2025-08-29 22:31:48 +01:00
Rob Ede 4966a54e05
refactor(files): rename read_mode_threshold fn 2025-08-29 22:30:47 +01:00
Andrew Scott 00b0f8f700
feat(actix-files): opt-in filesize threshold for faster synchronous reads (#3706)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2025-08-29 21:52:34 +01:00
励志买套上海苏河湾大平层 3c2907da41
docs(middleware): complete middleware author's guide (#3680)
Add comprehensive documentation for middleware development in Actix Web, including:
- Detailed explanation of middleware concepts and execution flow
- Clear description of middleware traits and their responsibilities
- Guidelines for body type handling
- Best practices for middleware development
- Error handling recommendations
- Usage scenarios and anti-patterns

Co-authored-by: chenjjiaa <chenjjiaaa@gmail.com>
2025-08-29 20:12:05 +00:00
George Pollard 5041cd1c65
Make 'ws' feature of actix-http optional in actix-web (#3734)
* Make 'ws' feature of actix-http optional

* Update CHANGES.md

* Update actix-web-actors

* Update CHANGES.md

* nits

* nits

---------

Co-authored-by: Rob Ede <robjtede@icloud.com>
2025-08-29 02:50:05 +00:00
Thales d3c46537b3
fix(http): Wake Payload when feeding error or EOF (#3749)
* fix(http): Add failing tests to demonstrate the payload problem

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>

* fix(http): Wake Payload when feeding error or eof

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>

---------

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>
2025-08-29 02:47:03 +00:00
Joel Wurtz adf3a06805
feat(awc): allow to retrieve request head in client response 2024-12-16 15:40:23 +01:00
27 changed files with 334 additions and 98 deletions

View File

@ -9,4 +9,5 @@ words:
- rustls
- rustup
- serde
- uring
- zstd

31
Cargo.lock generated
View File

@ -44,7 +44,7 @@ dependencies = [
[[package]]
name = "actix-files"
version = "0.6.6"
version = "0.6.7"
dependencies = [
"actix-http",
"actix-rt",
@ -891,18 +891,18 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.45"
version = "4.5.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318"
checksum = "2c5e4fcf9c21d2e544ca1ee9d8552de13019a42aa7dbf32747fa7aaf1df76e57"
dependencies = [
"clap_builder",
]
[[package]]
name = "clap_builder"
version = "4.5.44"
version = "4.5.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8"
checksum = "fecb53a0e6fcfb055f686001bc2e2592fa527efaf38dbe81a6a9563562e57d41"
dependencies = [
"anstyle",
"clap_lex",
@ -1482,7 +1482,7 @@ dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasi 0.14.2+wasi-0.2.4",
"wasi 0.14.3+wasi-0.2.4",
]
[[package]]
@ -2305,9 +2305,9 @@ dependencies = [
[[package]]
name = "potential_utf"
version = "0.1.2"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585"
checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a"
dependencies = [
"zerovec",
]
@ -3461,11 +3461,11 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]]
name = "wasi"
version = "0.14.2+wasi-0.2.4"
version = "0.14.3+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95"
dependencies = [
"wit-bindgen-rt",
"wit-bindgen",
]
[[package]]
@ -3873,13 +3873,10 @@ dependencies = [
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"
name = "wit-bindgen"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
dependencies = [
"bitflags 2.9.3",
]
checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814"
[[package]]
name = "writeable"

View File

@ -2,6 +2,9 @@
## Unreleased
## 0.6.7
- Add `{Files, NamedFile}::read_mode_threshold()` methods to allow faster synchronous reads of small files.
- Minimum supported Rust version (MSRV) is now 1.75.
## 0.6.6

View File

@ -1,6 +1,6 @@
[package]
name = "actix-files"
version = "0.6.6"
version = "0.6.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Rob Ede <robjtede@icloud.com>"]
description = "Static file serving for Actix Web"
keywords = ["actix", "http", "async", "futures"]

View File

@ -3,11 +3,11 @@
<!-- prettier-ignore-start -->
[![crates.io](https://img.shields.io/crates/v/actix-files?label=latest)](https://crates.io/crates/actix-files)
[![Documentation](https://docs.rs/actix-files/badge.svg?version=0.6.6)](https://docs.rs/actix-files/0.6.6)
[![Documentation](https://docs.rs/actix-files/badge.svg?version=0.6.7)](https://docs.rs/actix-files/0.6.7)
![Version](https://img.shields.io/badge/rustc-1.72+-ab6000.svg)
![License](https://img.shields.io/crates/l/actix-files.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-files/0.6.6/status.svg)](https://deps.rs/crate/actix-files/0.6.6)
[![dependency status](https://deps.rs/crate/actix-files/0.6.7/status.svg)](https://deps.rs/crate/actix-files/0.6.7)
[![Download](https://img.shields.io/crates/d/actix-files.svg)](https://crates.io/crates/actix-files)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

View File

@ -14,6 +14,12 @@ use pin_project_lite::pin_project;
use super::named::File;
#[derive(Debug, Clone, Copy)]
pub(crate) enum ReadMode {
Sync,
Async,
}
pin_project! {
/// Adapter to read a `std::file::File` in chunks.
#[doc(hidden)]
@ -24,6 +30,7 @@ pin_project! {
state: ChunkedReadFileState<Fut>,
counter: u64,
callback: F,
read_mode: ReadMode,
}
}
@ -57,6 +64,7 @@ pub(crate) fn new_chunked_read(
size: u64,
offset: u64,
file: File,
read_mode_threshold: u64,
) -> impl Stream<Item = Result<Bytes, Error>> {
ChunkedReadFile {
size,
@ -69,31 +77,50 @@ pub(crate) fn new_chunked_read(
},
counter: 0,
callback: chunked_read_file_callback,
read_mode: if size < read_mode_threshold {
ReadMode::Sync
} else {
ReadMode::Async
},
}
}
#[cfg(not(feature = "experimental-io-uring"))]
async fn chunked_read_file_callback(
fn chunked_read_file_callback_sync(
mut file: File,
offset: u64,
max_bytes: usize,
) -> Result<(File, Bytes), Error> {
) -> Result<(File, Bytes), io::Error> {
use io::{Read as _, Seek as _};
let res = actix_web::web::block(move || {
let mut buf = Vec::with_capacity(max_bytes);
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if n_bytes == 0 {
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
} else {
Ok((file, Bytes::from(buf)))
if n_bytes == 0 {
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
} else {
Ok((file, Bytes::from(buf)))
}
}
#[cfg(not(feature = "experimental-io-uring"))]
#[inline]
async fn chunked_read_file_callback(
file: File,
offset: u64,
max_bytes: usize,
read_mode: ReadMode,
) -> Result<(File, Bytes), Error> {
let res = match read_mode {
ReadMode::Sync => chunked_read_file_callback_sync(file, offset, max_bytes)?,
ReadMode::Async => {
actix_web::web::block(move || chunked_read_file_callback_sync(file, offset, max_bytes))
.await??
}
})
.await??;
};
Ok(res)
}
@ -171,7 +198,7 @@ where
#[cfg(not(feature = "experimental-io-uring"))]
impl<F, Fut> Stream for ChunkedReadFile<F, Fut>
where
F: Fn(File, u64, usize) -> Fut,
F: Fn(File, u64, usize, ReadMode) -> Fut,
Fut: Future<Output = Result<(File, Bytes), Error>>,
{
type Item = Result<Bytes, Error>;
@ -193,7 +220,7 @@ where
.take()
.expect("ChunkedReadFile polled after completion");
let fut = (this.callback)(file, offset, max_bytes);
let fut = (this.callback)(file, offset, max_bytes, *this.read_mode);
this.state
.project_replace(ChunkedReadFileState::Future { fut });

View File

@ -49,6 +49,7 @@ pub struct Files {
use_guards: Option<Rc<dyn Guard>>,
guards: Vec<Rc<dyn Guard>>,
hidden_files: bool,
read_mode_threshold: u64,
}
impl fmt::Debug for Files {
@ -73,6 +74,7 @@ impl Clone for Files {
use_guards: self.use_guards.clone(),
guards: self.guards.clone(),
hidden_files: self.hidden_files,
read_mode_threshold: self.read_mode_threshold,
}
}
}
@ -119,6 +121,7 @@ impl Files {
use_guards: None,
guards: Vec::new(),
hidden_files: false,
read_mode_threshold: 0,
}
}
@ -204,6 +207,23 @@ impl Files {
self
}
/// Sets the size threshold that determines file read mode (sync/async).
///
/// When a file is smaller than the threshold (bytes), the reader will switch from synchronous
/// (blocking) file-reads to async reads to avoid blocking the main-thread when processing large
/// files.
///
/// Tweaking this value according to your expected usage may lead to signifiant performance
/// gains (or losses in other handlers, if `size` is too high).
///
/// When the `experimental-io-uring` crate feature is enabled, file reads are always async.
///
/// Default is 0, meaning all files are read asynchronously.
pub fn read_mode_threshold(mut self, size: u64) -> Self {
self.read_mode_threshold = size;
self
}
/// Specifies whether to use ETag or not.
///
/// Default is true.
@ -367,6 +387,7 @@ impl ServiceFactory<ServiceRequest> for Files {
file_flags: self.file_flags,
guards: self.use_guards.clone(),
hidden_files: self.hidden_files,
size_threshold: self.read_mode_threshold,
};
if let Some(ref default) = *self.default.borrow() {

View File

@ -80,6 +80,7 @@ pub struct NamedFile {
pub(crate) content_type: Mime,
pub(crate) content_disposition: ContentDisposition,
pub(crate) encoding: Option<ContentEncoding>,
pub(crate) read_mode_threshold: u64,
}
#[cfg(not(feature = "experimental-io-uring"))]
@ -200,6 +201,7 @@ impl NamedFile {
encoding,
status_code: StatusCode::OK,
flags: Flags::default(),
read_mode_threshold: 0,
})
}
@ -353,6 +355,23 @@ impl NamedFile {
self
}
/// Sets the size threshold that determines file read mode (sync/async).
///
/// When a file is smaller than the threshold (bytes), the reader will switch from synchronous
/// (blocking) file-reads to async reads to avoid blocking the main-thread when processing large
/// files.
///
/// Tweaking this value according to your expected usage may lead to signifiant performance
/// gains (or losses in other handlers, if `size` is too high).
///
/// When the `experimental-io-uring` crate feature is enabled, file reads are always async.
///
/// Default is 0, meaning all files are read asynchronously.
pub fn read_mode_threshold(mut self, size: u64) -> Self {
self.read_mode_threshold = size;
self
}
/// Specifies whether to return `ETag` header in response.
///
/// Default is true.
@ -440,7 +459,8 @@ impl NamedFile {
res.insert_header((header::CONTENT_ENCODING, current_encoding.as_str()));
}
let reader = chunked::new_chunked_read(self.md.len(), 0, self.file);
let reader =
chunked::new_chunked_read(self.md.len(), 0, self.file, self.read_mode_threshold);
return res.streaming(reader);
}
@ -577,7 +597,7 @@ impl NamedFile {
.map_into_boxed_body();
}
let reader = chunked::new_chunked_read(length, offset, self.file);
let reader = chunked::new_chunked_read(length, offset, self.file, self.read_mode_threshold);
if offset != 0 || length != self.md.len() {
res.status(StatusCode::PARTIAL_CONTENT);

View File

@ -39,6 +39,7 @@ pub struct FilesServiceInner {
pub(crate) file_flags: named::Flags,
pub(crate) guards: Option<Rc<dyn Guard>>,
pub(crate) hidden_files: bool,
pub(crate) size_threshold: u64,
}
impl fmt::Debug for FilesServiceInner {
@ -70,7 +71,9 @@ impl FilesService {
named_file.flags = self.file_flags;
let (req, _) = req.into_parts();
let res = named_file.into_response(&req);
let res = named_file
.read_mode_threshold(self.size_threshold)
.into_response(&req);
ServiceResponse::new(req, res)
}
@ -169,17 +172,7 @@ impl Service<ServiceRequest> for FilesService {
}
} else {
match NamedFile::open_async(&path).await {
Ok(mut named_file) => {
if let Some(ref mime_override) = this.mime_override {
let new_disposition = mime_override(&named_file.content_type.type_());
named_file.content_disposition.disposition = new_disposition;
}
named_file.flags = this.file_flags;
let (req, _) = req.into_parts();
let res = named_file.into_response(&req);
Ok(ServiceResponse::new(req, res))
}
Ok(named_file) => Ok(this.serve_named_file(req, named_file)),
Err(err) => this.handle_err(err, req).await,
}
}

View File

@ -2,6 +2,8 @@
## Unreleased
- Properly wake Payload receivers when feeding errors or EOF
## 3.11.1
- Prevent more hangs after client disconnects.

View File

@ -156,7 +156,7 @@ serde_json = "1.0"
static_assertions = "1"
tls-openssl = { package = "openssl", version = "0.10.55" }
tls-rustls_023 = { package = "rustls", version = "0.23" }
tokio = { version = "1.38.2", features = ["net", "rt", "macros"] }
tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] }
[lints]
workspace = true

View File

@ -188,16 +188,16 @@ impl Decoder for ClientPayloadCodec {
}
}
impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
impl Encoder<Message<(&mut RequestHeadType, BodySize)>> for ClientCodec {
type Error = io::Error;
fn encode(
&mut self,
item: Message<(RequestHeadType, BodySize)>,
item: Message<(&mut RequestHeadType, BodySize)>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
Message::Item((mut head, length)) => {
Message::Item((head, length)) => {
let inner = &mut self.inner;
inner.version = head.as_ref().version;
inner
@ -219,7 +219,7 @@ impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
inner.encoder.encode(
dst,
&mut head,
head,
false,
false,
inner.version,

View File

@ -200,11 +200,13 @@ impl Inner {
#[inline]
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
self.wake();
}
#[inline]
fn feed_eof(&mut self) {
self.eof = true;
self.wake();
}
#[inline]
@ -253,8 +255,13 @@ impl Inner {
#[cfg(test)]
mod tests {
use std::{task::Poll, time::Duration};
use actix_rt::time::timeout;
use actix_utils::future::poll_fn;
use futures_util::{FutureExt, StreamExt};
use static_assertions::{assert_impl_all, assert_not_impl_any};
use tokio::sync::oneshot;
use super::*;
@ -263,6 +270,67 @@ mod tests {
assert_impl_all!(Inner: Unpin, Send, Sync);
const WAKE_TIMEOUT: Duration = Duration::from_secs(2);
fn prepare_waking_test(
mut payload: Payload,
expected: Option<Result<(), ()>>,
) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) {
let (tx, rx) = oneshot::channel();
let handle = actix_rt::spawn(async move {
// Make sure to poll once to set the waker
poll_fn(|cx| {
assert!(payload.poll_next_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
tx.send(()).unwrap();
// actix-rt is single-threaded, so this won't race with `rx.await`
let mut pend_once = false;
poll_fn(|_| {
if pend_once {
Poll::Ready(())
} else {
// Return pending without storing wakers, we already did on the previous
// `poll_fn`, now this task will only continue if the `sender` wakes us
pend_once = true;
Poll::Pending
}
})
.await;
let got = payload.next().now_or_never().unwrap();
match expected {
Some(Ok(_)) => assert!(got.unwrap().is_ok()),
Some(Err(_)) => assert!(got.unwrap().is_err()),
None => assert!(got.is_none()),
}
});
(rx, handle)
}
#[actix_rt::test]
async fn wake_on_error() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));
rx.await.unwrap();
sender.set_error(PayloadError::Incomplete(None));
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}
#[actix_rt::test]
async fn wake_on_eof() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, None);
rx.await.unwrap();
sender.feed_eof();
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}
#[actix_rt::test]
async fn test_unread_data() {
let (_, mut payload) = Payload::create(false);

View File

@ -24,7 +24,7 @@ allowed_external_types = [
actix = { version = ">=0.12, <0.14", default-features = false }
actix-codec = "0.5"
actix-http = "3"
actix-web = { version = "4", default-features = false }
actix-web = { version = "4", default-features = false, features = ["ws"] }
bytes = "1"
bytestring = "1"

View File

@ -4,6 +4,7 @@
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now sets `Content-Type` to `application/octet-stream` if `Content-Type` does not exist.
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now calls `actix_web::response::builder::HttpResponseBuilder::no_chunking()` if `Content-Length` is set by user.
- Add `ws` crate feature (on-by-default) which forwards to `actix-http` and guards some of its `ResponseError` impls.
## 4.11.0

View File

@ -67,6 +67,7 @@ default = [
"http2",
"unicode",
"compat",
"ws",
]
# Brotli algorithm content-encoding support
@ -85,9 +86,12 @@ cookies = ["dep:cookie"]
# Secure & signed cookies
secure-cookies = ["cookies", "cookie/secure"]
# HTTP/2 support (including h2c).
# HTTP/2 support (including h2c)
http2 = ["actix-http/http2"]
# WebSocket support
ws = ["actix-http/ws"]
# TLS via OpenSSL
openssl = ["__tls", "http2", "actix-http/openssl", "actix-tls/accept", "actix-tls/openssl"]
@ -131,7 +135,7 @@ actix-service = "2"
actix-tls = { version = "3.4", default-features = false, optional = true }
actix-utils = "3"
actix-http = { version = "3.11", features = ["ws"] }
actix-http = "3.11"
actix-router = { version = "0.5.3", default-features = false, features = ["http"] }
actix-web-codegen = { version = "4.3", optional = true, default-features = false }

View File

@ -7,7 +7,6 @@ use std::{
io::{self, Write as _},
};
use actix_http::Response;
use bytes::BytesMut;
use crate::{
@ -126,20 +125,24 @@ impl ResponseError for actix_http::error::PayloadError {
}
}
impl ResponseError for actix_http::ws::ProtocolError {}
impl ResponseError for actix_http::error::ContentTypeError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}
#[cfg(feature = "ws")]
impl ResponseError for actix_http::ws::HandshakeError {
fn error_response(&self) -> HttpResponse<BoxBody> {
Response::from(self).map_into_boxed_body().into()
actix_http::Response::from(self)
.map_into_boxed_body()
.into()
}
}
#[cfg(feature = "ws")]
impl ResponseError for actix_http::ws::ProtocolError {}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -2,16 +2,80 @@
## What Is A Middleware?
Middleware in Actix Web is a powerful mechanism that allows you to add additional behavior to request/response processing. It enables you to:
- Pre-process incoming requests (e.g., path normalization, authentication)
- Post-process outgoing responses (e.g., logging, compression)
- Modify application state through ServiceRequest
- Access external services (e.g., sessions, caching)
Middleware is registered for each App, Scope, or Resource and executed in the reverse order of registration. This means the last registered middleware is the first to process the request.
## Middleware Traits
Actix Web's middleware system is built on two main traits:
1. `Transform<S, Req>`: The builder trait that creates the actual Service. It's responsible for:
- Creating new middleware instances
- Assembling the middleware chain
- Handling initialization errors
2. `Service<Req>`: The trait that represents the actual middleware functionality. It:
- Processes requests and responses
- Can modify both request and response
- Can short-circuit request processing
- Must be implemented for the middleware to work
## Understanding Body Types
When working with middleware, it's important to understand body types:
- Middleware can work with different body types for requests and responses
- The `MessageBody` trait is used to handle different body types
- You can use `EitherBody` when you need to handle multiple body types
- Be careful with body consumption - once a body is consumed, it cannot be read again
## Best Practices
1. Keep middleware focused and single-purpose
2. Handle errors appropriately and propagate them correctly
3. Be mindful of performance impact
4. Use appropriate body types and handle them correctly
5. Consider middleware ordering carefully
6. Document your middleware's behavior and requirements
7. Test your middleware thoroughly
## Error Propagation
Proper error handling is crucial in middleware:
1. Always propagate errors from the inner service
2. Use appropriate error types
3. Handle initialization errors
4. Consider using custom error types for specific middleware errors
5. Document error conditions and handling
## When To (Not) Use Middleware
Use middleware when you need to:
- Add cross-cutting concerns
- Modify requests/responses globally
- Add authentication/authorization
- Add logging or monitoring
- Handle compression or caching
Avoid middleware when:
- The functionality is specific to a single route
- The operation is better handled by a service
- The overhead would be too high
- The functionality can be implemented more simply
## Author's References
- `EitherBody` + when is middleware appropriate: https://discord.com/channels/771444961383153695/952016890723729428
- Actix Web Documentation: https://docs.rs/actix-web
- Service Trait Documentation: https://docs.rs/actix-service
- MessageBody Trait Documentation: https://docs.rs/actix-web/latest/actix_web/body/trait.MessageBody.html

View File

@ -12,6 +12,7 @@
- Do not send `Host` header on HTTP/2 requests, as it is not required, and some web servers may reject it.
- Update `brotli` dependency to `7`.
- Minimum supported Rust version (MSRV) is now 1.75.
- Allow to retrieve request head used to send the http request on `ClientResponse`
## 3.5.1

View File

@ -243,7 +243,7 @@ where
self,
head: H,
body: RB,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
) -> LocalBoxFuture<'static, Result<(RequestHeadType, ResponseHead, Payload), SendRequestError>>
where
H: Into<RequestHeadType> + 'static,
RB: MessageBody + 'static,
@ -273,17 +273,24 @@ where
head: H,
) -> LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Connection<A, B>, ClientCodec>), SendRequestError>,
Result<
(
RequestHeadType,
ResponseHead,
Framed<Connection<A, B>, ClientCodec>,
),
SendRequestError,
>,
> {
Box::pin(async move {
match self {
Connection::Tcp(ConnectionType::H1(ref _conn)) => {
let (head, framed) = h1proto::open_tunnel(self, head.into()).await?;
Ok((head, framed))
let (head, res_head, framed) = h1proto::open_tunnel(self, head.into()).await?;
Ok((head, res_head, framed))
}
Connection::Tls(ConnectionType::H1(ref _conn)) => {
let (head, framed) = h1proto::open_tunnel(self, head.into()).await?;
Ok((head, framed))
let (head, res_head, framed) = h1proto::open_tunnel(self, head.into()).await?;
Ok((head, res_head, framed))
}
Connection::Tls(ConnectionType::H2(mut conn)) => {
conn.release();

View File

@ -28,7 +28,7 @@ pub(crate) async fn send_request<Io, B>(
io: H1Connection<Io>,
mut head: RequestHeadType,
body: B,
) -> Result<(ResponseHead, Payload), SendRequestError>
) -> Result<(RequestHeadType, ResponseHead, Payload), SendRequestError>
where
Io: ConnectionIo,
B: MessageBody,
@ -86,7 +86,7 @@ where
// special handle for EXPECT request.
let (do_send, mut res_head) = if is_expect {
pin_framed.send((head, body.size()).into()).await?;
pin_framed.send((&mut head, body.size()).into()).await?;
let head = poll_fn(|cx| pin_framed.as_mut().poll_next(cx))
.await
@ -96,7 +96,7 @@ where
// and current head would be used as final response head.
(head.status == StatusCode::CONTINUE, Some(head))
} else {
pin_framed.feed((head, body.size()).into()).await?;
pin_framed.feed((&mut head, body.size()).into()).await?;
(true, None)
};
@ -118,17 +118,16 @@ where
res_head = Some(head);
}
let head = res_head.unwrap();
match pin_framed.codec_ref().message_type() {
h1::MessageType::None => {
let keep_alive = pin_framed.codec_ref().keep_alive();
pin_framed.io_mut().on_release(keep_alive);
Ok((head, Payload::None))
Ok((head, res_head.unwrap(), Payload::None))
}
_ => Ok((
head,
res_head.unwrap(),
Payload::Stream {
payload: Box::pin(PlStream::new(framed)),
},
@ -138,21 +137,21 @@ where
pub(crate) async fn open_tunnel<Io>(
io: Io,
head: RequestHeadType,
) -> Result<(ResponseHead, Framed<Io, h1::ClientCodec>), SendRequestError>
mut head: RequestHeadType,
) -> Result<(RequestHeadType, ResponseHead, Framed<Io, h1::ClientCodec>), SendRequestError>
where
Io: ConnectionIo,
{
// create Framed and send request.
let mut framed = Framed::new(io, h1::ClientCodec::default());
framed.send((head, BodySize::None).into()).await?;
framed.send((&mut head, BodySize::None).into()).await?;
// read response head.
let head = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx))
let res_head = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx))
.await
.ok_or(ConnectError::Disconnected)??;
Ok((head, framed))
Ok((head, res_head, framed))
}
/// send request body to the peer

View File

@ -29,7 +29,7 @@ pub(crate) async fn send_request<Io, B>(
mut io: H2Connection<Io>,
head: RequestHeadType,
body: B,
) -> Result<(ResponseHead, Payload), SendRequestError>
) -> Result<(RequestHeadType, ResponseHead, Payload), SendRequestError>
where
Io: ConnectionIo,
B: MessageBody,
@ -129,10 +129,10 @@ where
let (parts, body) = resp.into_parts();
let payload = if head_req { Payload::None } else { body.into() };
let mut head = ResponseHead::new(parts.status);
head.version = parts.version;
head.headers = parts.headers.into();
Ok((head, payload))
let mut res_head = ResponseHead::new(parts.status);
res_head.version = parts.version;
res_head.headers = parts.headers.into();
Ok((head, res_head, payload))
}
async fn send_body<B>(body: B, mut send: SendStream<Bytes>) -> Result<(), SendRequestError>

View File

@ -49,7 +49,11 @@ pub enum ConnectResponse {
/// Tunnel used for WebSocket communication.
///
/// Contains response head and framed HTTP/1.1 codec.
Tunnel(ResponseHead, Framed<BoxedSocket, ClientCodec>),
Tunnel(
RequestHeadType,
ResponseHead,
Framed<BoxedSocket, ClientCodec>,
),
}
impl ConnectResponse {
@ -70,9 +74,15 @@ impl ConnectResponse {
///
/// # Panics
/// Panics if enum variant is not `Tunnel`.
pub fn into_tunnel_response(self) -> (ResponseHead, Framed<BoxedSocket, ClientCodec>) {
pub fn into_tunnel_response(
self,
) -> (
RequestHeadType,
ResponseHead,
Framed<BoxedSocket, ClientCodec>,
) {
match self {
ConnectResponse::Tunnel(head, framed) => (head, framed),
ConnectResponse::Tunnel(req, head, framed) => (req, head, framed),
_ => {
panic!("TunnelResponse only reachable with ConnectResponse::TunnelResponse variant")
}
@ -133,12 +143,12 @@ pin_project_lite::pin_project! {
req: Option<ConnectRequest>
},
Client {
fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
fut: LocalBoxFuture<'static, Result<(RequestHeadType, ResponseHead, Payload), SendRequestError>>
},
Tunnel {
fut: LocalBoxFuture<
'static,
Result<(ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
Result<(RequestHeadType, ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
>,
}
}
@ -181,16 +191,16 @@ where
}
ConnectRequestProj::Client { fut } => {
let (head, payload) = ready!(fut.as_mut().poll(cx))?;
let (req, head, payload) = ready!(fut.as_mut().poll(cx))?;
Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new(
head, payload,
req, head, payload,
))))
}
ConnectRequestProj::Tunnel { fut } => {
let (head, framed) = ready!(fut.as_mut().poll(cx))?;
let (req, head, framed) = ready!(fut.as_mut().poll(cx))?;
let framed = framed.into_map_io(|io| Box::new(io) as _);
Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
Poll::Ready(Ok(ConnectResponse::Tunnel(req, head, framed)))
}
}
}

View File

@ -329,6 +329,7 @@ mod tests {
let res = client.get(srv.url("/")).send().await.unwrap();
assert_eq!(res.status().as_u16(), 400);
assert_eq!(res.req_head().uri.path(), "/test");
}
#[actix_rt::test]

View File

@ -8,7 +8,7 @@ use std::{
use actix_http::{
error::PayloadError, header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage, Payload,
ResponseHead, StatusCode, Version,
RequestHead, RequestHeadType, ResponseHead, StatusCode, Version,
};
use actix_rt::time::{sleep, Sleep};
use bytes::Bytes;
@ -23,6 +23,7 @@ use crate::cookie::{Cookie, ParseError as CookieParseError};
pin_project! {
/// Client Response
pub struct ClientResponse<S = BoxedPayloadStream> {
pub(crate) req_head: RequestHeadType,
pub(crate) head: ResponseHead,
#[pin]
pub(crate) payload: Payload<S>,
@ -34,8 +35,9 @@ pin_project! {
impl<S> ClientResponse<S> {
/// Create new Request instance
pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
pub(crate) fn new(req_head: RequestHeadType, head: ResponseHead, payload: Payload<S>) -> Self {
ClientResponse {
req_head,
head,
payload,
timeout: ResponseTimeout::default(),
@ -43,6 +45,12 @@ impl<S> ClientResponse<S> {
}
}
/// Returns the request head used to send the request.
#[inline]
pub fn req_head(&self) -> &RequestHead {
self.req_head.as_ref()
}
#[inline]
pub(crate) fn head(&self) -> &ResponseHead {
&self.head
@ -77,6 +85,7 @@ impl<S> ClientResponse<S> {
ClientResponse {
payload,
req_head: self.req_head,
head: self.head,
timeout: self.timeout,
extensions: self.extensions,
@ -105,6 +114,7 @@ impl<S> ClientResponse<S> {
Self {
payload: self.payload,
head: self.head,
req_head: self.req_head,
timeout,
extensions: self.extensions,
}

View File

@ -1,6 +1,8 @@
//! Test helpers for actix http client to use during testing.
use actix_http::{h1, header::TryIntoHeaderPair, Payload, ResponseHead, StatusCode, Version};
use actix_http::{
h1, header::TryIntoHeaderPair, Payload, RequestHead, ResponseHead, StatusCode, Version,
};
use bytes::Bytes;
#[cfg(feature = "cookies")]
@ -9,6 +11,7 @@ use crate::ClientResponse;
/// Test `ClientResponse` builder
pub struct TestResponse {
req_head: RequestHead,
head: ResponseHead,
#[cfg(feature = "cookies")]
cookies: CookieJar,
@ -18,6 +21,7 @@ pub struct TestResponse {
impl Default for TestResponse {
fn default() -> TestResponse {
TestResponse {
req_head: RequestHead::default(),
head: ResponseHead::new(StatusCode::OK),
#[cfg(feature = "cookies")]
cookies: CookieJar::new(),
@ -86,10 +90,10 @@ impl TestResponse {
}
if let Some(pl) = self.payload {
ClientResponse::new(head, pl)
ClientResponse::new(self.req_head.into(), head, pl)
} else {
let (_, payload) = h1::Payload::create(true);
ClientResponse::new(head, payload.into())
ClientResponse::new(self.req_head.into(), head, payload.into())
}
}
}

View File

@ -351,7 +351,7 @@ impl WebsocketsRequest {
fut.await?
};
let (head, framed) = res.into_tunnel_response();
let (req_head, head, framed) = res.into_tunnel_response();
// verify response
if head.status != StatusCode::SWITCHING_PROTOCOLS {
@ -411,7 +411,7 @@ impl WebsocketsRequest {
// response and ws framed
Ok((
ClientResponse::new(head, Payload::None),
ClientResponse::new(req_head, head, Payload::None),
framed.into_map_codec(|_| {
if server_mode {
ws::Codec::new().max_size(max_size)