update to h2 0.3

This commit is contained in:
fakeshadow 2020-12-24 03:05:58 +08:00
parent 209e85add7
commit ac80b4c24b
29 changed files with 135 additions and 116 deletions

View File

@ -88,7 +88,7 @@ actix-web-codegen = "0.4.0"
actix-http = "2.2.0"
awc = { version = "2.0.3", default-features = false }
bytes = "0.6"
bytes = "1"
derive_more = "0.99.5"
encoding_rs = "0.8"
futures-channel = { version = "0.3.5", default-features = false }
@ -141,7 +141,6 @@ actix-tls = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio
actix-connect = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
actix-utils = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
actix-codec = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
h2 = { git = "https://github.com/hyperium/h2.git" }
http = { git = "https://github.com/fakeshadow/http.git" }
[[bench]]

View File

@ -1,7 +1,7 @@
# Changes
## Unreleased - 2020-xx-xx
* Update `Bytes` to 0.6.
* Update `bytes` to `1`.
## 0.4.1 - 2020-11-24

View File

@ -20,7 +20,7 @@ path = "src/lib.rs"
actix-web = { version = "3.0.0", default-features = false }
actix-service = "1.0.6"
bitflags = "1"
bytes = "0.6"
bytes = "1"
futures-core = { version = "0.3.7", default-features = false }
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
derive_more = "0.99.2"

View File

