diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 23c15296a..6d9001f3a 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -9,6 +9,7 @@ * `body::None` struct. [#2468] * Impl `MessageBody` for `bytestring::ByteString`. [#2468] * `impl Clone for ws::HandshakeError`. [#2468] +* `#[must_use]` for `ws::Codec` to prevent subtle bugs. [#1920] ### Changed * Rename `body::BoxBody::{from_body => new}`. [#2468] @@ -18,15 +19,18 @@ * `From` implementations on error types now return a `Response`. [#2468] * `ResponseBuilder::body(B)` now returns `Response>`. [#2468] * `ResponseBuilder::finish()` now returns `Response>`. [#2468] +* `impl Copy` for `ws::Codec`. [#1920] ### Removed * `ResponseBuilder::streaming`. [#2468] * `impl Future` for `ResponseBuilder`. [#2468] * Remove unnecessary `MessageBody` bound on types passed to `body::AnyBody::new`. [#2468] * Move `body::AnyBody` to `awc`. Replaced with `EitherBody` and `BoxBody`. [#2468] +* `impl Copy` for `ws::Codec`. [#1920] [#2483]: https://github.com/actix/actix-web/pull/2483 [#2468]: https://github.com/actix/actix-web/pull/2468 +[#1920]: https://github.com/actix/actix-web/pull/1920 ## 3.0.0-beta.14 - 2021-11-30 diff --git a/actix-http/src/ws/codec.rs b/actix-http/src/ws/codec.rs index 8655216fa..a110982f7 100644 --- a/actix-http/src/ws/codec.rs +++ b/actix-http/src/ws/codec.rs @@ -63,8 +63,8 @@ pub enum Item { Last(Bytes), } -#[derive(Debug, Copy, Clone)] /// WebSocket protocol codec. +#[derive(Debug, Clone)] pub struct Codec { flags: Flags, max_size: usize, @@ -89,7 +89,8 @@ impl Codec { /// Set max frame size. /// - /// By default max size is set to 64kB. + /// By default max size is set to 64KiB. + #[must_use = "this returns the a new Codec, without modifying the original"] pub fn max_size(mut self, size: usize) -> Self { self.max_size = size; self @@ -98,12 +99,19 @@ impl Codec { /// Set decoder to client mode. /// /// By default decoder works in server mode. + #[must_use = "this returns the a new Codec, without modifying the original"] pub fn client_mode(mut self) -> Self { self.flags.remove(Flags::SERVER); self } } +impl Default for Codec { + fn default() -> Self { + Self::new() + } +} + impl Encoder for Codec { type Error = ProtocolError; diff --git a/actix-web-actors/src/ws.rs b/actix-web-actors/src/ws.rs index d3a40e537..c41268b01 100644 --- a/actix-web-actors/src/ws.rs +++ b/actix-web-actors/src/ws.rs @@ -1,20 +1,24 @@ //! Websocket integration. -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{collections::VecDeque, convert::TryFrom}; - -use actix::dev::{ - AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler, ToEnvelope, +use std::{ + collections::VecDeque, + convert::TryFrom, + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, }; -use actix::fut::ActorFuture; + use actix::{ + dev::{ + AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler, + ToEnvelope, + }, + fut::ActorFuture, Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message as ActixMessage, SpawnHandle, }; -use actix_codec::{Decoder, Encoder}; +use actix_codec::{Decoder as _, Encoder as _}; pub use actix_http::ws::{ CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError, }; @@ -31,25 +35,25 @@ use bytes::{Bytes, BytesMut}; use bytestring::ByteString; use futures_core::Stream; use pin_project_lite::pin_project; -use tokio::sync::oneshot::Sender; +use tokio::sync::oneshot; -/// Builder for Websocket Session response. +/// Builder for Websocket session response. /// /// # Examples /// -/// Create a Websocket session response with default configs. +/// Create a Websocket session response with default configuration. /// ```ignore /// WsResponseBuilder::new(WsActor, &req, stream).start() /// ``` /// -/// Create a Websocket session with a specific max frame size, -/// a [`Codec`] or protocols. +/// Create a Websocket session with a specific max frame size, [`Codec`], and protocols. /// ```ignore -/// const MAX_FRAME_SIZE: usize = 10_000; // in bytes. +/// const MAX_FRAME_SIZE: usize = 16_384; // 16KiB +/// /// ws::WsResponseBuilder::new(WsActor, &req, stream) -/// .codec(Codec::new()) // optional -/// .protocols(&["A", "B"]) // optional -/// .frame_size(MAX_FRAME_SIZE) // optional +/// .codec(Codec::new()) +/// .protocols(&["A", "B"]) +/// .frame_size(MAX_FRAME_SIZE) /// .start() /// ``` pub struct WsResponseBuilder<'a, A, T> @@ -60,9 +64,9 @@ where actor: A, req: &'a HttpRequest, stream: T, + codec: Option, protocols: Option<&'a [&'a str]>, frame_size: Option, - codec: Option, } impl<'a, A, T> WsResponseBuilder<'a, A, T> @@ -78,9 +82,9 @@ where actor, req, stream, + codec: None, protocols: None, frame_size: None, - codec: None, } } @@ -90,7 +94,7 @@ where self } - /// Set the max frame size for each message. + /// Set the max frame size for each message (in bytes). /// /// **Note**: This will override any given [`Codec`]'s max frame size. pub fn frame_size(mut self, frame_size: usize) -> Self { @@ -115,19 +119,21 @@ where fn set_frame_size(&mut self) { if let Some(frame_size) = self.frame_size { match &mut self.codec { - // Modify existing Codec's size. Some(codec) => { - codec.max_size(frame_size); + // modify existing codec's max frame size + let orig_codec = mem::take(codec); + *codec = orig_codec.max_size(frame_size); } - // Otherwise, create a new codec with the given size. + None => { + // create a new codec with the given size self.codec = Some(Codec::new().max_size(frame_size)); } } } } - /// Create a new Websocket context from a request, an actor, and a codec. + /// Create a new Websocket context from an actor, request stream, and codec. /// /// Returns a pair, where the first item is an addr for the created actor, and the second item /// is a stream intended to be set as part of the response @@ -146,7 +152,7 @@ where inner: ContextParts::new(mb.sender_producer()), messages: VecDeque::new(), }; - ctx.add_stream(WsStream::new(stream, codec)); + ctx.add_stream(WsStream::new(stream, codec.clone())); let addr = ctx.address(); @@ -478,7 +484,7 @@ where inner: ContextParts::new(mb.sender_producer()), messages: VecDeque::new(), }; - ctx.add_stream(WsStream::new(stream, codec)); + ctx.add_stream(WsStream::new(stream, codec.clone())); WebsocketContextFut::new(ctx, actor, mb, codec) } @@ -636,12 +642,13 @@ where M: ActixMessage + Send + 'static, M::Result: Send, { - fn pack(msg: M, tx: Option>) -> Envelope { + fn pack(msg: M, tx: Option>) -> Envelope { Envelope::new(msg, tx) } } pin_project! { + #[derive(Debug)] struct WsStream { #[pin] stream: S, diff --git a/actix-web-actors/tests/test_ws.rs b/actix-web-actors/tests/test_ws.rs index b66935623..23002efed 100644 --- a/actix-web-actors/tests/test_ws.rs +++ b/actix-web-actors/tests/test_ws.rs @@ -1,7 +1,7 @@ use actix::prelude::*; use actix_http::ws::Codec; use actix_web::{web, App, HttpRequest}; -use actix_web_actors::*; +use actix_web_actors::ws; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; @@ -13,12 +13,12 @@ impl Actor for Ws { impl StreamHandler> for Ws { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - match msg.unwrap() { - ws::Message::Ping(msg) => ctx.pong(&msg), - ws::Message::Text(text) => ctx.text(text), - ws::Message::Binary(bin) => ctx.binary(bin), - ws::Message::Close(reason) => ctx.close(reason), - _ => {} + match msg { + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Text(text)) => ctx.text(text), + Ok(ws::Message::Binary(bin)) => ctx.binary(bin), + Ok(ws::Message::Close(reason)) => ctx.close(reason), + _ => ctx.close(Some(ws::CloseCode::Error.into())), } } } @@ -29,8 +29,8 @@ const DEFAULT_FRAME_SIZE: usize = 10; async fn common_test_code(mut srv: actix_test::TestServer, frame_size: usize) { // client service let mut framed = srv.ws().await.unwrap(); - framed.send(ws::Message::Text("text".into())).await.unwrap(); + framed.send(ws::Message::Text("text".into())).await.unwrap(); let item = framed.next().await.unwrap().unwrap(); assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text"))); @@ -50,7 +50,6 @@ async fn common_test_code(mut srv: actix_test::TestServer, frame_size: usize) { .send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) .await .unwrap(); - let item = framed.next().await.unwrap().unwrap(); assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into()))); } @@ -85,6 +84,8 @@ async fn test_builder_with_frame_size() { #[actix_rt::test] async fn test_builder_with_frame_size_exceeded() { + const MAX_FRAME_SIZE: usize = 64; + let mut srv = actix_test::start(|| { App::new().service(web::resource("/").to( |req: HttpRequest, stream: web::Payload| async move { @@ -102,7 +103,12 @@ async fn test_builder_with_frame_size_exceeded() { let bytes = Bytes::from(vec![0; MAX_FRAME_SIZE + 1]); framed.send(ws::Message::Binary(bytes)).await.unwrap(); - assert!(framed.next().await.is_none()); + let frame = framed.next().await.unwrap().unwrap(); + let close_reason = match frame { + ws::Frame::Close(Some(reason)) => reason, + _ => panic!("close frame expected"), + }; + assert_eq!(close_reason.code, ws::CloseCode::Error); } #[actix_rt::test]