From 8886672ae6e1ae08856afd0fa2c9146c0f02169f Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Tue, 13 Nov 2018 21:13:37 -0800
Subject: [PATCH] add write buffer capacity caps for Framed

---
 src/codec/framed.rs       | 34 ++++++++++++++++++++------
 src/codec/framed_write.rs | 51 ++++++++++++++++++++-------------------
 src/codec/mod.rs          |  4 +--
 3 files changed, 55 insertions(+), 34 deletions(-)

diff --git a/src/codec/framed.rs b/src/codec/framed.rs
index 388f10c9..59203ae7 100644
--- a/src/codec/framed.rs
+++ b/src/codec/framed.rs
@@ -11,6 +11,9 @@ use tokio_io::{AsyncRead, AsyncWrite};
 use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
 use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
 
+const LW: usize = 2 * 1024;
+const HW: usize = 8 * 1024;
+
 /// A unified `Stream` and `Sink` interface to an underlying I/O object, using
 /// the `Encoder` and `Decoder` traits to encode and decode frames.
 ///
@@ -45,7 +48,14 @@ where
     /// break them into separate objects, allowing them to interact more easily.
     pub fn new(inner: T, codec: U) -> Framed<T, U> {
         Framed {
-            inner: framed_read2(framed_write2(Fuse(inner, codec))),
+            inner: framed_read2(framed_write2(Fuse(inner, codec), LW, HW)),
+        }
+    }
+
+    /// Same as `Framed::new()` with ability to specify write buffer low/high capacity watermarks.
+    pub fn new_with_cap(inner: T, codec: U, lw: usize, hw: usize) -> Framed<T, U> {
+        Framed {
+            inner: framed_read2(framed_write2(Fuse(inner, codec), lw, hw)),
         }
     }
 }
@@ -75,7 +85,7 @@ impl<T, U> Framed<T, U> {
     pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
         Framed {
             inner: framed_read2_with_buffer(
-                framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf),
+                framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf, parts.write_buf_lw, parts.write_buf_hw),
                 parts.read_buf,
             ),
         }
