work in progress

This commit is contained in:
fakeshadow 2020-11-04 22:43:11 +08:00
parent 61b65aa64a
commit 8250de1f83
24 changed files with 88 additions and 103 deletions

View File

@ -1,6 +1,8 @@
# Changes
## Unreleased - 2020-xx-xx
### Changed
* Upgrade `bytes` to `0.6`.
## 3.2.0 - 2020-10-30

View File

@ -88,7 +88,7 @@ actix-web-codegen = "0.4.0"
actix-http = "2.1.0"
awc = { version = "2.0.0", default-features = false }
bytes = "0.5.3"
bytes = "0.6"
derive_more = "0.99.2"
encoding_rs = "0.8"
futures-channel = { version = "0.3.5", default-features = false }
@ -133,6 +133,16 @@ actix-files = { path = "actix-files" }
actix-multipart = { path = "actix-multipart" }
awc = { path = "awc" }
# FIXME: remove these override
actix-rt = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
actix-server = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
actix-tls = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
actix-connect = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
actix-utils = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
actix-codec = { git = "https://github.com/fakeshadow/actix-net.git", branch = "mio-0.7.3" }
http = { git = "https://github.com/paolobarbolini/http.git", branch = "bytes06" }
h2 = { git = "https://github.com/paolobarbolini/h2", branch = "bytes06" }
[[bench]]
name = "server"
harness = false

View File

@ -4,6 +4,9 @@
### Added
* HttpResponse builders for 1xx status codes. [#1768]
### Changed
* Upgrade `bytes` to `0.6`.
### Fixed
* Started dropping `transfer-encoding: chunked` and `Content-Length` for 1XX and 204 responses. [#1767]

View File

@ -51,7 +51,7 @@ actix = { version = "0.10.0", optional = true }
base64 = "0.13"
bitflags = "1.2"
bytes = "0.5.3"
bytes = "0.6"
cookie = { version = "0.14.1", features = ["percent-encode"] }
copyless = "0.1.4"
derive_more = "0.99.2"
@ -61,8 +61,13 @@ futures-channel = { version = "0.3.5", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
futures-util = { version = "0.3.5", default-features = false }
fxhash = "0.2.1"
h2 = "0.2.1"
http = "0.2.0"
#h2 = "0.2.7"
#http = "0.2.0"
# FIXME: Use release version
http = { git = "https://github.com/paolobarbolini/http.git", branch = "bytes06" }
h2 = { git = "https://github.com/paolobarbolini/h2", branch = "bytes06" }
httparse = "1.3"
indexmap = "1.3"
itoa = "0.4"

View File

@ -1,10 +1,10 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io, mem, time};
use std::{fmt, io, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::{Buf, Bytes};
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use bytes::Bytes;
use futures_util::future::{err, Either, FutureExt, LocalBoxFuture, Ready};
use h2::client::SendRequest;
use pin_project::pin_project;
@ -223,23 +223,13 @@ where
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.project() {
EitherIoProj::A(val) => val.poll_read(cx, buf),
EitherIoProj::B(val) => val.poll_read(cx, buf),
}
}
unsafe fn prepare_uninitialized_buffer(
&self,
buf: &mut [mem::MaybeUninit<u8>],
) -> bool {
match self {
EitherIo::A(ref val) => val.prepare_uninitialized_buffer(buf),
EitherIo::B(ref val) => val.prepare_uninitialized_buffer(buf),
}
}
}
impl<A, B> AsyncWrite for EitherIo<A, B>
@ -274,18 +264,4 @@ where
EitherIoProj::B(val) => val.poll_shutdown(cx),
}
}
fn poll_write_buf<U: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut U,
) -> Poll<Result<usize, io::Error>>
where
Self: Sized,
{
match self.project() {
EitherIoProj::A(val) => val.poll_write_buf(cx, buf),
EitherIoProj::B(val) => val.poll_write_buf(cx, buf),
}
}
}

View File

@ -1,10 +1,10 @@
use std::io::Write;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{io, mem, time};
use std::{io, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::buf::BufMutExt;
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::future::poll_fn;
@ -204,18 +204,11 @@ where
}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncRead for H1Connection<T> {
unsafe fn prepare_uninitialized_buffer(
&self,
buf: &mut [mem::MaybeUninit<u8>],
) -> bool {
self.io.as_ref().unwrap().prepare_uninitialized_buffer(buf)
}
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io.as_mut().unwrap()).poll_read(cx, buf)
}
}

