diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index aaf84d765..dafad3537 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -6,6 +6,7 @@ - Add `body::to_body_limit()` function. - Add `body::BodyLimitExceeded` error type. +- Add `body::channel`, `body::channel::Sender`, and `body::channel::Receiver` types. ## 3.3.1 - 2023-03-02 diff --git a/actix-http/src/body/channel.rs b/actix-http/src/body/channel.rs index 5ecfecda4..42e1bf9d8 100644 --- a/actix-http/src/body/channel.rs +++ b/actix-http/src/body/channel.rs @@ -3,6 +3,7 @@ use std::{error::Error as StdError, task::Poll}; use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}; use super::{BodySize, MessageBody}; + /// Creates an unbounded mpsc (multi-producer, single-consumer) channel for communicating between asynchronous tasks. /// /// This function returns a `Sender` half and a `Receiver` half that can be used as a body type, allowing for efficient streaming of data between tasks. The `Sender` can be cloned to allow sending to the same channel from multiple code locations, making it suitable for multi-producer scenarios. Only one `Receiver` is supported, adhering to the single-consumer principle. @@ -41,6 +42,7 @@ pub fn channel>>() -> (Sender, Receiver) { pub struct Sender { tx: UnboundedSender>, } + impl Sender { /// Constructs a new instance of the Sender struct with the specified UnboundedSender.\ /// UnboundedSender object representing the sender for underlying channel @@ -131,7 +133,6 @@ where #[cfg(test)] mod tests { - use actix_rt::pin; use std::io; @@ -261,7 +262,7 @@ mod tests { tx.send(Bytes::from_static(b"test")).unwrap(); tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); - let err = io::Error::new(io::ErrorKind::Other, "test"); + let err = io::Error::new(io::ErrorKind::Other, "error"); tx.close(Some(err)).unwrap(); @@ -269,9 +270,12 @@ mod tests { poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from_static(b"test")) ); - + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test2")) + ); let err = poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().err(); assert!(err.is_some()); - assert_eq!(err.unwrap().to_string(), "test"); + assert_eq!(err.unwrap().to_string(), "error"); } }