@ -125,9 +125,8 @@ impl Files {
/// Set custom directory renderer
pub fn files_listing_renderer<F>(mut self, f: F) -> Self
where
for<'r, 's> F:
Fn(&'r Directory, &'s HttpRequest) -> Result<ServiceResponse, io::Error>
+ 'static,
for<'r, 's> F: Fn(&'r Directory, &'s HttpRequest) -> Result<ServiceResponse, io::Error>
+ 'static,
{
self.renderer = Rc::new(f);
self
@ -201,11 +200,11 @@ impl Files {
where
F: IntoServiceFactory<U>,
U: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
{
// create and configure default resource
self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory(

View File

@ -1,7 +1,7 @@
# Changes
## Unreleased - 2020-xx-xx
* Update `Bytes` to 0.6.
* Update `bytes` to `1`.
## 2.1.0 - 2020-11-25

View File

@ -39,9 +39,9 @@ actix-testing = "1.0.0"
awc = "2.0.0"
base64 = "0.13"
bytes = "0.6"
bytes = "1"
futures-core = { version = "0.3.5", default-features = false }
http = "0.2.0"
http = "0.2.2"
log = "0.4"
socket2 = "0.3"
serde = "1.0"

View File

@ -3,6 +3,8 @@
## Unreleased - 2020-xx-xx
### Changed
* Bumped `rand` to `0.8`
* Upgrade `bytes` to `1`
* Upgrade `h2` to `0.3`
## 2.2.0 - 2020-11-25
### Added

View File

@ -51,7 +51,7 @@ actix = { version = "0.10.0", optional = true }
base64 = "0.13"
bitflags = "1.2"
bytes = "0.6"
bytes = "1"
cookie = { version = "0.14.1", features = ["percent-encode"] }
copyless = "0.1.4"
derive_more = "0.99.2"
@ -62,7 +62,7 @@ futures-core = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false, features = ["sink"] }
fxhash = "0.2.1"
h2 = "0.3.0"
http = "0.2.1"
http = "0.2.2"
httparse = "1.3"
indexmap = "1.3"
itoa = "0.4"

View File

@ -62,10 +62,10 @@ impl Connector<(), ()> {
#[allow(clippy::new_ret_no_self, clippy::let_unit_value)]
pub fn new() -> Connector<
impl Service<
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, TcpStream>,
Error = actix_connect::ConnectError,
> + Clone,
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, TcpStream>,
Error = actix_connect::ConnectError,
> + Clone,
TcpStream,
> {
Connector {
@ -117,10 +117,10 @@ impl<T, U> Connector<T, U> {
where
U1: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
T1: Service<
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U1>,
Error = actix_connect::ConnectError,
> + Clone,
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U1>,
Error = actix_connect::ConnectError,
> + Clone,
{
Connector {
connector,
@ -135,10 +135,10 @@ impl<T, U> Connector<T, U>
where
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
T: Service<
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U>,
Error = actix_connect::ConnectError,
> + Clone
Request = TcpConnect<Uri>,
Response = TcpConnection<Uri, U>,
Error = actix_connect::ConnectError,
> + Clone
+ 'static,
{
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.

View File

@ -388,8 +388,10 @@ where
}
}
#[pin_project::pin_project]
struct CloseConnection<T> {
io: T,
#[pin]
timeout: Sleep,
}
@ -412,11 +414,11 @@ where
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.get_mut();
let this = self.project();
match Pin::new(&mut this.timeout).poll(cx) {
match this.timeout.poll(cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => match Pin::new(&mut this.io).poll_shutdown(cx) {
Poll::Pending => match Pin::new(this.io).poll_shutdown(cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
},

View File

@ -108,6 +108,8 @@ where
messages: VecDeque<DispatcherMessage>,
ka_expire: Instant,
#[pin]
ka_timer: Option<Sleep>,
io: Option<T>,
@ -644,7 +646,7 @@ where
// shutdown timeout
if this.flags.contains(Flags::SHUTDOWN) {
if let Some(interval) = this.codec.config().client_disconnect_timer() {
*this.ka_timer = Some(sleep_until(interval));
this.ka_timer.set(Some(sleep_until(interval)));
} else {
this.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = this.payload.take() {
@ -657,12 +659,14 @@ where
}
}
match Pin::new(&mut this.ka_timer.as_mut().unwrap()).poll(cx) {
match this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(()) => {
// if we get timeout during shutdown, drop connection
if this.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout);
} else if this.ka_timer.as_mut().unwrap().deadline() >= *this.ka_expire {
} else if this.ka_timer.as_mut().as_pin_mut().unwrap().deadline()
>= *this.ka_expire
{
// check for any outstanding tasks
if this.state.is_empty() && this.write_buf.is_empty() {
if this.flags.contains(Flags::STARTED) {
@ -673,9 +677,15 @@ where
if let Some(deadline) =
this.codec.config().client_disconnect_timer()
{
if let Some(mut timer) = this.ka_timer.as_mut() {
if let Some(timer) = this.ka_timer.as_mut().as_pin_mut()
{
timer.reset(deadline);
let _ = Pin::new(&mut timer).poll(cx);
let _ = this
.ka_timer
.as_mut()
.as_pin_mut()
.unwrap()
.poll(cx);
}
} else {
// no shutdown timeout, drop socket
@ -700,14 +710,15 @@ where
} else if let Some(deadline) =
this.codec.config().keep_alive_expire()
{
if let Some(mut timer) = this.ka_timer.as_mut() {
if let Some(timer) = this.ka_timer.as_mut().as_pin_mut() {
timer.reset(deadline);
let _ = Pin::new(&mut timer).poll(cx);
let _ =
this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx);
}
}
} else if let Some(mut timer) = this.ka_timer.as_mut() {
} else if let Some(timer) = this.ka_timer.as_mut().as_pin_mut() {
timer.reset(*this.ka_expire);
let _ = Pin::new(&mut timer).poll(cx);
let _ = this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx);
}
}
Poll::Pending => (),

View File

@ -135,7 +135,7 @@ pub(crate) trait MessageType: Sized {
let mut has_date = false;
let mut buf = dst.bytes_mut().as_mut_ptr() as *mut u8;
let mut buf = dst.chunk_mut().as_mut_ptr() as *mut u8;
let mut remaining = dst.capacity() - dst.len();
// tracks bytes written since last buffer resize
@ -177,7 +177,7 @@ pub(crate) trait MessageType: Sized {
// re-assign buf raw pointer since it's possible that the buffer was
// reallocated and/or resized
buf = dst.bytes_mut().as_mut_ptr() as *mut u8;
buf = dst.chunk_mut().as_mut_ptr() as *mut u8;
}
// SAFETY: on each write, it is enough to ensure that the advancement of the
@ -224,7 +224,7 @@ pub(crate) trait MessageType: Sized {
// re-assign buf raw pointer since it's possible that the buffer was
// reallocated and/or resized
buf = dst.bytes_mut().as_mut_ptr() as *mut u8;
buf = dst.chunk_mut().as_mut_ptr() as *mut u8;
}
// SAFETY: on each write, it is enough to ensure that the advancement of

View File

@ -358,11 +358,15 @@ impl AsyncRead for TestSeqBuffer {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let r = self.get_mut().read(buf);
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let dst = buf.initialize_unfilled();
let r = self.get_mut().read(dst);
match r {
Ok(n) => Poll::Ready(Ok(n)),
Ok(n) => {
buf.advance(n);
Poll::Ready(Ok(()))
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
Err(err) => Poll::Ready(Err(err)),
}

View File

@ -2,7 +2,7 @@
## Unreleased - 2020-xx-xx
* Fix multipart consuming payload before header checks #1513
* Update `Bytes` to 0.6
* Update `bytes` to `1`
## 3.0.0 - 2020-09-11

View File

@ -19,7 +19,8 @@ path = "src/lib.rs"
actix-web = { version = "3.0.0", default-features = false }
actix-service = "1.0.6"
actix-utils = "2.0.0"
bytes = "0.6"
bytes = "1"
derive_more = "0.99.2"
httparse = "1.3"
futures-util = { version = "0.3.5", default-features = false }

View File

@ -2,7 +2,7 @@
## Unreleased - 2020-xx-xx
* Upgrade `pin-project` to `1.0`.
* Update `Bytes` to 0.6.
* Update `bytes` to `1`.
## 3.0.0 - 2020-09-11
* No significant changes from `3.0.0-beta.2`.

View File

@ -20,7 +20,7 @@ actix = "0.10.0"
actix-web = { version = "3.0.0", default-features = false }
actix-http = "2.0.0"
actix-codec = "0.3.0"
bytes = "0.6"
bytes = "1"
futures-channel = { version = "0.3.5", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
pin-project = "1.0.0"

View File

@ -2,7 +2,7 @@
## Unreleased - 2020-xx-xx
### Changed
* Upgrade `bytes` to `0.6`.
* Upgrade `bytes` to `1`.
* Bumped `rand` to `0.8`

View File

@ -43,7 +43,7 @@ actix-http = "2.2.0"
actix-rt = "1.0.0"
base64 = "0.13"
bytes = "0.6"
bytes = "1"
cfg-if = "1.0"
derive_more = "0.99.2"
futures-core = { version = "0.3.5", default-features = false }

View File

@ -56,7 +56,8 @@ impl Into<SendRequestError> for PrepForSendingError {
pub enum SendClientRequest {
Fut(
Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>,
Option<Sleep>,
// FIXME: use a pinned Sleep instead of box.
Option<Pin<Box<Sleep>>>,
bool,
),
Err(Option<SendRequestError>),
@ -68,7 +69,7 @@ impl SendClientRequest {
response_decompress: bool,
timeout: Option<Duration>,
) -> SendClientRequest {
let delay = timeout.map(sleep);
let delay = timeout.map(|d| Box::pin(sleep(d)));
SendClientRequest::Fut(send, delay, response_decompress)
}
}

View File

@ -71,7 +71,6 @@ impl WebsocketsRequest {
{
let mut err = None;
#[allow(clippy::field_reassign_with_default)]
let mut head = {
let mut head = RequestHead::default();
head.method = Method::GET;

View File

@ -270,11 +270,11 @@ where
where
F: IntoServiceFactory<U>,
U: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
U::InitError: fmt::Debug,
{
// create and configure default resource

View File

@ -107,12 +107,12 @@ impl AppService {
) where
F: IntoServiceFactory<S>,
S: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
{
self.services.push((
rdef,

View File

@ -156,10 +156,10 @@ impl<T: FromRequest, S> Extract<T, S> {
impl<T: FromRequest, S> ServiceFactory for Extract<T, S>
where
S: Service<
Request = (T, HttpRequest),
Response = ServiceResponse,
Error = Infallible,
> + Clone,
Request = (T, HttpRequest),
Response = ServiceResponse,
Error = Infallible,
> + Clone,
{
type Request = ServiceRequest;
type Response = ServiceResponse;
@ -185,10 +185,10 @@ pub struct ExtractService<T: FromRequest, S> {
impl<T: FromRequest, S> Service for ExtractService<T, S>
where
S: Service<
Request = (T, HttpRequest),
Response = ServiceResponse,
Error = Infallible,
> + Clone,
Request = (T, HttpRequest),
Response = ServiceResponse,
Error = Infallible,
> + Clone,
{
type Request = ServiceRequest;
type Response = ServiceResponse;

View File

@ -347,11 +347,11 @@ where
where
F: IntoServiceFactory<U>,
U: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
U::InitError: fmt::Debug,
{
// create and configure default resource
@ -368,12 +368,12 @@ where
impl<T> HttpServiceFactory for Resource<T>
where
T: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
{
fn register(mut self, config: &mut AppService) {
let guards = if self.guards.is_empty() {

View File

@ -287,11 +287,11 @@ where
where
F: IntoServiceFactory<U>,
U: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
> + 'static,
U::InitError: fmt::Debug,
{
// create and configure default resource
@ -410,12 +410,12 @@ where
impl<T> HttpServiceFactory for Scope<T>
where
T: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
{
fn register(mut self, config: &mut AppService) {
// update default resource if needed

View File

@ -488,12 +488,12 @@ impl WebService {
where
F: IntoServiceFactory<T>,
T: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
{
WebServiceImpl {
srv: service.into_factory(),
@ -514,12 +514,12 @@ struct WebServiceImpl<T> {
impl<T> HttpServiceFactory for WebServiceImpl<T>
where
T: ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
Config = (),
Request = ServiceRequest,
Response = ServiceResponse,
Error = Error,
InitError = (),
> + 'static,
{
fn register(mut self, config: &mut AppService) {
let guards = if self.guards.is_empty() {

View File

@ -183,7 +183,7 @@ where
let config = JsonConfig::from_req(req);
let limit = config.limit;
let ctype = config.content_type.clone();
let ctype = config.content_type.as_deref();
let err_handler = config.err_handler.clone();
JsonExtractFut {
@ -361,13 +361,13 @@ where
pub fn new(
req: &HttpRequest,
payload: &mut Payload,
ctype: Option<Arc<dyn Fn(mime::Mime) -> bool + Send + Sync>>,
ctype: Option<&(dyn Fn(mime::Mime) -> bool + Send + Sync)>,
) -> Self {
// check content-type
let json = if let Ok(Some(mime)) = req.mime_type() {
mime.subtype() == mime::JSON
|| mime.suffix() == Some(mime::JSON)
|| ctype.as_ref().map_or(false, |predicate| predicate(mime))
|| ctype.map_or(false, |predicate| predicate(mime))
} else {
false
};

View File

@ -45,7 +45,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
struct TestBody {
data: Bytes,
chunk_size: usize,
delay: actix_rt::time::Sleep,
delay: Pin<Box<actix_rt::time::Sleep>>,
}
impl TestBody {
@ -53,7 +53,7 @@ impl TestBody {
TestBody {
data,
chunk_size,
delay: actix_rt::time::sleep(std::time::Duration::from_millis(10)),
delay: Box::pin(actix_rt::time::sleep(std::time::Duration::from_millis(10))),
}
}
}
@ -67,7 +67,8 @@ impl futures_core::stream::Stream for TestBody {
) -> Poll<Option<Self::Item>> {
ready!(Pin::new(&mut self.delay).poll(cx));
self.delay = actix_rt::time::sleep(std::time::Duration::from_millis(10));
self.delay =
Box::pin(actix_rt::time::sleep(std::time::Duration::from_millis(10)));
let chunk_size = std::cmp::min(self.chunk_size, self.data.len());
let chunk = self.data.split_to(chunk_size);
if chunk.is_empty() {