diff --git a/actix-web-actors/src/ws.rs b/actix-web-actors/src/ws.rs index 10113665b..acae88fa3 100644 --- a/actix-web-actors/src/ws.rs +++ b/actix-web-actors/src/ws.rs @@ -29,6 +29,143 @@ use bytestring::ByteString; use futures_core::Stream; use tokio::sync::oneshot::Sender; +/// Builder for Websocket Session response. +pub struct WsResponseBuilder<'a, A, T> +where + A: Actor> + + StreamHandler>, + T: Stream> + 'static, +{ + actor: A, + req: &'a HttpRequest, + stream: T, + protocols: Option<&'a [&'a str]>, + frame_size: Option, + codec: Option, +} + +impl<'a, A, T> WsResponseBuilder<'a, A, T> +where + A: Actor> + + StreamHandler>, + T: Stream> + 'static, +{ + #[inline] + pub fn new(actor: A, req: &'a HttpRequest, stream: T) -> Self { + WsResponseBuilder { + actor, + req, + stream, + protocols: None, + frame_size: None, + codec: None, + } + } + + #[inline] + pub fn protocols(mut self, protocols: &'a [&'a str]) -> Self { + self.protocols = Some(protocols); + self + } + + #[inline] + pub fn frame_size(mut self, frame_size: usize) -> Self { + self.frame_size = Some(frame_size); + self + } + + #[inline] + pub fn codec(mut self, codec: Codec) -> Self { + self.codec = Some(codec); + self + } + + #[inline] + fn handshake_resp(&self) -> Result { + match self.protocols { + Some(protocols) => handshake_with_protocols(self.req, protocols), + None => handshake(self.req), + } + } + + #[inline] + fn set_frame_size(&mut self) { + if let Some(frame_size) = self.frame_size { + match &mut self.codec { + // Modify existing Codec's size. + Some(codec) => { + codec.max_size(frame_size); + } + // Otherwise, create a new codec with the given size. + None => { + self.codec = Some(Codec::new().max_size(frame_size)); + } + } + } + } + + #[inline] + /// Perform WebSocket handshake and start actor. + /// + /// `req` is an [`HttpRequest`] that should be requesting a websocket + /// protocol change. `stream` should be a [`Bytes`] stream (such as + /// `actix_web::web::Payload`) that contains a stream of the body request. + /// + /// If there is a problem with the handshake, an error is returned. + /// + /// If successful, consume the [`WsResponseBuilder`] and return a + /// [`HttpResponse`] wrapped in a [`Result`]. + pub fn start(mut self) -> Result { + let mut res = self.handshake_resp()?; + self.set_frame_size(); + + match self.codec { + Some(codec) => { + let out_stream = + WebsocketContext::with_codec(self.actor, self.stream, codec); + Ok(res.streaming(out_stream)) + } + None => { + let out_stream = WebsocketContext::create(self.actor, self.stream); + Ok(res.streaming(out_stream)) + } + } + } + + #[inline] + /// Perform WebSocket handshake and start actor. + /// + /// `req` is an [`HttpRequest`] that should be requesting a websocket + /// protocol change. `stream` should be a [`Bytes`] stream (such as + /// `actix_web::web::Payload`) that contains a stream of the body request. + /// + /// If there is a problem with the handshake, an error is returned. + /// + /// If successful, returns a pair where the first item is an address for the + /// created actor and the second item is the [`HttpResponse`] that should be + /// returned from the websocket request. + pub fn start_with_addr(mut self) -> Result<(Addr, HttpResponse), Error> { + let mut res = self.handshake_resp()?; + self.set_frame_size(); + + match self.codec { + Some(codec) => { + let (addr, out_stream) = WebsocketContext::create_with_codec_addr( + self.actor, + self.stream, + codec, + ); + Ok((addr, res.streaming(out_stream))) + } + None => { + let (addr, out_stream) = + WebsocketContext::create_with_addr(self.actor, self.stream); + Ok((addr, res.streaming(out_stream))) + } + } + } +} + /// Perform WebSocket handshake and start actor. pub fn start(actor: A, req: &HttpRequest, stream: T) -> Result where @@ -279,6 +416,33 @@ where (addr, WebsocketContextFut::new(ctx, actor, mb, Codec::new())) } + #[inline] + /// Create a new Websocket context from a request, an actor, and a codec. + /// + /// Returns a pair, where the first item is an addr for the created actor, + /// and the second item is a stream intended to be set as part of the + /// response via `HttpResponseBuilder::streaming()`. + pub fn create_with_codec_addr( + actor: A, + stream: S, + codec: Codec, + ) -> (Addr, impl Stream>) + where + A: StreamHandler>, + S: Stream> + 'static, + { + let mb = Mailbox::default(); + let mut ctx = WebsocketContext { + inner: ContextParts::new(mb.sender_producer()), + messages: VecDeque::new(), + }; + ctx.add_stream(WsStream::new(stream, codec)); + + let addr = ctx.address(); + + (addr, WebsocketContextFut::new(ctx, actor, mb, codec)) + } + #[inline] /// Create a new Websocket context from a request, an actor, and a codec pub fn with_codec(