From 6d2a5fea64f31b20d0a8afdecbcc8b83064fcf1f Mon Sep 17 00:00:00 2001 From: yinho999 Date: Tue, 18 Apr 2023 00:16:20 +0100 Subject: [PATCH 1/6] Add channel wrapper, docs and testing --- actix-http/src/body/channel.rs | 176 +++++++++++++++++++++++++++++++++ actix-http/src/body/mod.rs | 2 + 2 files changed, 178 insertions(+) create mode 100644 actix-http/src/body/channel.rs diff --git a/actix-http/src/body/channel.rs b/actix-http/src/body/channel.rs new file mode 100644 index 000000000..f6b821733 --- /dev/null +++ b/actix-http/src/body/channel.rs @@ -0,0 +1,176 @@ +use bytes::Bytes; +use std::{error::Error as StdError, task::Poll}; +use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}; + +use super::{BodySize, MessageBody}; +/// Returns a sender half and a receiver half that can be used as a body type. +/// +/// # Examples +/// ``` +/// use actix_web::{HttpResponse, web}; +/// use std::convert::Infallible; +/// use actix_web::body::channel; +/// +/// #[actix_rt::main] +/// async fn main() { +/// let (mut body_tx, body) = channel::(); +/// +/// let _ = web::block(move || { +/// body_tx.send(web::Bytes::from_static(b"body from another thread")).unwrap(); +/// }); +/// +/// +/// HttpResponse::Ok().body(body); +/// } +/// ``` +pub fn channel>>() -> (Sender, Receiver) { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + (Sender::new(tx), Receiver::new(rx)) +} + +/// Channel Sender wrapper +/// +/// Senders can be cloned to create multiple senders that will send to the same underlying channel. +/// Senders should be mutable, as they can be used to close the channel. +#[derive(Debug)] +pub struct Sender { + tx: UnboundedSender>, +} +impl Sender { + pub fn new(tx: UnboundedSender>) -> Self { + Self { tx } + } + /// Submits a chunk of bytes to the response body stream. + /// + /// # Errors + /// Errors if other side of channel body was dropped, returning `chunk`. + pub fn send(&mut self, chunk: Bytes) -> Result<(), Bytes> { + self.tx.send(Ok(chunk)).map_err(|SendError(err)| match err { + Ok(chunk) => chunk, + Err(_) => unreachable!(), + }) + } + + /// Closes the stream, optionally sending an error. + /// + /// # Errors + /// Errors if closing with error and other side of channel body was dropped, returning `error`. + pub fn close(self, err: Option) -> Result<(), E> { + if let Some(err) = err { + return self.tx.send(Err(err)).map_err(|SendError(err)| match err { + Ok(_) => unreachable!(), + Err(err) => err, + }); + } + Ok(()) + } +} + +/// Clones the underlying [`UnboundedSender`]. +/// This creates a new handle to the same channel, allowing a message to be sent on multiple +/// handles. +/// +/// The returned [`Sender`] is a [`Clone`] of the original [`Sender`]. +impl Clone for Sender { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + } + } +} + +/// Channel Receiver wrapper +#[derive(Debug)] +pub struct Receiver { + rx: UnboundedReceiver>, +} + +impl Receiver { + pub fn new(rx: UnboundedReceiver>) -> Self { + Self { rx } + } +} + +impl MessageBody for Receiver +where + E: Into> + 'static, +{ + type Error = E; + + #[inline] + fn size(&self) -> BodySize { + BodySize::Stream + } + + /// Attempts to pull out the next value of the underlying [`UnboundedReceiver`]. + /// If the underlying [`UnboundedReceiver`] is not ready, the current task is scheduled to + /// receive a notification when it is ready to make progress. + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll>> { + self.rx.poll_recv(cx) + } +} + +#[cfg(test)] +mod tests { + + use actix_rt::pin; + use std::io; + + use super::*; + use actix_utils::future::poll_fn; + static_assertions::assert_impl_all!(Sender: Send, Sync, Unpin); + static_assertions::assert_impl_all!(Receiver: Send, Sync, Unpin, MessageBody); + + #[actix_rt::test] + async fn test_body_channel() { + let (mut tx, rx) = channel::(); + let mut tx_cloned = tx.clone(); + let rx = rx.boxed(); + pin!(rx); + + assert_eq!(rx.size(), BodySize::Stream); + + tx.send(Bytes::from_static(b"test")).unwrap(); + tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); + tx.close(None).unwrap(); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test")) + ); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test2")) + ); + } + + #[actix_rt::test] + async fn test_body_channel_error() { + let (mut tx, rx) = channel::(); + let mut tx_cloned = tx.clone(); + let rx = rx.boxed(); + pin!(rx); + + assert_eq!(rx.size(), BodySize::Stream); + + tx.send(Bytes::from_static(b"test")).unwrap(); + tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); + + let err = io::Error::new(io::ErrorKind::Other, "test"); + + tx.close(Some(err)).unwrap(); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test")) + ); + + let err = poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().err(); + assert!(err.is_some()); + assert_eq!(err.unwrap().to_string(), "test"); + } +} diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index d1708b9d5..fb1b2c570 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -7,6 +7,7 @@ mod body_stream; mod boxed; +mod channel; mod either; mod message_body; mod none; @@ -16,6 +17,7 @@ mod utils; pub use self::body_stream::BodyStream; pub use self::boxed::BoxBody; +pub use self::channel::{channel, Receiver, Sender}; pub use self::either::EitherBody; pub use self::message_body::MessageBody; pub(crate) use self::message_body::MessageBodyMapErr; From 7038c4fe45e0aaf03ffa74a5758faa09a4921337 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 18 Apr 2023 21:56:00 +0100 Subject: [PATCH 2/6] Added Docs test for channel, sender and receiver structs and function. Adding Unit testing --- actix-http/src/body/channel.rs | 161 ++++++++++++++++++++++++++++++--- 1 file changed, 147 insertions(+), 14 deletions(-) diff --git a/actix-http/src/body/channel.rs b/actix-http/src/body/channel.rs index f6b821733..e5cf605d6 100644 --- a/actix-http/src/body/channel.rs +++ b/actix-http/src/body/channel.rs @@ -3,8 +3,15 @@ use std::{error::Error as StdError, task::Poll}; use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}; use super::{BodySize, MessageBody}; -/// Returns a sender half and a receiver half that can be used as a body type. +/// Creates an unbounded mpsc (multi-producer, single-consumer) channel wrapper for communicating between asynchronous tasks. /// +/// This function returns a `Sender` half and a `Receiver` half that can be used as a body type, allowing for efficient streaming of data between tasks. The `Sender` can be cloned to allow sending to the same channel from multiple code locations, making it suitable for multi-producer scenarios. Only one `Receiver` is supported, adhering to the single-consumer principle. +/// +/// Since the channel is unbounded, it does not provide backpressure support. This means that the `Sender` can keep sending data without waiting, even if the `Receiver` is not able to process it quickly enough, which may cause memory issues if not handled carefully. +/// +/// If the `Receiver` is disconnected while trying to send, the `send` method will return a `SendError`. Similarly, if the `Sender` is disconnected while trying to receive, the data from the `Sender` will return `None`, indicating that there's no more data to be received. +/// +/// This unbounded channel implementation is useful for streaming response bodies in web applications or other scenarios where it's essential to maintain a steady flow of data between asynchronous tasks. However, be cautious when using it in situations where the rate of incoming data may overwhelm the receiver, as it may lead to memory issues. /// # Examples /// ``` /// use actix_web::{HttpResponse, web}; @@ -37,13 +44,19 @@ pub struct Sender { tx: UnboundedSender>, } impl Sender { + /// Constructs a new instance of the Sender struct with the specified UnboundedSender. + /// # Input + /// An UnboundedSender object representing the underlying channel + /// # Outputs + /// A Sender wrapper object pub fn new(tx: UnboundedSender>) -> Self { Self { tx } } /// Submits a chunk of bytes to the response body stream. - /// - /// # Errors - /// Errors if other side of channel body was dropped, returning `chunk`. + /// # Input + /// A Bytes object representing the chunk of bytes to send + /// # Outputs + /// A Result<(), Bytes> object. If the sending is successful, the method returns Ok(()). If the other side of the channel body was dropped, the method returns an Err(chunk). pub fn send(&mut self, chunk: Bytes) -> Result<(), Bytes> { self.tx.send(Ok(chunk)).map_err(|SendError(err)| match err { Ok(chunk) => chunk, @@ -52,9 +65,10 @@ impl Sender { } /// Closes the stream, optionally sending an error. - /// - /// # Errors - /// Errors if closing with error and other side of channel body was dropped, returning `error`. + /// # Input + /// An optional error to send + /// # Outputs + /// A Result<(), E> object. If the closing is successful, the method returns Ok(()). If the other side of the channel body was dropped, the method returns an Err(error). pub fn close(self, err: Option) -> Result<(), E> { if let Some(err) = err { return self.tx.send(Err(err)).map_err(|SendError(err)| match err { @@ -86,17 +100,37 @@ pub struct Receiver { } impl Receiver { + /// Constructs a new instance of the Receiver struct with the specified UnboundedReceiver + /// # Input + /// An UnboundedReceiver object representing the underlying channel + /// + /// # Outputs + /// A Receiver wrapper object pub fn new(rx: UnboundedReceiver>) -> Self { Self { rx } } } +/// Drop the underlying [`UnboundedReceiver`]. +/// This will cause the [`Receiver`] to stop receiving messages. +impl Drop for Receiver { + fn drop(&mut self) { + self.rx.close(); + } +} + impl MessageBody for Receiver where E: Into> + 'static, { type Error = E; + /// Returns the body size of the Receiver as a BodySize object. + /// Since the Receiver is a stream, the method returns BodySize::Stream + /// # Input + /// None + /// # Output + /// A BodySize object representing the size of the Receiver #[inline] fn size(&self) -> BodySize { BodySize::Stream @@ -105,6 +139,13 @@ where /// Attempts to pull out the next value of the underlying [`UnboundedReceiver`]. /// If the underlying [`UnboundedReceiver`] is not ready, the current task is scheduled to /// receive a notification when it is ready to make progress. + /// # Input + /// A Pin object representing the Receiver + /// A Context object representing the current task + /// + /// # Output + /// A Poll object representing the result of the operation + /// If the poll is ready, it returns the next value of the underlying UnboundedReceiver. If the poll is not ready, the task is scheduled to receive a notification when it is ready to make progress. fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -127,25 +168,19 @@ mod tests { #[actix_rt::test] async fn test_body_channel() { let (mut tx, rx) = channel::(); - let mut tx_cloned = tx.clone(); let rx = rx.boxed(); pin!(rx); assert_eq!(rx.size(), BodySize::Stream); tx.send(Bytes::from_static(b"test")).unwrap(); - tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); + tx.close(None).unwrap(); assert_eq!( poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from_static(b"test")) ); - - assert_eq!( - poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), - Some(Bytes::from_static(b"test2")) - ); } #[actix_rt::test] @@ -173,4 +208,102 @@ mod tests { assert!(err.is_some()); assert_eq!(err.unwrap().to_string(), "test"); } + + #[actix_rt::test] + async fn test_dropped_sender() { + let (tx, rx) = channel::(); + let rx = rx.boxed(); + pin!(rx); + + drop(tx); + + let err = poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().err(); + assert!(err.is_some()); + assert_eq!(err.unwrap().to_string(), "channel closed"); + } + + #[actix_rt::test] + async fn test_dropped_receiver() { + let (mut tx, rx) = channel::(); + let rx = rx.boxed(); + pin!(rx); + + drop(rx); + + let err = tx.send(Bytes::from_static(b"test")).unwrap_err(); + assert_eq!(err, Bytes::from_static(b"test")); + } + + #[actix_rt::test] + async fn test_multiple_senders() { + let (mut tx, rx) = channel::(); + let mut tx_cloned = tx.clone(); + let rx = rx.boxed(); + pin!(rx); + + assert_eq!(rx.size(), BodySize::Stream); + + tx.send(Bytes::from_static(b"test")).unwrap(); + tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); + tx.close(None).unwrap(); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test")) + ); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test2")) + ); + } + + #[actix_rt::test] + async fn test_backpressure() { + let (mut tx, rx) = channel::(); + let mut tx_cloned = tx.clone(); + let rx = rx.boxed(); + pin!(rx); + + assert_eq!(rx.size(), BodySize::Stream); + + tx.send(Bytes::from_static(b"test")).unwrap(); + tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test")) + ); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test2")) + ); + } + + #[actix_rt::test] + async fn test_error_propagation() { + let (mut tx, rx) = channel::(); + let mut tx_cloned = tx.clone(); + let rx = rx.boxed(); + pin!(rx); + + assert_eq!(rx.size(), BodySize::Stream); + + tx.send(Bytes::from_static(b"test")).unwrap(); + tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); + + let err = io::Error::new(io::ErrorKind::Other, "test"); + + tx.close(Some(err)).unwrap(); + + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test")) + ); + + let err = poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().err(); + assert!(err.is_some()); + assert_eq!(err.unwrap().to_string(), "test"); + } } From ba58221310368236d563ac1d5f25c840f2671bcd Mon Sep 17 00:00:00 2001 From: yinho999 Date: Wed, 19 Apr 2023 22:08:00 +0100 Subject: [PATCH 3/6] Complete Unit Testing --- actix-http/src/body/channel.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/actix-http/src/body/channel.rs b/actix-http/src/body/channel.rs index e5cf605d6..5113316bd 100644 --- a/actix-http/src/body/channel.rs +++ b/actix-http/src/body/channel.rs @@ -16,7 +16,7 @@ use super::{BodySize, MessageBody}; /// ``` /// use actix_web::{HttpResponse, web}; /// use std::convert::Infallible; -/// use actix_web::body::channel; +/// use actix_http::body::channel; /// /// #[actix_rt::main] /// async fn main() { @@ -186,14 +186,12 @@ mod tests { #[actix_rt::test] async fn test_body_channel_error() { let (mut tx, rx) = channel::(); - let mut tx_cloned = tx.clone(); let rx = rx.boxed(); pin!(rx); assert_eq!(rx.size(), BodySize::Stream); tx.send(Bytes::from_static(b"test")).unwrap(); - tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); let err = io::Error::new(io::ErrorKind::Other, "test"); @@ -214,20 +212,15 @@ mod tests { let (tx, rx) = channel::(); let rx = rx.boxed(); pin!(rx); - drop(tx); - - let err = poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().err(); - assert!(err.is_some()); - assert_eq!(err.unwrap().to_string(), "channel closed"); + let received = poll_fn(|cx| rx.as_mut().poll_next(cx)).await; + assert!(received.is_none()); } #[actix_rt::test] async fn test_dropped_receiver() { let (mut tx, rx) = channel::(); let rx = rx.boxed(); - pin!(rx); - drop(rx); let err = tx.send(Bytes::from_static(b"test")).unwrap_err(); From 67cc0ee6b477e80276d53a756002a5dda0845c00 Mon Sep 17 00:00:00 2001 From: yinho999 Date: Fri, 21 Apr 2023 22:16:26 +0100 Subject: [PATCH 4/6] Proofread and restructure the documentation. --- actix-http/src/body/channel.rs | 73 +++++++++++----------------------- 1 file changed, 24 insertions(+), 49 deletions(-) diff --git a/actix-http/src/body/channel.rs b/actix-http/src/body/channel.rs index 5113316bd..5ecfecda4 100644 --- a/actix-http/src/body/channel.rs +++ b/actix-http/src/body/channel.rs @@ -3,7 +3,7 @@ use std::{error::Error as StdError, task::Poll}; use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}; use super::{BodySize, MessageBody}; -/// Creates an unbounded mpsc (multi-producer, single-consumer) channel wrapper for communicating between asynchronous tasks. +/// Creates an unbounded mpsc (multi-producer, single-consumer) channel for communicating between asynchronous tasks. /// /// This function returns a `Sender` half and a `Receiver` half that can be used as a body type, allowing for efficient streaming of data between tasks. The `Sender` can be cloned to allow sending to the same channel from multiple code locations, making it suitable for multi-producer scenarios. Only one `Receiver` is supported, adhering to the single-consumer principle. /// @@ -14,49 +14,40 @@ use super::{BodySize, MessageBody}; /// This unbounded channel implementation is useful for streaming response bodies in web applications or other scenarios where it's essential to maintain a steady flow of data between asynchronous tasks. However, be cautious when using it in situations where the rate of incoming data may overwhelm the receiver, as it may lead to memory issues. /// # Examples /// ``` -/// use actix_web::{HttpResponse, web}; -/// use std::convert::Infallible; -/// use actix_http::body::channel; +/// # use actix_web::{HttpResponse, web}; +/// # use std::convert::Infallible; +/// # use actix_http::body::channel; /// /// #[actix_rt::main] /// async fn main() { -/// let (mut body_tx, body) = channel::(); +/// let (mut body_tx, body) = channel::(); /// -/// let _ = web::block(move || { -/// body_tx.send(web::Bytes::from_static(b"body from another thread")).unwrap(); -/// }); +/// let _ = web::block(move || { +/// body_tx.send(web::Bytes::from_static(b"body from another thread")).unwrap(); +/// }); /// -/// -/// HttpResponse::Ok().body(body); -/// } +/// HttpResponse::Ok().body(body); +/// } /// ``` pub fn channel>>() -> (Sender, Receiver) { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); (Sender::new(tx), Receiver::new(rx)) } -/// Channel Sender wrapper -/// -/// Senders can be cloned to create multiple senders that will send to the same underlying channel. +/// Channel Sender\ +/// Senders can be cloned to create multiple senders that will send to the same underlying channel.\ /// Senders should be mutable, as they can be used to close the channel. #[derive(Debug)] pub struct Sender { tx: UnboundedSender>, } impl Sender { - /// Constructs a new instance of the Sender struct with the specified UnboundedSender. - /// # Input - /// An UnboundedSender object representing the underlying channel - /// # Outputs - /// A Sender wrapper object + /// Constructs a new instance of the Sender struct with the specified UnboundedSender.\ + /// UnboundedSender object representing the sender for underlying channel pub fn new(tx: UnboundedSender>) -> Self { Self { tx } } /// Submits a chunk of bytes to the response body stream. - /// # Input - /// A Bytes object representing the chunk of bytes to send - /// # Outputs - /// A Result<(), Bytes> object. If the sending is successful, the method returns Ok(()). If the other side of the channel body was dropped, the method returns an Err(chunk). pub fn send(&mut self, chunk: Bytes) -> Result<(), Bytes> { self.tx.send(Ok(chunk)).map_err(|SendError(err)| match err { Ok(chunk) => chunk, @@ -65,10 +56,6 @@ impl Sender { } /// Closes the stream, optionally sending an error. - /// # Input - /// An optional error to send - /// # Outputs - /// A Result<(), E> object. If the closing is successful, the method returns Ok(()). If the other side of the channel body was dropped, the method returns an Err(error). pub fn close(self, err: Option) -> Result<(), E> { if let Some(err) = err { return self.tx.send(Err(err)).map_err(|SendError(err)| match err { @@ -80,9 +67,9 @@ impl Sender { } } -/// Clones the underlying [`UnboundedSender`]. +/// Clones the underlying [`UnboundedSender`].\ /// This creates a new handle to the same channel, allowing a message to be sent on multiple -/// handles. +/// handles.\ /// /// The returned [`Sender`] is a [`Clone`] of the original [`Sender`]. impl Clone for Sender { @@ -93,7 +80,11 @@ impl Clone for Sender { } } -/// Channel Receiver wrapper +/// Channel Receiver\ +/// Receivers are used to receive data from the underlying channel.\ +/// Receivers should not be mutable, as they cannot be used to close the channel.\ +/// Receivers can be used as a MessageBody, allowing for efficient streaming of data between tasks.\ +/// Since the Receiver is a unbound stream, it does not provide backpressure support. #[derive(Debug)] pub struct Receiver { rx: UnboundedReceiver>, @@ -101,17 +92,12 @@ pub struct Receiver { impl Receiver { /// Constructs a new instance of the Receiver struct with the specified UnboundedReceiver - /// # Input - /// An UnboundedReceiver object representing the underlying channel - /// - /// # Outputs - /// A Receiver wrapper object pub fn new(rx: UnboundedReceiver>) -> Self { Self { rx } } } -/// Drop the underlying [`UnboundedReceiver`]. +/// Drop the underlying [`UnboundedReceiver`].\ /// This will cause the [`Receiver`] to stop receiving messages. impl Drop for Receiver { fn drop(&mut self) { @@ -125,27 +111,16 @@ where { type Error = E; - /// Returns the body size of the Receiver as a BodySize object. + /// Returns the body size of the Receiver as a BodySize object.\ /// Since the Receiver is a stream, the method returns BodySize::Stream - /// # Input - /// None - /// # Output - /// A BodySize object representing the size of the Receiver #[inline] fn size(&self) -> BodySize { BodySize::Stream } - /// Attempts to pull out the next value of the underlying [`UnboundedReceiver`]. + /// Attempts to pull out the next value of the underlying [`UnboundedReceiver`].\ /// If the underlying [`UnboundedReceiver`] is not ready, the current task is scheduled to /// receive a notification when it is ready to make progress. - /// # Input - /// A Pin object representing the Receiver - /// A Context object representing the current task - /// - /// # Output - /// A Poll object representing the result of the operation - /// If the poll is ready, it returns the next value of the underlying UnboundedReceiver. If the poll is not ready, the task is scheduled to receive a notification when it is ready to make progress. fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, From 1451ad8614111093307739063e596da55dcb415d Mon Sep 17 00:00:00 2001 From: yinho999 Date: Fri, 21 Apr 2023 22:25:13 +0100 Subject: [PATCH 5/6] Proofread and restructure the documentation. --- actix-http/CHANGES.md | 1 + actix-http/src/body/channel.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index aaf84d765..dafad3537 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -6,6 +6,7 @@ - Add `body::to_body_limit()` function. - Add `body::BodyLimitExceeded` error type. +- Add `body::channel`, `body::channel::Sender`, and `body::channel::Receiver` types. ## 3.3.1 - 2023-03-02 diff --git a/actix-http/src/body/channel.rs b/actix-http/src/body/channel.rs index 5ecfecda4..42e1bf9d8 100644 --- a/actix-http/src/body/channel.rs +++ b/actix-http/src/body/channel.rs @@ -3,6 +3,7 @@ use std::{error::Error as StdError, task::Poll}; use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}; use super::{BodySize, MessageBody}; + /// Creates an unbounded mpsc (multi-producer, single-consumer) channel for communicating between asynchronous tasks. /// /// This function returns a `Sender` half and a `Receiver` half that can be used as a body type, allowing for efficient streaming of data between tasks. The `Sender` can be cloned to allow sending to the same channel from multiple code locations, making it suitable for multi-producer scenarios. Only one `Receiver` is supported, adhering to the single-consumer principle. @@ -41,6 +42,7 @@ pub fn channel>>() -> (Sender, Receiver) { pub struct Sender { tx: UnboundedSender>, } + impl Sender { /// Constructs a new instance of the Sender struct with the specified UnboundedSender.\ /// UnboundedSender object representing the sender for underlying channel @@ -131,7 +133,6 @@ where #[cfg(test)] mod tests { - use actix_rt::pin; use std::io; @@ -261,7 +262,7 @@ mod tests { tx.send(Bytes::from_static(b"test")).unwrap(); tx_cloned.send(Bytes::from_static(b"test2")).unwrap(); - let err = io::Error::new(io::ErrorKind::Other, "test"); + let err = io::Error::new(io::ErrorKind::Other, "error"); tx.close(Some(err)).unwrap(); @@ -269,9 +270,12 @@ mod tests { poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from_static(b"test")) ); - + assert_eq!( + poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from_static(b"test2")) + ); let err = poll_fn(|cx| rx.as_mut().poll_next(cx)).await.unwrap().err(); assert!(err.is_some()); - assert_eq!(err.unwrap().to_string(), "test"); + assert_eq!(err.unwrap().to_string(), "error"); } } From fd154f5234bda6bd8a68db6629f4160ff64a93bf Mon Sep 17 00:00:00 2001 From: yinho999 Date: Fri, 21 Apr 2023 22:28:28 +0100 Subject: [PATCH 6/6] Update changelogs --- actix-http/CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index dafad3537..756b34a0e 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -6,7 +6,7 @@ - Add `body::to_body_limit()` function. - Add `body::BodyLimitExceeded` error type. -- Add `body::channel`, `body::channel::Sender`, and `body::channel::Receiver` types. +- Add `body::channel` function, `body::channel::Sender`, and `body::channel::Receiver` types. ## 3.3.1 - 2023-03-02