update mio for actix-server

This commit is contained in:
fakeshadow 2020-10-19 08:31:19 +08:00
parent 242bef269f
commit 80098d2906
31 changed files with 1313 additions and 344 deletions

View File

@ -22,5 +22,5 @@ futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
log = "0.4"
pin-project = "0.4.17"
tokio = { version = "0.2.5", default-features = false }
tokio = "0.3.0"
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }

View File

@ -1 +0,0 @@
../LICENSE-APACHE

201
actix-codec/LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2017-NOW Nikolay Kim
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1 +0,0 @@
../LICENSE-MIT

25
actix-codec/LICENSE-MIT Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2017 Nikolay Kim
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -220,11 +220,14 @@ impl<T, U> Framed<T, U> {
if remaining < LW {
this.read_buf.reserve(HW - remaining)
}
let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(cnt)) => cnt,
};
// FixMe: This must be fixed as `poll_read_buf` is removed
// let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) {
// Poll::Pending => return Poll::Pending,
// Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
// Poll::Ready(Ok(cnt)) => cnt,
// };
let cnt = 0;
if cnt == 0 {
this.flags.insert(Flags::EOF);

View File

@ -46,7 +46,7 @@ trust-dns-resolver = { version = "0.19", default-features = false, features = ["
# openssl
open-ssl = { package = "openssl", version = "0.10", optional = true }
tokio-openssl = { version = "0.4.0", optional = true }
tokio-openssl = { version = "0.5.0", optional = true }
# rustls
rust-tls = { package = "rustls", version = "0.18.0", optional = true }

View File

@ -22,7 +22,7 @@ futures-channel = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
copyless = "0.1.4"
smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
tokio = { version = "0.3.0", features = ["rt", "net", "signal", "stream", "time"] }
[dev-dependencies]
tokio = { version = "0.2.6", features = ["full"] }
tokio = { version = "0.3.0", features = ["full"] }

View File

@ -1 +0,0 @@
../LICENSE-APACHE

201
actix-rt/LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2017-NOW Nikolay Kim
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1 +0,0 @@
../LICENSE-MIT

25
actix-rt/LICENSE-MIT Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2017 Nikolay Kim
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -60,7 +60,7 @@ pub mod net {
/// Utilities for tracking time.
pub mod time {
pub use tokio::time::Instant;
pub use tokio::time::{delay_for, delay_until, Delay};
pub use tokio::time::{interval, interval_at, Interval};
pub use tokio::time::{sleep, sleep_until, Sleep};
pub use tokio::time::{timeout, Timeout};
}

View File

@ -18,10 +18,9 @@ impl Runtime {
#[allow(clippy::new_ret_no_self)]
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
let rt = runtime::Builder::new()
let rt = runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.basic_scheduler()
.build()?;
Ok(Runtime {

View File

@ -95,10 +95,9 @@ impl System {
/// }
///
///
/// let mut runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// let runtime = tokio::runtime::Builder::new_multi_thread()
/// .worker_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
@ -166,10 +165,9 @@ impl System {
/// }
///
///
/// let runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// let runtime = tokio::runtime::Builder::new_multi_thread()
/// .worker_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
@ -178,7 +176,7 @@ impl System {
/// ```
pub fn attach_to_tokio<Fut, R>(
name: impl Into<String>,
mut runtime: tokio::runtime::Runtime,
runtime: tokio::runtime::Runtime,
rest_operations: Fut,
) -> R
where

View File

@ -19,7 +19,7 @@ fn await_for_timer() {
let time = Duration::from_secs(2);
let instant = Instant::now();
actix_rt::System::new("test_wait_timer").block_on(async move {
tokio::time::delay_for(time).await;
tokio::time::sleep(time).await;
});
assert!(
instant.elapsed() >= time,
@ -34,7 +34,7 @@ fn join_another_arbiter() {
actix_rt::System::new("test_join_another_arbiter").block_on(async move {
let mut arbiter = actix_rt::Arbiter::new();
arbiter.send(Box::pin(async move {
tokio::time::delay_for(time).await;
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
}));
arbiter.join().unwrap();
@ -49,7 +49,7 @@ fn join_another_arbiter() {
let mut arbiter = actix_rt::Arbiter::new();
arbiter.exec_fn(move || {
actix_rt::spawn(async move {
tokio::time::delay_for(time).await;
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
});
});
@ -64,7 +64,7 @@ fn join_another_arbiter() {
actix_rt::System::new("test_join_another_arbiter").block_on(async move {
let mut arbiter = actix_rt::Arbiter::new();
arbiter.send(Box::pin(async move {
tokio::time::delay_for(time).await;
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
}));
arbiter.stop();
@ -83,7 +83,7 @@ fn join_current_arbiter() {
let instant = Instant::now();
actix_rt::System::new("test_join_current_arbiter").block_on(async move {
actix_rt::spawn(async move {
tokio::time::delay_for(time).await;
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
});
actix_rt::Arbiter::local_join().await;
@ -97,12 +97,12 @@ fn join_current_arbiter() {
let instant = Instant::now();
actix_rt::System::new("test_join_current_arbiter").block_on(async move {
actix_rt::spawn(async move {
tokio::time::delay_for(time).await;
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
});
let f = actix_rt::Arbiter::local_join();
actix_rt::spawn(async move {
tokio::time::delay_for(large_timer).await;
tokio::time::sleep(large_timer).await;
actix_rt::Arbiter::current().stop();
});
f.await;

View File

@ -25,20 +25,17 @@ actix-rt = "1.1.1"
actix-codec = "0.3.0"
actix-utils = "2.0.0"
log = "0.4"
num_cpus = "1.13"
mio = "0.6.19"
socket2 = "0.3"
concurrent-queue = "1.2.2"
futures-channel = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false, features = ["sink"] }
log = "0.4"
mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] }
num_cpus = "1.13"
slab = "0.4"
# unix domain sockets
# FIXME: Remove it and use mio own uds feature once mio 0.7 is released
mio-uds = { version = "0.6.7" }
socket2 = "0.3"
[dev-dependencies]
bytes = "0.5"
env_logger = "0.7"
actix-testing = "1.0.0"
tokio = { version = "0.2", features = ["io-util"] }
tokio = { version = "0.3.0", features = ["full"] }

View File

@ -1 +0,0 @@
../LICENSE-APACHE

201
actix-server/LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2017-NOW Nikolay Kim
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1 +0,0 @@
../LICENSE-MIT

25
actix-server/LICENSE-MIT Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2017 Nikolay Kim
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -21,7 +21,7 @@ use actix_service::pipeline_factory;
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadBuf};
#[actix_rt::main]
async fn main() -> io::Result<()> {
@ -49,10 +49,10 @@ async fn main() -> io::Result<()> {
let num = num + 1;
let mut size = 0;
let mut buf = BytesMut::new();
let mut buf = Vec::new();
loop {
match stream.read_buf(&mut buf).await {
match stream.read(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,
@ -72,7 +72,7 @@ async fn main() -> io::Result<()> {
}
// send data down service pipeline
Ok((buf.freeze(), size))
Ok((buf.len(), size))
}
})
.map_err(|err| error!("Service Error: {:?}", err))

View File

@ -1,119 +1,100 @@
use std::sync::mpsc as sync_mpsc;
use std::time::Duration;
use std::{io, thread};
use actix_rt::time::{delay_until, Instant};
use actix_rt::time::{sleep_until, Instant};
use actix_rt::System;
use log::{error, info};
use mio::{Interest, Poll, Token as MioToken};
use slab::Slab;
use crate::server::Server;
use crate::socket::{SocketAddr, SocketListener, StdListener};
use crate::socket::{MioSocketListener, SocketAddr, StdListener};
use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN};
use crate::worker::{Conn, WorkerClient};
use crate::Token;
pub(crate) enum Command {
Pause,
Resume,
Stop,
Worker(WorkerClient),
}
struct ServerSocketInfo {
addr: SocketAddr,
token: Token,
sock: SocketListener,
sock: MioSocketListener,
timeout: Option<Instant>,
}
/// accept notify would clone waker queue from accept loop and use it to push new interest and wake
/// up the accept poll.
#[derive(Clone)]
pub(crate) struct AcceptNotify(mio::SetReadiness);
pub(crate) struct AcceptNotify(WakerQueue);
impl AcceptNotify {
pub(crate) fn new(ready: mio::SetReadiness) -> Self {
AcceptNotify(ready)
pub(crate) fn new(waker: WakerQueue) -> Self {
Self(waker)
}
pub(crate) fn notify(&self) {
let _ = self.0.set_readiness(mio::Ready::readable());
}
}
impl Default for AcceptNotify {
fn default() -> Self {
AcceptNotify::new(mio::Registration::new2().1)
self.0.wake(WakerInterest::Notify);
}
}
/// Accept loop would live with `ServerBuilder`.
///
/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
/// `Accept` and `WorkerClient` accordingly.
///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop {
cmd_reg: Option<mio::Registration>,
cmd_ready: mio::SetReadiness,
notify_reg: Option<mio::Registration>,
notify_ready: mio::SetReadiness,
tx: sync_mpsc::Sender<Command>,
rx: Option<sync_mpsc::Receiver<Command>>,
srv: Option<Server>,
poll: Option<Poll>,
waker: WakerQueue,
}
impl AcceptLoop {
pub fn new(srv: Server) -> AcceptLoop {
let (tx, rx) = sync_mpsc::channel();
let (cmd_reg, cmd_ready) = mio::Registration::new2();
let (notify_reg, notify_ready) = mio::Registration::new2();
pub fn new(srv: Server) -> Self {
// Create a poll instance.
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e));
AcceptLoop {
tx,
cmd_ready,
cmd_reg: Some(cmd_reg),
notify_ready,
notify_reg: Some(notify_reg),
rx: Some(rx),
// construct a waker queue which would wake up poll with associate extra interest types.
let waker = WakerQueue::with_capacity(poll.registry(), 128).unwrap();
Self {
srv: Some(srv),
poll: Some(poll),
waker,
}
}
pub fn send(&self, msg: Command) {
let _ = self.tx.send(msg);
let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
pub fn wake_accept(&self, i: WakerInterest) {
self.waker.wake(i);
}
pub fn get_notify(&self) -> AcceptNotify {
AcceptNotify::new(self.notify_ready.clone())
pub fn get_accept_notify(&self) -> AcceptNotify {
AcceptNotify::new(self.waker.clone())
}
pub(crate) fn start(
pub(crate) fn start_accept(
&mut self,
socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap();
let waker = self.waker.clone();
Accept::start(
self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"),
socks,
srv,
workers,
);
Accept::start(poll, waker, socks, srv, workers);
}
}
/// poll instance of the server.
struct Accept {
poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>,
poll: Poll,
waker: WakerQueue,
workers: Vec<WorkerClient>,
srv: Server,
timer: (mio::Registration, mio::SetReadiness),
next: usize,
backpressure: bool,
}
const DELTA: usize = 100;
const CMD: mio::Token = mio::Token(0);
const TIMER: mio::Token = mio::Token(1);
const NOTIFY: mio::Token = mio::Token(2);
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
@ -129,11 +110,9 @@ fn connection_error(e: &io::Error) -> bool {
}
impl Accept {
#![allow(clippy::too_many_arguments)]
pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>,
cmd_reg: mio::Registration,
notify_reg: mio::Registration,
poll: Poll,
waker: WakerQueue,
socks: Vec<(Token, StdListener)>,
srv: Server,
workers: Vec<WorkerClient>,
@ -141,66 +120,40 @@ impl Accept {
let sys = System::current();
// start accept thread
let _ = thread::Builder::new()
thread::Builder::new()
.name("actix-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
let mut accept = Accept::new(rx, socks, workers, srv);
// Start listening for incoming commands
if let Err(err) = accept.poll.register(
&cmd_reg,
CMD,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
let (mut accept, sockets) =
Accept::new_with_sockets(poll, waker, socks, workers, srv);
accept.poll_with(sockets);
})
.unwrap();
}
// Start listening for notify updates
if let Err(err) = accept.poll.register(
&notify_reg,
NOTIFY,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
accept.poll();
});
}
fn new(
rx: sync_mpsc::Receiver<Command>,
fn new_with_sockets(
poll: Poll,
waker: WakerQueue,
socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>,
srv: Server,
) -> Accept {
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Accept and sockets info are separated so that we can borrow mut on both at the same time
) -> (Accept, Slab<ServerSocketInfo>) {
// Start accept
let mut sockets = Slab::new();
for (hnd_token, lst) in socks.into_iter() {
let addr = lst.local_addr();
let server = lst.into_listener();
let mut server = lst
.into_listener()
.unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e));
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
}
poll.registry()
.register(&mut server, MioToken(token + DELTA), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
entry.insert(ServerSocketInfo {
addr,
@ -210,67 +163,124 @@ impl Accept {
});
}
// Timer
let (tm, tmr) = mio::Registration::new2();
if let Err(err) =
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
Accept {
let accept = Accept {
poll,
rx,
sockets,
waker,
workers,
srv,
next: 0,
timer: (tm, tmr),
backpressure: false,
}
};
(accept, sockets)
}
fn poll(&mut self) {
fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
// Create storage for events
let mut events = mio::Events::with_capacity(128);
loop {
if let Err(err) = self.poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
self.poll
.poll(&mut events, None)
.unwrap_or_else(|e| panic!("Poll error: {}", e));
for event in events.iter() {
let token = event.token();
match token {
CMD => {
if !self.process_cmd() {
// This is a loop because interests for command were a loop that would try to
// drain the command channel. We break at first iter with other kind interests.
WAKER_TOKEN => 'waker: loop {
match self.waker.pop() {
Ok(i) => {
match i {
WakerInterest::Pause => {
for (_, info) in sockets.iter_mut() {
if let Err(err) =
self.poll.registry().deregister(&mut info.sock)
{
error!(
"Can not deregister server socket {}",
err
);
} else {
info!(
"Paused accepting connections on {}",
info.addr
);
}
}
}
WakerInterest::Resume => {
for (token, info) in sockets.iter_mut() {
if let Err(err) = self.register(token, info) {
error!(
"Can not resume socket accept process: {}",
err
);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
WakerInterest::Stop => {
for (_, info) in sockets.iter_mut() {
let _ =
self.poll.registry().deregister(&mut info.sock);
}
return;
}
WakerInterest::Worker(worker) => {
self.backpressure(&mut sockets, false);
self.workers.push(worker);
}
TIMER => self.process_timer(),
NOTIFY => self.backpressure(false),
// timer and notify interests need to break the loop at first iter.
WakerInterest::Timer => {
self.process_timer(&mut sockets);
break 'waker;
}
WakerInterest::Notify => {
self.backpressure(&mut sockets, false);
break 'waker;
}
}
}
Err(err) => match err {
// the waker queue is empty so we break the loop
WakerQueueError::Empty => break 'waker,
// the waker queue is closed so we return
WakerQueueError::Closed => {
for (_, info) in sockets.iter_mut() {
let _ = self.poll.registry().deregister(&mut info.sock);
}
return;
}
},
}
},
_ => {
let token = usize::from(token);
if token < DELTA {
continue;
}
self.accept(token - DELTA);
self.accept(&mut sockets, token - DELTA);
}
}
}
}
}
fn process_timer(&mut self) {
fn process_timer(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
let now = Instant::now();
for (token, info) in self.sockets.iter_mut() {
for (token, info) in sockets.iter_mut() {
if let Some(inst) = info.timeout.take() {
if now > inst {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
if let Err(err) = self.poll.registry().register(
&mut info.sock,
MioToken(token + DELTA),
Interest::READABLE,
) {
error!("Can not register server socket {}", err);
} else {
@ -283,63 +293,12 @@ impl Accept {
}
}
fn process_cmd(&mut self) -> bool {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => {
for (_, info) in self.sockets.iter_mut() {
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
}
Command::Resume => {
for (token, info) in self.sockets.iter() {
if let Err(err) = self.register(token, info) {
error!("Can not resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
Command::Stop => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
Command::Worker(worker) => {
self.backpressure(false);
self.workers.push(worker);
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => break,
sync_mpsc::TryRecvError::Disconnected => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
},
}
}
true
}
#[cfg(not(target_os = "windows"))]
fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> {
self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
self.poll.registry().register(
&mut info.sock,
MioToken(token + DELTA),
Interest::READABLE,
)
}
@ -365,11 +324,11 @@ impl Accept {
})
}
fn backpressure(&mut self, on: bool) {
fn backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
if self.backpressure {
if !on {
self.backpressure = false;
for (token, info) in self.sockets.iter() {
for (token, info) in sockets.iter_mut() {
if let Err(err) = self.register(token, info) {
error!("Can not resume socket accept process: {}", err);
} else {
@ -379,13 +338,13 @@ impl Accept {
}
} else if on {
self.backpressure = true;
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
for (_, info) in sockets.iter_mut() {
let _ = self.poll.registry().deregister(&mut info.sock);
}
}
}
fn accept_one(&mut self, mut msg: Conn) {
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
if self.backpressure {
while !self.workers.is_empty() {
match self.workers[self.next].send(msg) {
@ -422,7 +381,7 @@ impl Accept {
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
self.backpressure(true);
self.backpressure(sockets, true);
return;
} else if self.workers.len() <= self.next {
self.next = 0;
@ -434,14 +393,14 @@ impl Accept {
self.next = (self.next + 1) % self.workers.len();
}
// enable backpressure
self.backpressure(true);
self.accept_one(msg);
self.backpressure(sockets, true);
self.accept_one(sockets, msg);
}
}
fn accept(&mut self, token: usize) {
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
let msg = if let Some(info) = sockets.get_mut(token) {
match info.sock.accept() {
Ok(Some((io, addr))) => Conn {
io,
@ -453,17 +412,17 @@ impl Accept {
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.deregister(&info.sock) {
if let Err(err) = self.poll.registry().deregister(&mut info.sock) {
error!("Can not deregister server socket {}", err);
}
// sleep after error
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone();
let w = self.waker.clone();
System::current().arbiter().send(Box::pin(async move {
delay_until(Instant::now() + Duration::from_millis(510)).await;
let _ = r.set_readiness(mio::Ready::readable());
sleep_until(Instant::now() + Duration::from_millis(510)).await;
w.wake(WakerInterest::Timer);
}));
return;
}
@ -472,7 +431,7 @@ impl Accept {
return;
};
self.accept_one(msg);
self.accept_one(sockets, msg);
}
}
}

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use std::{io, mem, net};
use actix_rt::net::TcpStream;
use actix_rt::time::{delay_until, Instant};
use actix_rt::time::{sleep_until, Instant};
use actix_rt::{spawn, System};
use futures_channel::mpsc::{unbounded, UnboundedReceiver};
use futures_channel::oneshot;
@ -14,12 +14,13 @@ use futures_util::{future::Future, ready, stream::Stream, FutureExt, StreamExt};
use log::{error, info};
use socket2::{Domain, Protocol, Socket, Type};
use crate::accept::{AcceptLoop, AcceptNotify, Command};
use crate::accept::{AcceptLoop, AcceptNotify};
use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::StdListener;
use crate::waker_queue::WakerInterest;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::Token;
@ -265,7 +266,7 @@ impl ServerBuilder {
// start workers
let workers = (0..self.threads)
.map(|idx| {
let worker = self.start_worker(idx, self.accept.get_notify());
let worker = self.start_worker(idx, self.accept.get_accept_notify());
self.workers.push((idx, worker.clone()));
worker
@ -276,7 +277,7 @@ impl ServerBuilder {
for sock in &self.sockets {
info!("Starting \"{}\" service on {}", sock.1, sock.2);
}
self.accept.start(
self.accept.start_accept(
mem::take(&mut self.sockets)
.into_iter()
.map(|t| (t.0, t.2))
@ -307,11 +308,11 @@ impl ServerBuilder {
fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.accept.send(Command::Pause);
self.accept.wake_accept(WakerInterest::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
self.accept.send(Command::Resume);
self.accept.wake_accept(WakerInterest::Resume);
let _ = tx.send(());
}
ServerCommand::Signal(sig) => {
@ -355,7 +356,7 @@ impl ServerBuilder {
let exit = self.exit;
// stop accept thread
self.accept.send(Command::Stop);
self.accept.wake_accept(WakerInterest::Stop);
let notify = std::mem::take(&mut self.notify);
// stop workers
@ -376,7 +377,7 @@ impl ServerBuilder {
if exit {
spawn(
async {
delay_until(
sleep_until(
Instant::now() + Duration::from_millis(300),
)
.await;
@ -392,7 +393,7 @@ impl ServerBuilder {
// we need to stop system if server was spawned
if self.exit {
spawn(
delay_until(Instant::now() + Duration::from_millis(300)).then(
sleep_until(Instant::now() + Duration::from_millis(300)).then(
|_| {
System::current().stop();
ready(())
@ -432,9 +433,9 @@ impl ServerBuilder {
break;
}
let worker = self.start_worker(new_idx, self.accept.get_notify());
let worker = self.start_worker(new_idx, self.accept.get_accept_notify());
self.workers.push((new_idx, worker.clone()));
self.accept.send(Command::Worker(worker));
self.accept.wake_accept(WakerInterest::Worker(worker));
}
}
}

View File

@ -9,6 +9,7 @@ mod server;
mod service;
mod signals;
mod socket;
mod waker_queue;
mod worker;
pub use self::builder::ServerBuilder;

View File

@ -11,12 +11,12 @@ use futures_util::{FutureExt, TryFutureExt};
use log::error;
use super::Token;
use crate::socket::{FromStream, StdStream};
use crate::socket::{FromStream, MioStream};
/// Server message
pub(crate) enum ServerMessage {
/// New stream
Connect(StdStream),
Connect(MioStream),
/// Gracefully shutdown
Shutdown(Duration),
@ -77,7 +77,7 @@ where
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req {
ServerMessage::Connect(stream) => {
let stream = FromStream::from_stdstream(stream).map_err(|e| {
let stream = FromStream::from_mio_stream(stream).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e);
});

View File

@ -4,6 +4,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::future::lazy;
use futures_util::stream::Stream;
use crate::server::Server;
@ -87,7 +88,7 @@ impl Future for Signals {
{
for idx in 0..self.streams.len() {
loop {
match self.streams[idx].1.poll_recv(cx) {
match Pin::new(&mut self.streams[idx].1).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => break,
Poll::Ready(Some(_)) => {

View File

@ -1,7 +1,18 @@
use std::{fmt, io, net};
use mio::event::Source;
use mio::net::{
TcpListener as MioTcpListener, TcpStream as MioTcpStream, UnixListener as MioUnixListener,
UnixStream as MioUnixStream,
};
use mio::{Interest, Registry, Token};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::TcpStream;
use actix_rt::net::{TcpStream, UnixStream};
/// socket module contains a unified wrapper for Tcp/Uds listener/SocketAddr/Stream and necessary
/// trait impl for registering the listener to mio::Poll and convert stream to
/// `actix_rt::net::{TcpStream, UnixStream}`.
pub(crate) enum StdListener {
Tcp(net::TcpListener),
@ -12,7 +23,7 @@ pub(crate) enum StdListener {
pub(crate) enum SocketAddr {
Tcp(net::SocketAddr),
#[cfg(all(unix))]
Uds(std::os::unix::net::SocketAddr),
Uds(mio::net::SocketAddr),
}
impl fmt::Display for SocketAddr {
@ -50,86 +61,93 @@ impl StdListener {
match self {
StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
#[cfg(all(unix))]
StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
StdListener::Uds(_lst) => {
// FixMe: How to get a SocketAddr?
unimplemented!()
// SocketAddr::Uds(lst.local_addr().unwrap())
}
}
}
pub(crate) fn into_listener(self) -> SocketListener {
pub(crate) fn into_listener(self) -> std::io::Result<MioSocketListener> {
match self {
StdListener::Tcp(lst) => SocketListener::Tcp(
mio::net::TcpListener::from_std(lst)
.expect("Can not create mio::net::TcpListener"),
),
StdListener::Tcp(lst) => {
// ToDo: is this non_blocking a good practice?
lst.set_nonblocking(true)?;
Ok(MioSocketListener::Tcp(mio::net::TcpListener::from_std(lst)))
}
#[cfg(all(unix))]
StdListener::Uds(lst) => SocketListener::Uds(
mio_uds::UnixListener::from_listener(lst)
.expect("Can not create mio_uds::UnixListener"),
),
StdListener::Uds(lst) => {
// ToDo: the same as above
lst.set_nonblocking(true)?;
Ok(MioSocketListener::Uds(mio::net::UnixListener::from_std(
lst,
)))
}
}
}
}
#[derive(Debug)]
pub enum StdStream {
Tcp(std::net::TcpStream),
pub enum MioStream {
Tcp(MioTcpStream),
#[cfg(all(unix))]
Uds(std::os::unix::net::UnixStream),
Uds(MioUnixStream),
}
pub(crate) enum SocketListener {
Tcp(mio::net::TcpListener),
pub(crate) enum MioSocketListener {
Tcp(MioTcpListener),
#[cfg(all(unix))]
Uds(mio_uds::UnixListener),
Uds(MioUnixListener),
}
impl SocketListener {
pub(crate) fn accept(&self) -> io::Result<Option<(StdStream, SocketAddr)>> {
impl MioSocketListener {
pub(crate) fn accept(&self) -> io::Result<Option<(MioStream, SocketAddr)>> {
match *self {
SocketListener::Tcp(ref lst) => lst
.accept_std()
.map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))),
MioSocketListener::Tcp(ref lst) => lst
.accept()
.map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))),
#[cfg(all(unix))]
SocketListener::Uds(ref lst) => lst.accept_std().map(|res| {
res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr)))
}),
MioSocketListener::Uds(ref lst) => lst
.accept()
.map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))),
}
}
}
impl mio::Evented for SocketListener {
impl Source for MioSocketListener {
fn register(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts),
MioSocketListener::Tcp(ref mut lst) => lst.register(registry, token, interests),
#[cfg(all(unix))]
SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts),
MioSocketListener::Uds(ref mut lst) => lst.register(registry, token, interests),
}
}
fn reregister(
&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts),
MioSocketListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests),
#[cfg(all(unix))]
SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts),
MioSocketListener::Uds(ref mut lst) => lst.reregister(registry, token, interests),
}
}
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
match *self {
SocketListener::Tcp(ref lst) => lst.deregister(poll),
MioSocketListener::Tcp(ref mut lst) => lst.deregister(registry),
#[cfg(all(unix))]
SocketListener::Uds(ref lst) => {
let res = lst.deregister(poll);
MioSocketListener::Uds(ref mut lst) => {
let res = lst.deregister(registry);
// cleanup file path
if let Ok(addr) = lst.local_addr() {
@ -143,16 +161,21 @@ impl mio::Evented for SocketListener {
}
}
/// helper trait for converting mio stream to tokio stream.
pub trait FromStream: AsyncRead + AsyncWrite + Sized {
fn from_stdstream(sock: StdStream) -> io::Result<Self>;
fn from_mio_stream(sock: MioStream) -> io::Result<Self>;
}
impl FromStream for TcpStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(stream) => TcpStream::from_std(stream),
MioStream::Tcp(stream) => {
// FixMe: this only works on unix. We possibly want TcpStream::new from tokio.
let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream);
TcpStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) })
}
#[cfg(all(unix))]
StdStream::Uds(_) => {
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
@ -160,11 +183,15 @@ impl FromStream for TcpStream {
}
#[cfg(all(unix))]
impl FromStream for actix_rt::net::UnixStream {
fn from_stdstream(sock: StdStream) -> io::Result<Self> {
impl FromStream for UnixStream {
fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
match sock {
StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
StdStream::Uds(stream) => actix_rt::net::UnixStream::from_std(stream),
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
MioStream::Uds(stream) => {
// FixMe: this only works on unix. Like for TcpStream.
let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream);
UnixStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) })
}
}
}
}

View File

@ -0,0 +1,85 @@
use std::sync::Arc;
use concurrent_queue::{ConcurrentQueue, PopError};
use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerClient;
use futures_util::core_reexport::fmt::Formatter;
use std::fmt::Debug;
/// waker token for `mio::Poll` instance
pub(crate) const WAKER_TOKEN: MioToken = MioToken(1);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
/// we want `Poll` to look into.
pub(crate) struct WakerQueue(Arc<(Waker, ConcurrentQueue<WakerInterest>)>);
impl Clone for WakerQueue {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl WakerQueue {
/// construct a waker queue with given `Poll`'s `Registry` and capacity.
///
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
/// event for it to properly handle `WakerInterest`.
pub(crate) fn with_capacity(registry: &Registry, cap: usize) -> std::io::Result<Self> {
let waker = Waker::new(registry, WAKER_TOKEN)?;
let queue = ConcurrentQueue::bounded(cap);
Ok(Self(Arc::new((waker, queue))))
}
/// push a new interest to the queue and wake up the accept poll afterwards.
pub(crate) fn wake(&self, interest: WakerInterest) {
// ToDo: should we handle error here?
let r = (self.0).1.push(interest);
assert!(r.is_ok());
(self.0).0.wake().expect("can not wake up Accept Poll");
}
/// pop an `Interests` from the back of the queue.
pub(crate) fn pop(&self) -> Result<WakerInterest, WakerQueueError> {
(self.0).1.pop()
}
}
/// the types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
///
/// *. These interests should not confused with `mio::Interest` and mostly not I/O related
pub(crate) enum WakerInterest {
/// Interest from `WorkerClient` notifying `Accept` to run `backpressure` method
Notify,
/// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
/// `ServerCommand` and notify `Accept` to do exactly these tasks.
Pause,
Resume,
Stop,
/// `Timer` is an interest sent as a delayed future. When an error happens on accepting
/// connection the poll would deregister the socket temporary and wake up the poll and register
/// again after the delayed future resolve.
Timer,
Worker(WorkerClient),
}
impl Debug for WakerInterest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut f = f.debug_struct("WakerInterest");
match *self {
Self::Notify => f.field("type", &"notify"),
Self::Pause => f.field("type", &"pause"),
Self::Resume => f.field("type", &"resume"),
Self::Stop => f.field("type", &"stop"),
Self::Timer => f.field("type", &"timer"),
Self::Worker(_) => f.field("type", &"worker"),
};
f.finish()
}
}
pub(crate) type WakerQueueError = PopError;

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time;
use actix_rt::time::{delay_until, Delay, Instant};
use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_rt::{spawn, Arbiter};
use actix_utils::counter::Counter;
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
@ -15,7 +15,7 @@ use log::{error, info, trace};
use crate::accept::AcceptNotify;
use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{SocketAddr, StdStream};
use crate::socket::{MioStream, SocketAddr};
use crate::Token;
pub(crate) struct WorkerCommand(Conn);
@ -29,7 +29,7 @@ pub(crate) struct StopCommand {
#[derive(Debug)]
pub(crate) struct Conn {
pub io: StdStream,
pub io: MioStream,
pub token: Token,
pub peer: Option<SocketAddr>,
}
@ -307,8 +307,8 @@ enum WorkerState {
Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
),
Shutdown(
Pin<Box<Delay>>,
Pin<Box<Delay>>,
Pin<Box<Sleep>>,
Pin<Box<Sleep>>,
Option<oneshot::Sender<bool>>,
),
}
@ -335,8 +335,8 @@ impl Future for Worker {
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))),
Box::pin(delay_until(Instant::now() + self.shutdown_timeout)),
Box::pin(sleep_until(Instant::now() + time::Duration::from_secs(1))),
Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)),
Some(result),
);
} else {
@ -437,7 +437,7 @@ impl Future for Worker {
match t1.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
*t1 = Box::pin(delay_until(
*t1 = Box::pin(sleep_until(
Instant::now() + time::Duration::from_secs(1),
));
let _ = t1.as_mut().poll(cx);

View File

@ -90,11 +90,13 @@ fn test_start() {
})
})
.unwrap()
.workers(1)
.start();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
});
let (srv, sys) = rx.recv().unwrap();
let mut buf = [1u8; 4];

View File

@ -1 +0,0 @@
../LICENSE-APACHE

201
actix-utils/LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2017-NOW Nikolay Kim
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1 +0,0 @@
../LICENSE-MIT

25
actix-utils/LICENSE-MIT Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2017 Nikolay Kim
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -131,7 +131,7 @@ mod tests {
}
fn call(&mut self, _: ()) -> Self::Future {
actix_rt::time::delay_for(self.0)
actix_rt::time::sleep(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
}

View File

@ -5,7 +5,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::time::{delay_until, Delay, Instant};
use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, Ready};
@ -71,7 +71,7 @@ pub struct KeepAliveService<R, E, F> {
f: F,
ka: Duration,
time: LowResTimeService,
delay: Delay,
delay: Sleep,
expire: Instant,
_t: PhantomData<(R, E)>,
}
@ -87,7 +87,7 @@ where
ka,
time,
expire,
delay: delay_until(expire),
delay: sleep_until(expire),
_t: PhantomData,
}
}

View File

@ -4,7 +4,7 @@ use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
use actix_rt::time::delay_for;
use actix_rt::time::sleep;
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, ready, FutureExt, Ready};
@ -79,7 +79,7 @@ impl LowResTimeService {
b.resolution
};
actix_rt::spawn(delay_for(interval).then(move |_| {
actix_rt::spawn(sleep(interval).then(move |_| {
inner.borrow_mut().current.take();
ready(())
}));
@ -144,7 +144,7 @@ impl SystemTimeService {
b.resolution
};
actix_rt::spawn(delay_for(interval).then(move |_| {
actix_rt::spawn(sleep(interval).then(move |_| {
inner.borrow_mut().current.take();
ready(())
}));
@ -195,7 +195,7 @@ mod tests {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
delay_for(wait_time).await;
sleep(wait_time).await;
let second_time = time_service
.now()
@ -217,7 +217,7 @@ mod tests {
let first_time = time_service.now();
delay_for(wait_time).await;
sleep(wait_time).await;
let second_time = time_service.now();
assert!(second_time - first_time >= wait_time);

View File

@ -8,7 +8,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, time};
use actix_rt::time::{delay_for, Delay};
use actix_rt::time::{sleep, Sleep};
use actix_service::{IntoService, Service, Transform};
use futures_util::future::{ok, Ready};
@ -135,7 +135,7 @@ where
fn call(&mut self, request: S::Request) -> Self::Future {
TimeoutServiceResponse {
fut: self.service.call(request),
sleep: delay_for(self.timeout),
sleep: sleep(self.timeout),
}
}
}
@ -146,7 +146,7 @@ where
pub struct TimeoutServiceResponse<T: Service> {
#[pin]
fut: T::Future,
sleep: Delay,
sleep: Sleep,
}
impl<T> Future for TimeoutServiceResponse<T>
@ -195,7 +195,7 @@ mod tests {
}
fn call(&mut self, _: ()) -> Self::Future {
actix_rt::time::delay_for(self.0)
actix_rt::time::sleep(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
}