Proofread and restructure the documentation.

This commit is contained in:
yinho999 2023-04-21 22:16:26 +01:00
parent ba58221310
commit 67cc0ee6b4
1 changed files with 24 additions and 49 deletions

View File

@ -3,7 +3,7 @@ use std::{error::Error as StdError, task::Poll};
use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender};
use super::{BodySize, MessageBody};
/// Creates an unbounded mpsc (multi-producer, single-consumer) channel wrapper for communicating between asynchronous tasks.
/// Creates an unbounded mpsc (multi-producer, single-consumer) channel for communicating between asynchronous tasks.
///
/// This function returns a `Sender` half and a `Receiver` half that can be used as a body type, allowing for efficient streaming of data between tasks. The `Sender` can be cloned to allow sending to the same channel from multiple code locations, making it suitable for multi-producer scenarios. Only one `Receiver` is supported, adhering to the single-consumer principle.
///
@ -14,9 +14,9 @@ use super::{BodySize, MessageBody};
/// This unbounded channel implementation is useful for streaming response bodies in web applications or other scenarios where it's essential to maintain a steady flow of data between asynchronous tasks. However, be cautious when using it in situations where the rate of incoming data may overwhelm the receiver, as it may lead to memory issues.
/// # Examples
/// ```
/// use actix_web::{HttpResponse, web};
/// use std::convert::Infallible;
/// use actix_http::body::channel;
/// # use actix_web::{HttpResponse, web};
/// # use std::convert::Infallible;
/// # use actix_http::body::channel;
///
/// #[actix_rt::main]
/// async fn main() {
@ -26,7 +26,6 @@ use super::{BodySize, MessageBody};
/// body_tx.send(web::Bytes::from_static(b"body from another thread")).unwrap();
/// });
///
///
/// HttpResponse::Ok().body(body);
/// }
/// ```
@ -35,28 +34,20 @@ pub fn channel<T: Into<Box<dyn StdError>>>() -> (Sender<T>, Receiver<T>) {
(Sender::new(tx), Receiver::new(rx))
}
/// Channel Sender wrapper
///
/// Senders can be cloned to create multiple senders that will send to the same underlying channel.
/// Channel Sender\
/// Senders can be cloned to create multiple senders that will send to the same underlying channel.\
/// Senders should be mutable, as they can be used to close the channel.
#[derive(Debug)]
pub struct Sender<E> {
tx: UnboundedSender<Result<Bytes, E>>,
}
impl<E> Sender<E> {
/// Constructs a new instance of the Sender struct with the specified UnboundedSender.
/// # Input
/// An UnboundedSender object representing the underlying channel
/// # Outputs
/// A Sender wrapper object
/// Constructs a new instance of the Sender struct with the specified UnboundedSender.\
/// UnboundedSender object representing the sender for underlying channel
pub fn new(tx: UnboundedSender<Result<Bytes, E>>) -> Self {
Self { tx }
}
/// Submits a chunk of bytes to the response body stream.
/// # Input
/// A Bytes object representing the chunk of bytes to send
/// # Outputs
/// A Result<(), Bytes> object. If the sending is successful, the method returns Ok(()). If the other side of the channel body was dropped, the method returns an Err(chunk).
pub fn send(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.tx.send(Ok(chunk)).map_err(|SendError(err)| match err {
Ok(chunk) => chunk,
@ -65,10 +56,6 @@ impl<E> Sender<E> {
}
/// Closes the stream, optionally sending an error.
/// # Input
/// An optional error to send
/// # Outputs
/// A Result<(), E> object. If the closing is successful, the method returns Ok(()). If the other side of the channel body was dropped, the method returns an Err(error).
pub fn close(self, err: Option<E>) -> Result<(), E> {
if let Some(err) = err {
return self.tx.send(Err(err)).map_err(|SendError(err)| match err {
@ -80,9 +67,9 @@ impl<E> Sender<E> {
}
}
/// Clones the underlying [`UnboundedSender`].
/// Clones the underlying [`UnboundedSender`].\
/// This creates a new handle to the same channel, allowing a message to be sent on multiple
/// handles.
/// handles.\
///
/// The returned [`Sender`] is a [`Clone`] of the original [`Sender`].
impl<E> Clone for Sender<E> {
@ -93,7 +80,11 @@ impl<E> Clone for Sender<E> {
}
}
/// Channel Receiver wrapper
/// Channel Receiver\
/// Receivers are used to receive data from the underlying channel.\
/// Receivers should not be mutable, as they cannot be used to close the channel.\
/// Receivers can be used as a MessageBody, allowing for efficient streaming of data between tasks.\
/// Since the Receiver is a unbound stream, it does not provide backpressure support.
#[derive(Debug)]
pub struct Receiver<E> {
rx: UnboundedReceiver<Result<Bytes, E>>,
@ -101,17 +92,12 @@ pub struct Receiver<E> {
impl<E> Receiver<E> {
/// Constructs a new instance of the Receiver struct with the specified UnboundedReceiver
/// # Input
/// An UnboundedReceiver object representing the underlying channel
///
/// # Outputs
/// A Receiver wrapper object
pub fn new(rx: UnboundedReceiver<Result<Bytes, E>>) -> Self {
Self { rx }
}
}
/// Drop the underlying [`UnboundedReceiver`].
/// Drop the underlying [`UnboundedReceiver`].\
/// This will cause the [`Receiver`] to stop receiving messages.
impl<E> Drop for Receiver<E> {
fn drop(&mut self) {
@ -125,27 +111,16 @@ where
{
type Error = E;
/// Returns the body size of the Receiver as a BodySize object.
/// Returns the body size of the Receiver as a BodySize object.\
/// Since the Receiver is a stream, the method returns BodySize::Stream
/// # Input
/// None
/// # Output
/// A BodySize object representing the size of the Receiver
#[inline]
fn size(&self) -> BodySize {
BodySize::Stream
}
/// Attempts to pull out the next value of the underlying [`UnboundedReceiver`].
/// Attempts to pull out the next value of the underlying [`UnboundedReceiver`].\
/// If the underlying [`UnboundedReceiver`] is not ready, the current task is scheduled to
/// receive a notification when it is ready to make progress.
/// # Input
/// A Pin object representing the Receiver
/// A Context object representing the current task
///
/// # Output
/// A Poll object representing the result of the operation
/// If the poll is ready, it returns the next value of the underlying UnboundedReceiver. If the poll is not ready, the task is scheduled to receive a notification when it is ready to make progress.
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,