View File

@ -6,8 +6,8 @@ use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::{delay_for, Delay};
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::time::{sleep, Sleep};
use actix_service::Service;
use actix_utils::{oneshot, task::LocalWaker};
use bytes::Bytes;
@ -333,10 +333,11 @@ where
} else {
let mut io = conn.io;
let mut buf = [0; 2];
let mut read_buf = ReadBuf::new(&mut buf);
if let ConnectionType::H1(ref mut s) = io {
match Pin::new(s).poll_read(cx, &mut buf) {
match Pin::new(s).poll_read(cx, &mut read_buf) {
Poll::Pending => (),
Poll::Ready(Ok(n)) if n > 0 => {
Poll::Ready(Ok(())) if read_buf.filled().len() > 0 => {
if let Some(timeout) = self.config.disconnect_timeout {
if let ConnectionType::H1(io) = io {
actix_rt::spawn(CloseConnection::new(
@ -388,7 +389,7 @@ where
struct CloseConnection<T> {
io: T,
timeout: Delay,
timeout: Sleep,
}
impl<T> CloseConnection<T>
@ -398,7 +399,7 @@ where
fn new(io: T, timeout: Duration) -> Self {
CloseConnection {
io,
timeout: delay_for(timeout),
timeout: sleep(timeout),
}
}
}

View File

@ -4,7 +4,7 @@ use std::rc::Rc;
use std::time::Duration;
use std::{fmt, net};
use actix_rt::time::{delay_for, delay_until, Delay, Instant};
use actix_rt::time::{sleep, sleep_until, Instant, Sleep};
use bytes::BytesMut;
use futures_util::{future, FutureExt};
use time::OffsetDateTime;
@ -121,10 +121,10 @@ impl ServiceConfig {
#[inline]
/// Client timeout for first request.
pub fn client_timer(&self) -> Option<Delay> {
pub fn client_timer(&self) -> Option<Sleep> {
let delay_time = self.0.client_timeout;
if delay_time != 0 {
Some(delay_until(
Some(sleep_until(
self.0.timer.now() + Duration::from_millis(delay_time),
))
} else {
@ -154,9 +154,9 @@ impl ServiceConfig {
#[inline]
/// Return keep-alive timer delay is configured.
pub fn keep_alive_timer(&self) -> Option<Delay> {
pub fn keep_alive_timer(&self) -> Option<Sleep> {
if let Some(ka) = self.0.keep_alive {
Some(delay_until(self.0.timer.now() + ka))
Some(sleep_until(self.0.timer.now() + ka))
} else {
None
}
@ -266,7 +266,7 @@ impl DateService {
// periodic date update
let s = self.clone();
actix_rt::spawn(delay_for(Duration::from_millis(500)).then(move |_| {
actix_rt::spawn(sleep(Duration::from_millis(500)).then(move |_| {
s.0.reset();
future::ready(())
}));

View File

@ -5,7 +5,7 @@ use std::task::{Context, Poll};
use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use actix_rt::time::{delay_until, Delay, Instant};
use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_service::Service;
use bitflags::bitflags;
use bytes::{Buf, BytesMut};
@ -102,7 +102,7 @@ where
messages: VecDeque<DispatcherMessage>,
ka_expire: Instant,
ka_timer: Option<Delay>,
ka_timer: Option<Sleep>,
io: Option<T>,
read_buf: BytesMut,
@ -203,7 +203,7 @@ where
codec: Codec,
config: ServiceConfig,
read_buf: BytesMut,
timeout: Option<Delay>,
timeout: Option<Sleep>,
service: CloneableService<S>,
expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>,
@ -635,7 +635,7 @@ where
// shutdown timeout
if this.flags.contains(Flags::SHUTDOWN) {
if let Some(interval) = this.codec.config().client_disconnect_timer() {
*this.ka_timer = Some(delay_until(interval));
*this.ka_timer = Some(sleep_until(interval));
} else {
this.flags.insert(Flags::READ_DISCONNECT);
if let Some(mut payload) = this.payload.take() {
@ -912,7 +912,8 @@ fn read<T>(
where
T: AsyncRead + Unpin,
{
Pin::new(io).poll_read_buf(cx, buf)
// FIXME: use tokio-util poll_read_buf
actix_codec::util::poll_read_buf(Pin::new(io), cx, buf)
}
#[cfg(test)]

View File

@ -6,7 +6,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::{Delay, Instant};
use actix_rt::time::{Instant, Sleep};
use actix_service::Service;
use bytes::{Bytes, BytesMut};
use h2::server::{Connection, SendResponse};
@ -41,7 +41,7 @@ where
config: ServiceConfig,
peer_addr: Option<net::SocketAddr>,
ka_expire: Instant,
ka_timer: Option<Delay>,
ka_timer: Option<Sleep>,
_t: PhantomData<B>,
}
@ -60,7 +60,7 @@ where
on_connect: Option<Box<dyn DataFactory>>,
on_connect_data: Extensions,
config: ServiceConfig,
timeout: Option<Delay>,
timeout: Option<Sleep>,
peer_addr: Option<net::SocketAddr>,
) -> Self {
// let keepalive = config.keep_alive_enabled();

View File

@ -3,7 +3,7 @@ use std::io::Write;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use bytes::{buf::BufMutExt, BytesMut};
use bytes::{buf::BufMut, BytesMut};
use http::header::{HeaderValue, InvalidHeaderValue};
use time::{offset, OffsetDateTime, PrimitiveDateTime};

View File

@ -5,7 +5,7 @@ use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use bytes::{Bytes, BytesMut};
use http::header::{self, HeaderName, HeaderValue};
use http::{Error as HttpError, Method, Uri, Version};
@ -245,9 +245,9 @@ impl AsyncRead for TestBuffer {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(self.get_mut().read(buf))
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(self.get_mut().read(buf.filled_mut()).map(|_| ()))
}
}

View File

@ -3,7 +3,7 @@ use std::time::Duration;
use std::{net, thread};
use actix_http_test::test_server;
use actix_rt::time::delay_for;
use actix_rt::time::sleep;
use actix_service::fn_service;
use bytes::Bytes;
use futures_util::future::{self, err, ok, ready, FutureExt};
@ -88,7 +88,7 @@ async fn test_expect_continue_h1() {
let srv = test_server(|| {
HttpService::build()
.expect(fn_service(|req: Request| {
delay_for(Duration::from_millis(20)).then(move |_| {
sleep(Duration::from_millis(20)).then(move |_| {
if req.head().uri.query() == Some("yes=") {
ok(req)
} else {

View File

@ -1,7 +1,8 @@
# Changes
## Unreleased - 2020-xx-xx
### Changed
* Upgrade `bytes` to `0.6`.
## 2.0.1 - 2020-10-30
### Changed

View File

@ -43,7 +43,7 @@ actix-http = "2.0.0"
actix-rt = "1.0.0"
base64 = "0.13"
bytes = "0.5.3"
bytes = "0.6"
cfg-if = "1.0"
derive_more = "0.99.2"
futures-core = { version = "0.3.5", default-features = false }

View File

@ -2,9 +2,9 @@ use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{fmt, io, mem, net};
use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use actix_http::body::Body;
use actix_http::client::{
Connect as ClientConnect, ConnectError, Connection, SendRequestError,
@ -221,18 +221,11 @@ impl fmt::Debug for BoxedSocket {
}
impl AsyncRead for BoxedSocket {
unsafe fn prepare_uninitialized_buffer(
&self,
buf: &mut [mem::MaybeUninit<u8>],
) -> bool {
self.0.as_read().prepare_uninitialized_buffer(buf)
}
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf)
}
}

View File

@ -5,7 +5,7 @@ use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::time::{delay_for, Delay};
use actix_rt::time::{sleep, Sleep};
use bytes::Bytes;
use derive_more::From;
use futures_core::Stream;
@ -56,7 +56,7 @@ impl Into<SendRequestError> for PrepForSendingError {
pub enum SendClientRequest {
Fut(
Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>,
Option<Delay>,
Option<Sleep>,
bool,
),
Err(Option<SendRequestError>),
@ -68,7 +68,7 @@ impl SendClientRequest {
response_decompress: bool,
timeout: Option<Duration>,
) -> SendClientRequest {
let delay = timeout.map(delay_for);
let delay = timeout.map(sleep);
SendClientRequest::Fut(send, delay, response_decompress)
}
}

View File

@ -108,7 +108,7 @@ async fn test_form() {
async fn test_timeout() {
let srv = test::start(|| {
App::new().service(web::resource("/").route(web::to(|| async {
actix_rt::time::delay_for(Duration::from_millis(200)).await;
actix_rt::time::sleep(Duration::from_millis(200)).await;
Ok::<_, Error>(HttpResponse::Ok().body(STR))
})))
});
@ -136,7 +136,7 @@ async fn test_timeout() {
async fn test_timeout_override() {
let srv = test::start(|| {
App::new().service(web::resource("/").route(web::to(|| async {
actix_rt::time::delay_for(Duration::from_millis(200)).await;
actix_rt::time::sleep(Duration::from_millis(200)).await;
Ok::<_, Error>(HttpResponse::Ok().body(STR))
})))
});

View File

@ -586,7 +586,7 @@ impl ServiceFactory for ResourceEndpoint {
mod tests {
use std::time::Duration;
use actix_rt::time::delay_for;
use actix_rt::time::sleep;
use actix_service::Service;
use futures_util::future::ok;
@ -654,7 +654,7 @@ mod tests {
async fn test_to() {
let mut srv =
init_service(App::new().service(web::resource("/test").to(|| async {
delay_for(Duration::from_millis(100)).await;
sleep(Duration::from_millis(100)).await;
Ok::<_, Error>(HttpResponse::Ok())
})))
.await;

View File

@ -345,7 +345,7 @@ where
mod tests {
use std::time::Duration;
use actix_rt::time::delay_for;
use actix_rt::time::sleep;
use bytes::Bytes;
use serde_derive::Serialize;
@ -369,16 +369,16 @@ mod tests {
Err::<HttpResponse, _>(error::ErrorBadRequest("err"))
}))
.route(web::post().to(|| async {
delay_for(Duration::from_millis(100)).await;
sleep(Duration::from_millis(100)).await;
Ok::<_, ()>(HttpResponse::Created())
}))
.route(web::delete().to(|| async {
delay_for(Duration::from_millis(100)).await;
sleep(Duration::from_millis(100)).await;
Err::<HttpResponse, _>(error::ErrorBadRequest("err"))
})),
)
.service(web::resource("/json").route(web::get().to(|| async {
delay_for(Duration::from_millis(25)).await;
sleep(Duration::from_millis(25)).await;
web::Json(MyObject {
name: "test".to_string(),
})

View File

@ -149,7 +149,7 @@ where
/// This method should be called before `bind()` method call.
pub fn backlog(mut self, backlog: i32) -> Self {
self.backlog = backlog;
self.builder = self.builder.backlog(backlog);
self.builder = self.builder.backlog(backlog as u32);
self
}

View File

@ -11,7 +11,7 @@ use actix_http::http::{Error as HttpError, Method, StatusCode, Uri, Version};
use actix_http::test::TestRequest as HttpTestRequest;
use actix_http::{cookie::Cookie, ws, Extensions, HttpService, Request};
use actix_router::{Path, ResourceDef, Url};
use actix_rt::{time::delay_for, System};
use actix_rt::{time::sleep, System};
use actix_service::{
map_config, IntoService, IntoServiceFactory, Service, ServiceFactory,
};
@ -1021,7 +1021,7 @@ impl TestServer {
pub async fn stop(self) {
self.server.stop(true).await;
self.system.stop();
delay_for(time::Duration::from_millis(100)).await;
sleep(time::Duration::from_millis(100)).await;
}
}

View File

@ -39,7 +39,7 @@ actix-testing = "1.0.0"
awc = "2.0.0"
base64 = "0.13"
bytes = "0.5.3"
bytes = "0.6"
futures-core = { version = "0.3.5", default-features = false }
http = "0.2.0"
log = "0.4"

View File

@ -45,7 +45,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
struct TestBody {
data: Bytes,
chunk_size: usize,
delay: actix_rt::time::Delay,
delay: actix_rt::time::Sleep,
}
impl TestBody {
@ -53,7 +53,7 @@ impl TestBody {
TestBody {
data,
chunk_size,
delay: actix_rt::time::delay_for(std::time::Duration::from_millis(10)),
delay: actix_rt::time::sleep(std::time::Duration::from_millis(10)),
}
}
}
@ -67,7 +67,7 @@ impl futures_core::stream::Stream for TestBody {
) -> Poll<Option<Self::Item>> {
ready!(Pin::new(&mut self.delay).poll(cx));
self.delay = actix_rt::time::delay_for(std::time::Duration::from_millis(10));
self.delay = actix_rt::time::sleep(std::time::Duration::from_millis(10));
let chunk_size = std::cmp::min(self.chunk_size, self.data.len());
let chunk = self.data.split_to(chunk_size);
if chunk.is_empty() {