From a21f64087d80c482b1c166d97dafd593c129d4f2 Mon Sep 17 00:00:00 2001 From: opq Date: Fri, 21 Jun 2019 12:26:30 -1000 Subject: [PATCH] Expose the max limit for payload sizes in Websocket Actors. --- actix-web-actors/src/ws.rs | 130 +++++++++++++++++++++++-------------- 1 file changed, 82 insertions(+), 48 deletions(-) diff --git a/actix-web-actors/src/ws.rs b/actix-web-actors/src/ws.rs index 0ef3c9169..e9c9b0436 100644 --- a/actix-web-actors/src/ws.rs +++ b/actix-web-actors/src/ws.rs @@ -27,14 +27,24 @@ use futures::{Async, Future, Poll, Stream}; /// Do websocket handshake and start ws actor. pub fn start(actor: A, req: &HttpRequest, stream: T) -> Result -where - A: Actor> + StreamHandler, - T: Stream + 'static, + where + A: Actor> + StreamHandler, + T: Stream + 'static, { let mut res = handshake(req)?; Ok(res.streaming(WebsocketContext::create(actor, stream))) } +/// Do websocket handshake and start ws actor with provided limit. +pub fn start_with_limit(actor: A, req: &HttpRequest, stream: T, limit: usize) -> Result + where + A: Actor> + StreamHandler, + T: Stream + 'static, +{ + let mut res = handshake(req)?; + Ok(res.streaming(WebsocketContext::create_with_limit(actor, stream, limit))) +} + /// Prepare `WebSocket` handshake response. /// /// This function returns handshake `HttpResponse`, ready to send to peer. @@ -101,16 +111,16 @@ pub fn handshake(req: &HttpRequest) -> Result -where - A: Actor>, + where + A: Actor>, { inner: ContextParts, messages: VecDeque>, } impl ActorContext for WebsocketContext -where - A: Actor, + where + A: Actor, { fn stop(&mut self) { self.inner.stop(); @@ -126,19 +136,19 @@ where } impl AsyncContext for WebsocketContext -where - A: Actor, + where + A: Actor, { fn spawn(&mut self, fut: F) -> SpawnHandle - where - F: ActorFuture + 'static, + where + F: ActorFuture + 'static, { self.inner.spawn(fut) } fn wait(&mut self, fut: F) - where - F: ActorFuture + 'static, + where + F: ActorFuture + 'static, { self.inner.wait(fut) } @@ -162,24 +172,34 @@ where } impl WebsocketContext -where - A: Actor, + where + A: Actor, { #[inline] /// Create a new Websocket context from a request and an actor pub fn create(actor: A, stream: S) -> impl Stream - where - A: StreamHandler, - S: Stream + 'static, + where + A: StreamHandler, + S: Stream + 'static, + { + WebsocketContext::create_with_limit(actor, stream, 65_536) + } + + #[inline] + /// Create a new Websocket context from a request, an actor, and a limit + pub fn create_with_limit(actor: A, stream: S, limit: usize) -> impl Stream + where + A: StreamHandler, + S: Stream + 'static, { let mb = Mailbox::default(); let mut ctx = WebsocketContext { inner: ContextParts::new(mb.sender_producer()), messages: VecDeque::new(), }; - ctx.add_stream(WsStream::new(stream)); + ctx.add_stream(WsStream::new(stream, limit)); - WebsocketContextFut::new(ctx, actor, mb) + WebsocketContextFut::new(ctx, actor, mb, limit) } /// Create a new Websocket context @@ -187,27 +207,41 @@ where stream: S, f: F, ) -> impl Stream - where - F: FnOnce(&mut Self) -> A + 'static, - A: StreamHandler, - S: Stream + 'static, + where + F: FnOnce(&mut Self) -> A + 'static, + A: StreamHandler, + S: Stream + 'static, + { + WebsocketContext::with_factory_codec(stream, f, 65_536) + } + + /// Create a new Websocket context with a limit + pub fn with_factory_codec( + stream: S, + f: F, + limit: usize, + ) -> impl Stream + where + F: FnOnce(&mut Self) -> A + 'static, + A: StreamHandler, + S: Stream + 'static, { let mb = Mailbox::default(); let mut ctx = WebsocketContext { inner: ContextParts::new(mb.sender_producer()), messages: VecDeque::new(), }; - ctx.add_stream(WsStream::new(stream)); + ctx.add_stream(WsStream::new(stream, limit)); let act = f(&mut ctx); - WebsocketContextFut::new(ctx, act, mb) + WebsocketContextFut::new(ctx, act, mb, limit) } } impl WebsocketContext -where - A: Actor, + where + A: Actor, { /// Write payload /// @@ -266,8 +300,8 @@ where } impl AsyncContextParts for WebsocketContext -where - A: Actor, + where + A: Actor, { fn parts(&mut self) -> &mut ContextParts { &mut self.inner @@ -275,8 +309,8 @@ where } struct WebsocketContextFut -where - A: Actor>, + where + A: Actor>, { fut: ContextFut>, encoder: Codec, @@ -285,14 +319,14 @@ where } impl WebsocketContextFut -where - A: Actor>, + where + A: Actor>, { - fn new(ctx: WebsocketContext, act: A, mailbox: Mailbox) -> Self { + fn new(ctx: WebsocketContext, act: A, mailbox: Mailbox, limit: usize) -> Self { let fut = ContextFut::new(ctx, act, mailbox); WebsocketContextFut { fut, - encoder: Codec::new(), + encoder: Codec::new().max_size(limit), buf: BytesMut::new(), closed: false, } @@ -300,8 +334,8 @@ where } impl Stream for WebsocketContextFut -where - A: Actor>, + where + A: Actor>, { type Item = Bytes; type Error = Error; @@ -332,10 +366,10 @@ where } impl ToEnvelope for WebsocketContext -where - A: Actor> + Handler, - M: ActixMessage + Send + 'static, - M::Result: Send, + where + A: Actor> + Handler, + M: ActixMessage + Send + 'static, + M::Result: Send, { fn pack(msg: M, tx: Option>) -> Envelope { Envelope::new(msg, tx) @@ -350,13 +384,13 @@ struct WsStream { } impl WsStream -where - S: Stream, + where + S: Stream, { - fn new(stream: S) -> Self { + fn new(stream: S, limit: usize) -> Self { Self { stream, - decoder: Codec::new(), + decoder: Codec::new().max_size(limit), buf: BytesMut::new(), closed: false, } @@ -364,8 +398,8 @@ where } impl Stream for WsStream -where - S: Stream, + where + S: Stream, { type Item = Message; type Error = ProtocolError;