Expose the max limit for payload sizes in Websocket Actors.

This commit is contained in:
opq 2019-06-21 12:26:30 -10:00
parent ad0e6f73b3
commit a21f64087d
1 changed files with 82 additions and 48 deletions

View File

@ -27,7 +27,7 @@ use futures::{Async, Future, Poll, Stream};
/// Do websocket handshake and start ws actor. /// Do websocket handshake and start ws actor.
pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error> pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error>
where where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>, A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
T: Stream<Item = Bytes, Error = PayloadError> + 'static, T: Stream<Item = Bytes, Error = PayloadError> + 'static,
{ {
@ -35,6 +35,16 @@ where
Ok(res.streaming(WebsocketContext::create(actor, stream))) Ok(res.streaming(WebsocketContext::create(actor, stream)))
} }
/// Do websocket handshake and start ws actor with provided limit.
pub fn start_with_limit<A, T>(actor: A, req: &HttpRequest, stream: T, limit: usize) -> Result<HttpResponse, Error>
where
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
let mut res = handshake(req)?;
Ok(res.streaming(WebsocketContext::create_with_limit(actor, stream, limit)))
}
/// Prepare `WebSocket` handshake response. /// Prepare `WebSocket` handshake response.
/// ///
/// This function returns handshake `HttpResponse`, ready to send to peer. /// This function returns handshake `HttpResponse`, ready to send to peer.
@ -101,7 +111,7 @@ pub fn handshake(req: &HttpRequest) -> Result<HttpResponseBuilder, HandshakeErro
/// Execution context for `WebSockets` actors /// Execution context for `WebSockets` actors
pub struct WebsocketContext<A> pub struct WebsocketContext<A>
where where
A: Actor<Context = WebsocketContext<A>>, A: Actor<Context = WebsocketContext<A>>,
{ {
inner: ContextParts<A>, inner: ContextParts<A>,
@ -109,7 +119,7 @@ where
} }
impl<A> ActorContext for WebsocketContext<A> impl<A> ActorContext for WebsocketContext<A>
where where
A: Actor<Context = Self>, A: Actor<Context = Self>,
{ {
fn stop(&mut self) { fn stop(&mut self) {
@ -126,7 +136,7 @@ where
} }
impl<A> AsyncContext<A> for WebsocketContext<A> impl<A> AsyncContext<A> for WebsocketContext<A>
where where
A: Actor<Context = Self>, A: Actor<Context = Self>,
{ {
fn spawn<F>(&mut self, fut: F) -> SpawnHandle fn spawn<F>(&mut self, fut: F) -> SpawnHandle
@ -162,7 +172,7 @@ where
} }
impl<A> WebsocketContext<A> impl<A> WebsocketContext<A>
where where
A: Actor<Context = Self>, A: Actor<Context = Self>,
{ {
#[inline] #[inline]
@ -171,15 +181,25 @@ where
where where
A: StreamHandler<Message, ProtocolError>, A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static, S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
WebsocketContext::create_with_limit(actor, stream, 65_536)
}
#[inline]
/// Create a new Websocket context from a request, an actor, and a limit
pub fn create_with_limit<S>(actor: A, stream: S, limit: usize) -> impl Stream<Item = Bytes, Error = Error>
where
A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{ {
let mb = Mailbox::default(); let mb = Mailbox::default();
let mut ctx = WebsocketContext { let mut ctx = WebsocketContext {
inner: ContextParts::new(mb.sender_producer()), inner: ContextParts::new(mb.sender_producer()),
messages: VecDeque::new(), messages: VecDeque::new(),
}; };
ctx.add_stream(WsStream::new(stream)); ctx.add_stream(WsStream::new(stream, limit));
WebsocketContextFut::new(ctx, actor, mb) WebsocketContextFut::new(ctx, actor, mb, limit)
} }
/// Create a new Websocket context /// Create a new Websocket context
@ -191,22 +211,36 @@ where
F: FnOnce(&mut Self) -> A + 'static, F: FnOnce(&mut Self) -> A + 'static,
A: StreamHandler<Message, ProtocolError>, A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static, S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
WebsocketContext::with_factory_codec(stream, f, 65_536)
}
/// Create a new Websocket context with a limit
pub fn with_factory_codec<S, F>(
stream: S,
f: F,
limit: usize,
) -> impl Stream<Item = Bytes, Error = Error>
where
F: FnOnce(&mut Self) -> A + 'static,
A: StreamHandler<Message, ProtocolError>,
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{ {
let mb = Mailbox::default(); let mb = Mailbox::default();
let mut ctx = WebsocketContext { let mut ctx = WebsocketContext {
inner: ContextParts::new(mb.sender_producer()), inner: ContextParts::new(mb.sender_producer()),
messages: VecDeque::new(), messages: VecDeque::new(),
}; };
ctx.add_stream(WsStream::new(stream)); ctx.add_stream(WsStream::new(stream, limit));
let act = f(&mut ctx); let act = f(&mut ctx);
WebsocketContextFut::new(ctx, act, mb) WebsocketContextFut::new(ctx, act, mb, limit)
} }
} }
impl<A> WebsocketContext<A> impl<A> WebsocketContext<A>
where where
A: Actor<Context = Self>, A: Actor<Context = Self>,
{ {
/// Write payload /// Write payload
@ -266,7 +300,7 @@ where
} }
impl<A> AsyncContextParts<A> for WebsocketContext<A> impl<A> AsyncContextParts<A> for WebsocketContext<A>
where where
A: Actor<Context = Self>, A: Actor<Context = Self>,
{ {
fn parts(&mut self) -> &mut ContextParts<A> { fn parts(&mut self) -> &mut ContextParts<A> {
@ -275,7 +309,7 @@ where
} }
struct WebsocketContextFut<A> struct WebsocketContextFut<A>
where where
A: Actor<Context = WebsocketContext<A>>, A: Actor<Context = WebsocketContext<A>>,
{ {
fut: ContextFut<A, WebsocketContext<A>>, fut: ContextFut<A, WebsocketContext<A>>,
@ -285,14 +319,14 @@ where
} }
impl<A> WebsocketContextFut<A> impl<A> WebsocketContextFut<A>
where where
A: Actor<Context = WebsocketContext<A>>, A: Actor<Context = WebsocketContext<A>>,
{ {
fn new(ctx: WebsocketContext<A>, act: A, mailbox: Mailbox<A>) -> Self { fn new(ctx: WebsocketContext<A>, act: A, mailbox: Mailbox<A>, limit: usize) -> Self {
let fut = ContextFut::new(ctx, act, mailbox); let fut = ContextFut::new(ctx, act, mailbox);
WebsocketContextFut { WebsocketContextFut {
fut, fut,
encoder: Codec::new(), encoder: Codec::new().max_size(limit),
buf: BytesMut::new(), buf: BytesMut::new(),
closed: false, closed: false,
} }
@ -300,7 +334,7 @@ where
} }
impl<A> Stream for WebsocketContextFut<A> impl<A> Stream for WebsocketContextFut<A>
where where
A: Actor<Context = WebsocketContext<A>>, A: Actor<Context = WebsocketContext<A>>,
{ {
type Item = Bytes; type Item = Bytes;
@ -332,7 +366,7 @@ where
} }
impl<A, M> ToEnvelope<A, M> for WebsocketContext<A> impl<A, M> ToEnvelope<A, M> for WebsocketContext<A>
where where
A: Actor<Context = WebsocketContext<A>> + Handler<M>, A: Actor<Context = WebsocketContext<A>> + Handler<M>,
M: ActixMessage + Send + 'static, M: ActixMessage + Send + 'static,
M::Result: Send, M::Result: Send,
@ -350,13 +384,13 @@ struct WsStream<S> {
} }
impl<S> WsStream<S> impl<S> WsStream<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Bytes, Error = PayloadError>,
{ {
fn new(stream: S) -> Self { fn new(stream: S, limit: usize) -> Self {
Self { Self {
stream, stream,
decoder: Codec::new(), decoder: Codec::new().max_size(limit),
buf: BytesMut::new(), buf: BytesMut::new(),
closed: false, closed: false,
} }
@ -364,7 +398,7 @@ where
} }
impl<S> Stream for WsStream<S> impl<S> Stream for WsStream<S>
where where
S: Stream<Item = Bytes, Error = PayloadError>, S: Stream<Item = Bytes, Error = PayloadError>,
{ {
type Item = Message; type Item = Message;