@@ -123,11 +133,11 @@ impl<T, U> Framed<T, U> {
     /// Consume the `Frame`, returning `Frame` with different codec.
     pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
         let (inner, read_buf) = self.inner.into_parts();
-        let (inner, write_buf) = inner.into_parts();
+        let (inner, write_buf, lw, hw) = inner.into_parts();
 
         Framed {
             inner: framed_read2_with_buffer(
-                framed_write2_with_buffer(Fuse(inner.0, codec), write_buf),
+                framed_write2_with_buffer(Fuse(inner.0, codec), write_buf, lw, hw),
                 read_buf,
             ),
         }
@@ -141,13 +151,15 @@ impl<T, U> Framed<T, U> {
     /// being worked with.
     pub fn into_parts(self) -> FramedParts<T, U> {
         let (inner, read_buf) = self.inner.into_parts();
-        let (inner, write_buf) = inner.into_parts();
+        let (inner, write_buf, write_buf_lw, write_buf_hw) = inner.into_parts();
 
         FramedParts {
             io: inner.0,
             codec: inner.1,
-            read_buf: read_buf,
-            write_buf: write_buf,
+            read_buf,
+            write_buf,
+            write_buf_lw,
+            write_buf_hw,
             _priv: (),
         }
     }
@@ -273,6 +285,12 @@ pub struct FramedParts<T, U> {
     /// A buffer with unprocessed data which are not written yet.
     pub write_buf: BytesMut,
 
+    /// A buffer low watermark capacity
+    pub write_buf_lw: usize,
+
+    /// A buffer high watermark capacity
+    pub write_buf_hw: usize,
+
     /// This private field allows us to add additional fields in the future in a
     /// backwards compatible way.
     _priv: (),
@@ -286,6 +304,8 @@ impl<T, U> FramedParts<T, U> {
             codec,
             read_buf: BytesMut::new(),
             write_buf: BytesMut::new(),
+            write_buf_lw: LW,
+            write_buf_hw: HW,
             _priv: (),
         }
     }
diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs
index 1348b09c..57789cc6 100644
--- a/src/codec/framed_write.rs
+++ b/src/codec/framed_write.rs
@@ -16,20 +16,19 @@ pub struct FramedWrite<T, E> {
 pub struct FramedWrite2<T> {
     inner: T,
     buffer: BytesMut,
+    low_watermark: usize,
+    high_watermark: usize,
 }
 
-const INITIAL_CAPACITY: usize = 8 * 1024;
-const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
-
 impl<T, E> FramedWrite<T, E>
 where
     T: AsyncWrite,
     E: Encoder,
 {
     /// Creates a new `FramedWrite` with the given `encoder`.
-    pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
+    pub fn new(inner: T, encoder: E, lw: usize, hw: usize) -> FramedWrite<T, E> {
         FramedWrite {
-            inner: framed_write2(Fuse(inner, encoder)),
+            inner: framed_write2(Fuse(inner, encoder), lw, hw),
         }
     }
 }
@@ -124,21 +123,25 @@ where
 
 // ===== impl FramedWrite2 =====
 
-pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
+pub fn framed_write2<T>(inner: T, low_watermark: usize, high_watermark: usize) -> FramedWrite2<T> {
     FramedWrite2 {
-        inner: inner,
-        buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+        inner,
+        low_watermark,
+        high_watermark,
+        buffer: BytesMut::with_capacity(high_watermark),
     }
 }
 
-pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> {
-    if buf.capacity() < INITIAL_CAPACITY {
-        let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
-        buf.reserve(bytes_to_reserve);
+pub fn framed_write2_with_buffer<T>(inner: T, mut buffer: BytesMut, low_watermark: usize, high_watermark: usize) -> FramedWrite2<T> {
+    if buffer.capacity() < high_watermark {
+        let bytes_to_reserve = high_watermark - buffer.capacity();
+        buffer.reserve(bytes_to_reserve);
     }
     FramedWrite2 {
-        inner: inner,
-        buffer: buf,
+        inner,
+        buffer,
+        low_watermark,
+        high_watermark,
     }
 }
 
@@ -151,8 +154,8 @@ impl<T> FramedWrite2<T> {
         self.inner
     }
 
-    pub fn into_parts(self) -> (T, BytesMut) {
-        (self.inner, self.buffer)
+    pub fn into_parts(self) -> (T, BytesMut, usize, usize) {
+        (self.inner, self.buffer, self.low_watermark, self.high_watermark)
     }
 
     pub fn get_mut(&mut self) -> &mut T {
@@ -168,15 +171,13 @@ where
     type SinkError = T::Error;
 
     fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
-        // If the buffer is already over 8KiB, then attempt to flush it. If after
-        // flushing it's *still* over 8KiB, then apply backpressure (reject the
-        // send).
-        if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
-            try!(self.poll_complete());
-
-            if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
-                return Ok(AsyncSink::NotReady(item));
-            }
+        // Check the buffer capacity
+        let len = self.buffer.len();
+        if len >= self.high_watermark {
+            return Ok(AsyncSink::NotReady(item));
+        }
+        if len < self.low_watermark {
+            self.buffer.reserve(self.high_watermark - len)
         }
 
         try!(self.inner.encode(item, &mut self.buffer));
diff --git a/src/codec/mod.rs b/src/codec/mod.rs
index 140e77bb..7674cdf5 100644
--- a/src/codec/mod.rs
+++ b/src/codec/mod.rs
@@ -14,12 +14,12 @@
 
 mod bcodec;
 mod framed;
-mod framed2;
+// mod framed2;
 mod framed_read;
 mod framed_write;
 
 pub use self::bcodec::BytesCodec;
 pub use self::framed::{Framed, FramedParts};
-pub use self::framed2::{Framed2, FramedParts2};
+// pub use self::framed2::{Framed2, FramedParts2};
 pub use self::framed_read::FramedRead;
 pub use self::framed_write::FramedWrite;