diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index f834fdaf..9f73cb80 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -22,5 +22,5 @@ futures-core = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } log = "0.4" pin-project = "0.4.17" -tokio = { version = "0.2.5", default-features = false } +tokio = "0.3.0" tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] } diff --git a/actix-codec/LICENSE-APACHE b/actix-codec/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-codec/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-codec/LICENSE-APACHE b/actix-codec/LICENSE-APACHE new file mode 100644 index 00000000..6cdf2d16 --- /dev/null +++ b/actix-codec/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017-NOW Nikolay Kim + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/actix-codec/LICENSE-MIT b/actix-codec/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-codec/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-codec/LICENSE-MIT b/actix-codec/LICENSE-MIT new file mode 100644 index 00000000..0f80296a --- /dev/null +++ b/actix-codec/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2017 Nikolay Kim + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 844f20d8..96d0df49 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -220,11 +220,14 @@ impl Framed { if remaining < LW { this.read_buf.reserve(HW - remaining) } - let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), - Poll::Ready(Ok(cnt)) => cnt, - }; + + // FixMe: This must be fixed as `poll_read_buf` is removed + // let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) { + // Poll::Pending => return Poll::Pending, + // Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + // Poll::Ready(Ok(cnt)) => cnt, + // }; + let cnt = 0; if cnt == 0 { this.flags.insert(Flags::EOF); diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 233195c5..c423981f 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -46,7 +46,7 @@ trust-dns-resolver = { version = "0.19", default-features = false, features = [" # openssl open-ssl = { package = "openssl", version = "0.10", optional = true } -tokio-openssl = { version = "0.4.0", optional = true } +tokio-openssl = { version = "0.5.0", optional = true } # rustls rust-tls = { package = "rustls", version = "0.18.0", optional = true } diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index b7d272cd..4eced1a4 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -22,7 +22,7 @@ futures-channel = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } copyless = "0.1.4" smallvec = "1" -tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } +tokio = { version = "0.3.0", features = ["rt", "net", "signal", "stream", "time"] } [dev-dependencies] -tokio = { version = "0.2.6", features = ["full"] } +tokio = { version = "0.3.0", features = ["full"] } diff --git a/actix-rt/LICENSE-APACHE b/actix-rt/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-rt/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-rt/LICENSE-APACHE b/actix-rt/LICENSE-APACHE new file mode 100644 index 00000000..6cdf2d16 --- /dev/null +++ b/actix-rt/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017-NOW Nikolay Kim + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/actix-rt/LICENSE-MIT b/actix-rt/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-rt/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-rt/LICENSE-MIT b/actix-rt/LICENSE-MIT new file mode 100644 index 00000000..0f80296a --- /dev/null +++ b/actix-rt/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2017 Nikolay Kim + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index aef78f12..e75076ae 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -60,7 +60,7 @@ pub mod net { /// Utilities for tracking time. pub mod time { pub use tokio::time::Instant; - pub use tokio::time::{delay_for, delay_until, Delay}; pub use tokio::time::{interval, interval_at, Interval}; + pub use tokio::time::{sleep, sleep_until, Sleep}; pub use tokio::time::{timeout, Timeout}; } diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index bbafb6f0..b3726329 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -18,10 +18,9 @@ impl Runtime { #[allow(clippy::new_ret_no_self)] /// Returns a new runtime initialized with default configuration values. pub fn new() -> io::Result { - let rt = runtime::Builder::new() + let rt = runtime::Builder::new_current_thread() .enable_io() .enable_time() - .basic_scheduler() .build()?; Ok(Runtime { diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index f33854ba..651c9d5b 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -95,10 +95,9 @@ impl System { /// } /// /// - /// let mut runtime = tokio::runtime::Builder::new() - /// .core_threads(2) + /// let runtime = tokio::runtime::Builder::new_multi_thread() + /// .worker_threads(2) /// .enable_all() - /// .threaded_scheduler() /// .build() /// .unwrap(); /// @@ -166,10 +165,9 @@ impl System { /// } /// /// - /// let runtime = tokio::runtime::Builder::new() - /// .core_threads(2) + /// let runtime = tokio::runtime::Builder::new_multi_thread() + /// .worker_threads(2) /// .enable_all() - /// .threaded_scheduler() /// .build() /// .unwrap(); /// @@ -178,7 +176,7 @@ impl System { /// ``` pub fn attach_to_tokio( name: impl Into, - mut runtime: tokio::runtime::Runtime, + runtime: tokio::runtime::Runtime, rest_operations: Fut, ) -> R where diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 8e775bab..c1d2b910 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -19,7 +19,7 @@ fn await_for_timer() { let time = Duration::from_secs(2); let instant = Instant::now(); actix_rt::System::new("test_wait_timer").block_on(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; }); assert!( instant.elapsed() >= time, @@ -34,7 +34,7 @@ fn join_another_arbiter() { actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); arbiter.send(Box::pin(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); arbiter.join().unwrap(); @@ -49,7 +49,7 @@ fn join_another_arbiter() { let mut arbiter = actix_rt::Arbiter::new(); arbiter.exec_fn(move || { actix_rt::spawn(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); }); }); @@ -64,7 +64,7 @@ fn join_another_arbiter() { actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); arbiter.send(Box::pin(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); arbiter.stop(); @@ -83,7 +83,7 @@ fn join_current_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_current_arbiter").block_on(async move { actix_rt::spawn(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); }); actix_rt::Arbiter::local_join().await; @@ -97,12 +97,12 @@ fn join_current_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_current_arbiter").block_on(async move { actix_rt::spawn(async move { - tokio::time::delay_for(time).await; + tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); }); let f = actix_rt::Arbiter::local_join(); actix_rt::spawn(async move { - tokio::time::delay_for(large_timer).await; + tokio::time::sleep(large_timer).await; actix_rt::Arbiter::current().stop(); }); f.await; diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 1a67f61c..4e24ce2c 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -25,20 +25,17 @@ actix-rt = "1.1.1" actix-codec = "0.3.0" actix-utils = "2.0.0" -log = "0.4" -num_cpus = "1.13" -mio = "0.6.19" -socket2 = "0.3" +concurrent-queue = "1.2.2" futures-channel = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } +log = "0.4" +mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] } +num_cpus = "1.13" slab = "0.4" - -# unix domain sockets -# FIXME: Remove it and use mio own uds feature once mio 0.7 is released -mio-uds = { version = "0.6.7" } +socket2 = "0.3" [dev-dependencies] bytes = "0.5" env_logger = "0.7" actix-testing = "1.0.0" -tokio = { version = "0.2", features = ["io-util"] } +tokio = { version = "0.3.0", features = ["full"] } diff --git a/actix-server/LICENSE-APACHE b/actix-server/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-server/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-server/LICENSE-APACHE b/actix-server/LICENSE-APACHE new file mode 100644 index 00000000..6cdf2d16 --- /dev/null +++ b/actix-server/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017-NOW Nikolay Kim + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/actix-server/LICENSE-MIT b/actix-server/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-server/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-server/LICENSE-MIT b/actix-server/LICENSE-MIT new file mode 100644 index 00000000..0f80296a --- /dev/null +++ b/actix-server/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2017 Nikolay Kim + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/actix-server/examples/basic.rs b/actix-server/examples/basic.rs index 45e473a9..9a442f99 100644 --- a/actix-server/examples/basic.rs +++ b/actix-server/examples/basic.rs @@ -21,7 +21,7 @@ use actix_service::pipeline_factory; use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadBuf}; #[actix_rt::main] async fn main() -> io::Result<()> { @@ -49,10 +49,10 @@ async fn main() -> io::Result<()> { let num = num + 1; let mut size = 0; - let mut buf = BytesMut::new(); + let mut buf = Vec::new(); loop { - match stream.read_buf(&mut buf).await { + match stream.read(&mut buf).await { // end of stream; bail from loop Ok(0) => break, @@ -72,7 +72,7 @@ async fn main() -> io::Result<()> { } // send data down service pipeline - Ok((buf.freeze(), size)) + Ok((buf.len(), size)) } }) .map_err(|err| error!("Service Error: {:?}", err)) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 4dc218fd..4227f536 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,119 +1,100 @@ -use std::sync::mpsc as sync_mpsc; use std::time::Duration; + use std::{io, thread}; -use actix_rt::time::{delay_until, Instant}; +use actix_rt::time::{sleep_until, Instant}; use actix_rt::System; use log::{error, info}; +use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; use crate::server::Server; -use crate::socket::{SocketAddr, SocketListener, StdListener}; +use crate::socket::{MioSocketListener, SocketAddr, StdListener}; +use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN}; use crate::worker::{Conn, WorkerClient}; use crate::Token; -pub(crate) enum Command { - Pause, - Resume, - Stop, - Worker(WorkerClient), -} - struct ServerSocketInfo { addr: SocketAddr, token: Token, - sock: SocketListener, + sock: MioSocketListener, timeout: Option, } +/// accept notify would clone waker queue from accept loop and use it to push new interest and wake +/// up the accept poll. #[derive(Clone)] -pub(crate) struct AcceptNotify(mio::SetReadiness); +pub(crate) struct AcceptNotify(WakerQueue); impl AcceptNotify { - pub(crate) fn new(ready: mio::SetReadiness) -> Self { - AcceptNotify(ready) + pub(crate) fn new(waker: WakerQueue) -> Self { + Self(waker) } pub(crate) fn notify(&self) { - let _ = self.0.set_readiness(mio::Ready::readable()); - } -} - -impl Default for AcceptNotify { - fn default() -> Self { - AcceptNotify::new(mio::Registration::new2().1) + self.0.wake(WakerInterest::Notify); } } +/// Accept loop would live with `ServerBuilder`. +/// +/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to +/// `Accept` and `WorkerClient` accordingly. +/// +/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. pub(crate) struct AcceptLoop { - cmd_reg: Option, - cmd_ready: mio::SetReadiness, - notify_reg: Option, - notify_ready: mio::SetReadiness, - tx: sync_mpsc::Sender, - rx: Option>, srv: Option, + poll: Option, + waker: WakerQueue, } impl AcceptLoop { - pub fn new(srv: Server) -> AcceptLoop { - let (tx, rx) = sync_mpsc::channel(); - let (cmd_reg, cmd_ready) = mio::Registration::new2(); - let (notify_reg, notify_ready) = mio::Registration::new2(); + pub fn new(srv: Server) -> Self { + // Create a poll instance. + let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e)); - AcceptLoop { - tx, - cmd_ready, - cmd_reg: Some(cmd_reg), - notify_ready, - notify_reg: Some(notify_reg), - rx: Some(rx), + // construct a waker queue which would wake up poll with associate extra interest types. + let waker = WakerQueue::with_capacity(poll.registry(), 128).unwrap(); + + Self { srv: Some(srv), + poll: Some(poll), + waker, } } - pub fn send(&self, msg: Command) { - let _ = self.tx.send(msg); - let _ = self.cmd_ready.set_readiness(mio::Ready::readable()); + pub fn wake_accept(&self, i: WakerInterest) { + self.waker.wake(i); } - pub fn get_notify(&self) -> AcceptNotify { - AcceptNotify::new(self.notify_ready.clone()) + pub fn get_accept_notify(&self) -> AcceptNotify { + AcceptNotify::new(self.waker.clone()) } - pub(crate) fn start( + pub(crate) fn start_accept( &mut self, socks: Vec<(Token, StdListener)>, workers: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); + let poll = self.poll.take().unwrap(); + let waker = self.waker.clone(); - Accept::start( - self.rx.take().expect("Can not re-use AcceptInfo"), - self.cmd_reg.take().expect("Can not re-use AcceptInfo"), - self.notify_reg.take().expect("Can not re-use AcceptInfo"), - socks, - srv, - workers, - ); + Accept::start(poll, waker, socks, srv, workers); } } +/// poll instance of the server. struct Accept { - poll: mio::Poll, - rx: sync_mpsc::Receiver, - sockets: Slab, + poll: Poll, + waker: WakerQueue, workers: Vec, srv: Server, - timer: (mio::Registration, mio::SetReadiness), next: usize, backpressure: bool, } const DELTA: usize = 100; -const CMD: mio::Token = mio::Token(0); -const TIMER: mio::Token = mio::Token(1); -const NOTIFY: mio::Token = mio::Token(2); /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means @@ -129,11 +110,9 @@ fn connection_error(e: &io::Error) -> bool { } impl Accept { - #![allow(clippy::too_many_arguments)] pub(crate) fn start( - rx: sync_mpsc::Receiver, - cmd_reg: mio::Registration, - notify_reg: mio::Registration, + poll: Poll, + waker: WakerQueue, socks: Vec<(Token, StdListener)>, srv: Server, workers: Vec, @@ -141,66 +120,40 @@ impl Accept { let sys = System::current(); // start accept thread - let _ = thread::Builder::new() + thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let mut accept = Accept::new(rx, socks, workers, srv); - - // Start listening for incoming commands - if let Err(err) = accept.poll.register( - &cmd_reg, - CMD, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register Registration: {}", err); - } - - // Start listening for notify updates - if let Err(err) = accept.poll.register( - ¬ify_reg, - NOTIFY, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register Registration: {}", err); - } - - accept.poll(); - }); + let (mut accept, sockets) = + Accept::new_with_sockets(poll, waker, socks, workers, srv); + accept.poll_with(sockets); + }) + .unwrap(); } - fn new( - rx: sync_mpsc::Receiver, + fn new_with_sockets( + poll: Poll, + waker: WakerQueue, socks: Vec<(Token, StdListener)>, workers: Vec, srv: Server, - ) -> Accept { - // Create a poll instance - let poll = match mio::Poll::new() { - Ok(poll) => poll, - Err(err) => panic!("Can not create mio::Poll: {}", err), - }; - + // Accept and sockets info are separated so that we can borrow mut on both at the same time + ) -> (Accept, Slab) { // Start accept let mut sockets = Slab::new(); for (hnd_token, lst) in socks.into_iter() { let addr = lst.local_addr(); - let server = lst.into_listener(); + let mut server = lst + .into_listener() + .unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e)); let entry = sockets.vacant_entry(); let token = entry.key(); // Start listening for incoming connections - if let Err(err) = poll.register( - &server, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register io: {}", err); - } + poll.registry() + .register(&mut server, MioToken(token + DELTA), Interest::READABLE) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); entry.insert(ServerSocketInfo { addr, @@ -210,67 +163,124 @@ impl Accept { }); } - // Timer - let (tm, tmr) = mio::Registration::new2(); - if let Err(err) = - poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register Registration: {}", err); - } - - Accept { + let accept = Accept { poll, - rx, - sockets, + waker, workers, srv, next: 0, - timer: (tm, tmr), backpressure: false, - } + }; + + (accept, sockets) } - fn poll(&mut self) { + fn poll_with(&mut self, mut sockets: Slab) { // Create storage for events let mut events = mio::Events::with_capacity(128); loop { - if let Err(err) = self.poll.poll(&mut events, None) { - panic!("Poll error: {}", err); - } + self.poll + .poll(&mut events, None) + .unwrap_or_else(|e| panic!("Poll error: {}", e)); for event in events.iter() { let token = event.token(); match token { - CMD => { - if !self.process_cmd() { - return; + // This is a loop because interests for command were a loop that would try to + // drain the command channel. We break at first iter with other kind interests. + WAKER_TOKEN => 'waker: loop { + match self.waker.pop() { + Ok(i) => { + match i { + WakerInterest::Pause => { + for (_, info) in sockets.iter_mut() { + if let Err(err) = + self.poll.registry().deregister(&mut info.sock) + { + error!( + "Can not deregister server socket {}", + err + ); + } else { + info!( + "Paused accepting connections on {}", + info.addr + ); + } + } + } + WakerInterest::Resume => { + for (token, info) in sockets.iter_mut() { + if let Err(err) = self.register(token, info) { + error!( + "Can not resume socket accept process: {}", + err + ); + } else { + info!( + "Accepting connections on {} has been resumed", + info.addr + ); + } + } + } + WakerInterest::Stop => { + for (_, info) in sockets.iter_mut() { + let _ = + self.poll.registry().deregister(&mut info.sock); + } + return; + } + WakerInterest::Worker(worker) => { + self.backpressure(&mut sockets, false); + self.workers.push(worker); + } + // timer and notify interests need to break the loop at first iter. + WakerInterest::Timer => { + self.process_timer(&mut sockets); + break 'waker; + } + WakerInterest::Notify => { + self.backpressure(&mut sockets, false); + break 'waker; + } + } + } + Err(err) => match err { + // the waker queue is empty so we break the loop + WakerQueueError::Empty => break 'waker, + // the waker queue is closed so we return + WakerQueueError::Closed => { + for (_, info) in sockets.iter_mut() { + let _ = self.poll.registry().deregister(&mut info.sock); + } + return; + } + }, } - } - TIMER => self.process_timer(), - NOTIFY => self.backpressure(false), + }, _ => { let token = usize::from(token); if token < DELTA { continue; } - self.accept(token - DELTA); + self.accept(&mut sockets, token - DELTA); } } } } } - fn process_timer(&mut self) { + fn process_timer(&mut self, sockets: &mut Slab) { let now = Instant::now(); - for (token, info) in self.sockets.iter_mut() { + for (token, info) in sockets.iter_mut() { if let Some(inst) = info.timeout.take() { if now > inst { - if let Err(err) = self.poll.register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), + if let Err(err) = self.poll.registry().register( + &mut info.sock, + MioToken(token + DELTA), + Interest::READABLE, ) { error!("Can not register server socket {}", err); } else { @@ -283,63 +293,12 @@ impl Accept { } } - fn process_cmd(&mut self) -> bool { - loop { - match self.rx.try_recv() { - Ok(cmd) => match cmd { - Command::Pause => { - for (_, info) in self.sockets.iter_mut() { - if let Err(err) = self.poll.deregister(&info.sock) { - error!("Can not deregister server socket {}", err); - } else { - info!("Paused accepting connections on {}", info.addr); - } - } - } - Command::Resume => { - for (token, info) in self.sockets.iter() { - if let Err(err) = self.register(token, info) { - error!("Can not resume socket accept process: {}", err); - } else { - info!( - "Accepting connections on {} has been resumed", - info.addr - ); - } - } - } - Command::Stop => { - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - } - return false; - } - Command::Worker(worker) => { - self.backpressure(false); - self.workers.push(worker); - } - }, - Err(err) => match err { - sync_mpsc::TryRecvError::Empty => break, - sync_mpsc::TryRecvError::Disconnected => { - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); - } - return false; - } - }, - } - } - true - } - #[cfg(not(target_os = "windows"))] - fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> { - self.poll.register( - &info.sock, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), + fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + self.poll.registry().register( + &mut info.sock, + MioToken(token + DELTA), + Interest::READABLE, ) } @@ -365,11 +324,11 @@ impl Accept { }) } - fn backpressure(&mut self, on: bool) { + fn backpressure(&mut self, sockets: &mut Slab, on: bool) { if self.backpressure { if !on { self.backpressure = false; - for (token, info) in self.sockets.iter() { + for (token, info) in sockets.iter_mut() { if let Err(err) = self.register(token, info) { error!("Can not resume socket accept process: {}", err); } else { @@ -379,13 +338,13 @@ impl Accept { } } else if on { self.backpressure = true; - for (_, info) in self.sockets.iter() { - let _ = self.poll.deregister(&info.sock); + for (_, info) in sockets.iter_mut() { + let _ = self.poll.registry().deregister(&mut info.sock); } } } - fn accept_one(&mut self, mut msg: Conn) { + fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) { if self.backpressure { while !self.workers.is_empty() { match self.workers[self.next].send(msg) { @@ -422,7 +381,7 @@ impl Accept { self.workers.swap_remove(self.next); if self.workers.is_empty() { error!("No workers"); - self.backpressure(true); + self.backpressure(sockets, true); return; } else if self.workers.len() <= self.next { self.next = 0; @@ -434,14 +393,14 @@ impl Accept { self.next = (self.next + 1) % self.workers.len(); } // enable backpressure - self.backpressure(true); - self.accept_one(msg); + self.backpressure(sockets, true); + self.accept_one(sockets, msg); } } - fn accept(&mut self, token: usize) { + fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { - let msg = if let Some(info) = self.sockets.get_mut(token) { + let msg = if let Some(info) = sockets.get_mut(token) { match info.sock.accept() { Ok(Some((io, addr))) => Conn { io, @@ -453,17 +412,17 @@ impl Accept { Err(ref e) if connection_error(e) => continue, Err(e) => { error!("Error accepting connection: {}", e); - if let Err(err) = self.poll.deregister(&info.sock) { + if let Err(err) = self.poll.registry().deregister(&mut info.sock) { error!("Can not deregister server socket {}", err); } // sleep after error info.timeout = Some(Instant::now() + Duration::from_millis(500)); - let r = self.timer.1.clone(); + let w = self.waker.clone(); System::current().arbiter().send(Box::pin(async move { - delay_until(Instant::now() + Duration::from_millis(510)).await; - let _ = r.set_readiness(mio::Ready::readable()); + sleep_until(Instant::now() + Duration::from_millis(510)).await; + w.wake(WakerInterest::Timer); })); return; } @@ -472,7 +431,7 @@ impl Accept { return; }; - self.accept_one(msg); + self.accept_one(sockets, msg); } } } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 8a90d598..b5a9d298 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -4,7 +4,7 @@ use std::time::Duration; use std::{io, mem, net}; use actix_rt::net::TcpStream; -use actix_rt::time::{delay_until, Instant}; +use actix_rt::time::{sleep_until, Instant}; use actix_rt::{spawn, System}; use futures_channel::mpsc::{unbounded, UnboundedReceiver}; use futures_channel::oneshot; @@ -14,12 +14,13 @@ use futures_util::{future::Future, ready, stream::Stream, FutureExt, StreamExt}; use log::{error, info}; use socket2::{Domain, Protocol, Socket, Type}; -use crate::accept::{AcceptLoop, AcceptNotify, Command}; +use crate::accept::{AcceptLoop, AcceptNotify}; use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::StdListener; +use crate::waker_queue::WakerInterest; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::Token; @@ -265,7 +266,7 @@ impl ServerBuilder { // start workers let workers = (0..self.threads) .map(|idx| { - let worker = self.start_worker(idx, self.accept.get_notify()); + let worker = self.start_worker(idx, self.accept.get_accept_notify()); self.workers.push((idx, worker.clone())); worker @@ -276,7 +277,7 @@ impl ServerBuilder { for sock in &self.sockets { info!("Starting \"{}\" service on {}", sock.1, sock.2); } - self.accept.start( + self.accept.start_accept( mem::take(&mut self.sockets) .into_iter() .map(|t| (t.0, t.2)) @@ -307,11 +308,11 @@ impl ServerBuilder { fn handle_cmd(&mut self, item: ServerCommand) { match item { ServerCommand::Pause(tx) => { - self.accept.send(Command::Pause); + self.accept.wake_accept(WakerInterest::Pause); let _ = tx.send(()); } ServerCommand::Resume(tx) => { - self.accept.send(Command::Resume); + self.accept.wake_accept(WakerInterest::Resume); let _ = tx.send(()); } ServerCommand::Signal(sig) => { @@ -355,7 +356,7 @@ impl ServerBuilder { let exit = self.exit; // stop accept thread - self.accept.send(Command::Stop); + self.accept.wake_accept(WakerInterest::Stop); let notify = std::mem::take(&mut self.notify); // stop workers @@ -376,7 +377,7 @@ impl ServerBuilder { if exit { spawn( async { - delay_until( + sleep_until( Instant::now() + Duration::from_millis(300), ) .await; @@ -392,7 +393,7 @@ impl ServerBuilder { // we need to stop system if server was spawned if self.exit { spawn( - delay_until(Instant::now() + Duration::from_millis(300)).then( + sleep_until(Instant::now() + Duration::from_millis(300)).then( |_| { System::current().stop(); ready(()) @@ -432,9 +433,9 @@ impl ServerBuilder { break; } - let worker = self.start_worker(new_idx, self.accept.get_notify()); + let worker = self.start_worker(new_idx, self.accept.get_accept_notify()); self.workers.push((new_idx, worker.clone())); - self.accept.send(Command::Worker(worker)); + self.accept.wake_accept(WakerInterest::Worker(worker)); } } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 8efc29d3..1074304c 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -9,6 +9,7 @@ mod server; mod service; mod signals; mod socket; +mod waker_queue; mod worker; pub use self::builder::ServerBuilder; diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 984e5228..3869d4d4 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -11,12 +11,12 @@ use futures_util::{FutureExt, TryFutureExt}; use log::error; use super::Token; -use crate::socket::{FromStream, StdStream}; +use crate::socket::{FromStream, MioStream}; /// Server message pub(crate) enum ServerMessage { /// New stream - Connect(StdStream), + Connect(MioStream), /// Gracefully shutdown Shutdown(Duration), @@ -77,7 +77,7 @@ where fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { ServerMessage::Connect(stream) => { - let stream = FromStream::from_stdstream(stream).map_err(|e| { + let stream = FromStream::from_mio_stream(stream).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); }); diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index b6339621..a2e9cf7f 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use futures_util::future::lazy; +use futures_util::stream::Stream; use crate::server::Server; @@ -87,7 +88,7 @@ impl Future for Signals { { for idx in 0..self.streams.len() { loop { - match self.streams[idx].1.poll_recv(cx) { + match Pin::new(&mut self.streams[idx].1).poll_next(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Pending => break, Poll::Ready(Some(_)) => { diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 3025660a..c3084484 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,7 +1,18 @@ use std::{fmt, io, net}; +use mio::event::Source; +use mio::net::{ + TcpListener as MioTcpListener, TcpStream as MioTcpStream, UnixListener as MioUnixListener, + UnixStream as MioUnixStream, +}; +use mio::{Interest, Registry, Token}; + use actix_codec::{AsyncRead, AsyncWrite}; -use actix_rt::net::TcpStream; +use actix_rt::net::{TcpStream, UnixStream}; + +/// socket module contains a unified wrapper for Tcp/Uds listener/SocketAddr/Stream and necessary +/// trait impl for registering the listener to mio::Poll and convert stream to +/// `actix_rt::net::{TcpStream, UnixStream}`. pub(crate) enum StdListener { Tcp(net::TcpListener), @@ -12,7 +23,7 @@ pub(crate) enum StdListener { pub(crate) enum SocketAddr { Tcp(net::SocketAddr), #[cfg(all(unix))] - Uds(std::os::unix::net::SocketAddr), + Uds(mio::net::SocketAddr), } impl fmt::Display for SocketAddr { @@ -50,86 +61,93 @@ impl StdListener { match self { StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), #[cfg(all(unix))] - StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()), + StdListener::Uds(_lst) => { + // FixMe: How to get a SocketAddr? + unimplemented!() + // SocketAddr::Uds(lst.local_addr().unwrap()) + } } } - pub(crate) fn into_listener(self) -> SocketListener { + pub(crate) fn into_listener(self) -> std::io::Result { match self { - StdListener::Tcp(lst) => SocketListener::Tcp( - mio::net::TcpListener::from_std(lst) - .expect("Can not create mio::net::TcpListener"), - ), + StdListener::Tcp(lst) => { + // ToDo: is this non_blocking a good practice? + lst.set_nonblocking(true)?; + Ok(MioSocketListener::Tcp(mio::net::TcpListener::from_std(lst))) + } #[cfg(all(unix))] - StdListener::Uds(lst) => SocketListener::Uds( - mio_uds::UnixListener::from_listener(lst) - .expect("Can not create mio_uds::UnixListener"), - ), + StdListener::Uds(lst) => { + // ToDo: the same as above + lst.set_nonblocking(true)?; + Ok(MioSocketListener::Uds(mio::net::UnixListener::from_std( + lst, + ))) + } } } } #[derive(Debug)] -pub enum StdStream { - Tcp(std::net::TcpStream), +pub enum MioStream { + Tcp(MioTcpStream), #[cfg(all(unix))] - Uds(std::os::unix::net::UnixStream), + Uds(MioUnixStream), } -pub(crate) enum SocketListener { - Tcp(mio::net::TcpListener), +pub(crate) enum MioSocketListener { + Tcp(MioTcpListener), #[cfg(all(unix))] - Uds(mio_uds::UnixListener), + Uds(MioUnixListener), } -impl SocketListener { - pub(crate) fn accept(&self) -> io::Result> { +impl MioSocketListener { + pub(crate) fn accept(&self) -> io::Result> { match *self { - SocketListener::Tcp(ref lst) => lst - .accept_std() - .map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))), + MioSocketListener::Tcp(ref lst) => lst + .accept() + .map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))), #[cfg(all(unix))] - SocketListener::Uds(ref lst) => lst.accept_std().map(|res| { - res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr))) - }), + MioSocketListener::Uds(ref lst) => lst + .accept() + .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))), } } } -impl mio::Evented for SocketListener { +impl Source for MioSocketListener { fn register( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, + &mut self, + registry: &Registry, + token: Token, + interests: Interest, ) -> io::Result<()> { match *self { - SocketListener::Tcp(ref lst) => lst.register(poll, token, interest, opts), + MioSocketListener::Tcp(ref mut lst) => lst.register(registry, token, interests), #[cfg(all(unix))] - SocketListener::Uds(ref lst) => lst.register(poll, token, interest, opts), + MioSocketListener::Uds(ref mut lst) => lst.register(registry, token, interests), } } fn reregister( - &self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt, + &mut self, + registry: &Registry, + token: Token, + interests: Interest, ) -> io::Result<()> { match *self { - SocketListener::Tcp(ref lst) => lst.reregister(poll, token, interest, opts), + MioSocketListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests), #[cfg(all(unix))] - SocketListener::Uds(ref lst) => lst.reregister(poll, token, interest, opts), + MioSocketListener::Uds(ref mut lst) => lst.reregister(registry, token, interests), } } - fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { match *self { - SocketListener::Tcp(ref lst) => lst.deregister(poll), + MioSocketListener::Tcp(ref mut lst) => lst.deregister(registry), #[cfg(all(unix))] - SocketListener::Uds(ref lst) => { - let res = lst.deregister(poll); + MioSocketListener::Uds(ref mut lst) => { + let res = lst.deregister(registry); // cleanup file path if let Ok(addr) = lst.local_addr() { @@ -143,16 +161,21 @@ impl mio::Evented for SocketListener { } } +/// helper trait for converting mio stream to tokio stream. pub trait FromStream: AsyncRead + AsyncWrite + Sized { - fn from_stdstream(sock: StdStream) -> io::Result; + fn from_mio_stream(sock: MioStream) -> io::Result; } impl FromStream for TcpStream { - fn from_stdstream(sock: StdStream) -> io::Result { + fn from_mio_stream(sock: MioStream) -> io::Result { match sock { - StdStream::Tcp(stream) => TcpStream::from_std(stream), + MioStream::Tcp(stream) => { + // FixMe: this only works on unix. We possibly want TcpStream::new from tokio. + let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream); + TcpStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) }) + } #[cfg(all(unix))] - StdStream::Uds(_) => { + MioStream::Uds(_) => { panic!("Should not happen, bug in server impl"); } } @@ -160,11 +183,15 @@ impl FromStream for TcpStream { } #[cfg(all(unix))] -impl FromStream for actix_rt::net::UnixStream { - fn from_stdstream(sock: StdStream) -> io::Result { +impl FromStream for UnixStream { + fn from_mio_stream(sock: MioStream) -> io::Result { match sock { - StdStream::Tcp(_) => panic!("Should not happen, bug in server impl"), - StdStream::Uds(stream) => actix_rt::net::UnixStream::from_std(stream), + MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), + MioStream::Uds(stream) => { + // FixMe: this only works on unix. Like for TcpStream. + let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream); + UnixStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) }) + } } } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs new file mode 100644 index 00000000..d7e40a95 --- /dev/null +++ b/actix-server/src/waker_queue.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; + +use concurrent_queue::{ConcurrentQueue, PopError}; +use mio::{Registry, Token as MioToken, Waker}; + +use crate::worker::WorkerClient; +use futures_util::core_reexport::fmt::Formatter; +use std::fmt::Debug; + +/// waker token for `mio::Poll` instance +pub(crate) const WAKER_TOKEN: MioToken = MioToken(1); + +/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` +/// we want `Poll` to look into. +pub(crate) struct WakerQueue(Arc<(Waker, ConcurrentQueue)>); + +impl Clone for WakerQueue { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl WakerQueue { + /// construct a waker queue with given `Poll`'s `Registry` and capacity. + /// + /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match + /// event for it to properly handle `WakerInterest`. + pub(crate) fn with_capacity(registry: &Registry, cap: usize) -> std::io::Result { + let waker = Waker::new(registry, WAKER_TOKEN)?; + let queue = ConcurrentQueue::bounded(cap); + + Ok(Self(Arc::new((waker, queue)))) + } + + /// push a new interest to the queue and wake up the accept poll afterwards. + pub(crate) fn wake(&self, interest: WakerInterest) { + // ToDo: should we handle error here? + let r = (self.0).1.push(interest); + assert!(r.is_ok()); + + (self.0).0.wake().expect("can not wake up Accept Poll"); + } + + /// pop an `Interests` from the back of the queue. + pub(crate) fn pop(&self) -> Result { + (self.0).1.pop() + } +} + +/// the types of interests we would look into when `Accept`'s `Poll` is waked up by waker. +/// +/// *. These interests should not confused with `mio::Interest` and mostly not I/O related +pub(crate) enum WakerInterest { + /// Interest from `WorkerClient` notifying `Accept` to run `backpressure` method + Notify, + /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to + /// `ServerCommand` and notify `Accept` to do exactly these tasks. + Pause, + Resume, + Stop, + /// `Timer` is an interest sent as a delayed future. When an error happens on accepting + /// connection the poll would deregister the socket temporary and wake up the poll and register + /// again after the delayed future resolve. + Timer, + Worker(WorkerClient), +} + +impl Debug for WakerInterest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut f = f.debug_struct("WakerInterest"); + + match *self { + Self::Notify => f.field("type", &"notify"), + Self::Pause => f.field("type", &"pause"), + Self::Resume => f.field("type", &"resume"), + Self::Stop => f.field("type", &"stop"), + Self::Timer => f.field("type", &"timer"), + Self::Worker(_) => f.field("type", &"worker"), + }; + + f.finish() + } +} + +pub(crate) type WakerQueueError = PopError; diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 35331757..45d8d3aa 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time; -use actix_rt::time::{delay_until, Delay, Instant}; +use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; @@ -15,7 +15,7 @@ use log::{error, info, trace}; use crate::accept::AcceptNotify; use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; -use crate::socket::{SocketAddr, StdStream}; +use crate::socket::{MioStream, SocketAddr}; use crate::Token; pub(crate) struct WorkerCommand(Conn); @@ -29,7 +29,7 @@ pub(crate) struct StopCommand { #[derive(Debug)] pub(crate) struct Conn { - pub io: StdStream, + pub io: MioStream, pub token: Token, pub peer: Option, } @@ -307,8 +307,8 @@ enum WorkerState { Pin, ()>>>>, ), Shutdown( - Pin>, - Pin>, + Pin>, + Pin>, Option>, ), } @@ -335,8 +335,8 @@ impl Future for Worker { if num != 0 { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( - Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))), - Box::pin(delay_until(Instant::now() + self.shutdown_timeout)), + Box::pin(sleep_until(Instant::now() + time::Duration::from_secs(1))), + Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)), Some(result), ); } else { @@ -437,7 +437,7 @@ impl Future for Worker { match t1.as_mut().poll(cx) { Poll::Pending => (), Poll::Ready(_) => { - *t1 = Box::pin(delay_until( + *t1 = Box::pin(sleep_until( Instant::now() + time::Duration::from_secs(1), )); let _ = t1.as_mut().poll(cx); diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index ce309c94..7217aab2 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -90,11 +90,13 @@ fn test_start() { }) }) .unwrap() + .workers(1) .start(); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); + let (srv, sys) = rx.recv().unwrap(); let mut buf = [1u8; 4]; diff --git a/actix-utils/LICENSE-APACHE b/actix-utils/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-utils/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-utils/LICENSE-APACHE b/actix-utils/LICENSE-APACHE new file mode 100644 index 00000000..6cdf2d16 --- /dev/null +++ b/actix-utils/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017-NOW Nikolay Kim + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/actix-utils/LICENSE-MIT b/actix-utils/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-utils/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-utils/LICENSE-MIT b/actix-utils/LICENSE-MIT new file mode 100644 index 00000000..0f80296a --- /dev/null +++ b/actix-utils/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2017 Nikolay Kim + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs index 5ed987c7..21fff5e3 100644 --- a/actix-utils/src/inflight.rs +++ b/actix-utils/src/inflight.rs @@ -131,7 +131,7 @@ mod tests { } fn call(&mut self, _: ()) -> Self::Future { - actix_rt::time::delay_for(self.0) + actix_rt::time::sleep(self.0) .then(|_| ok::<_, ()>(())) .boxed_local() } diff --git a/actix-utils/src/keepalive.rs b/actix-utils/src/keepalive.rs index 4413dcd5..70850cfb 100644 --- a/actix-utils/src/keepalive.rs +++ b/actix-utils/src/keepalive.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use actix_rt::time::{delay_until, Delay, Instant}; +use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_service::{Service, ServiceFactory}; use futures_util::future::{ok, Ready}; @@ -71,7 +71,7 @@ pub struct KeepAliveService { f: F, ka: Duration, time: LowResTimeService, - delay: Delay, + delay: Sleep, expire: Instant, _t: PhantomData<(R, E)>, } @@ -87,7 +87,7 @@ where ka, time, expire, - delay: delay_until(expire), + delay: sleep_until(expire), _t: PhantomData, } } diff --git a/actix-utils/src/time.rs b/actix-utils/src/time.rs index 2ce65bc3..e8daf4bf 100644 --- a/actix-utils/src/time.rs +++ b/actix-utils/src/time.rs @@ -4,7 +4,7 @@ use std::rc::Rc; use std::task::{Context, Poll}; use std::time::{self, Duration, Instant}; -use actix_rt::time::delay_for; +use actix_rt::time::sleep; use actix_service::{Service, ServiceFactory}; use futures_util::future::{ok, ready, FutureExt, Ready}; @@ -79,7 +79,7 @@ impl LowResTimeService { b.resolution }; - actix_rt::spawn(delay_for(interval).then(move |_| { + actix_rt::spawn(sleep(interval).then(move |_| { inner.borrow_mut().current.take(); ready(()) })); @@ -144,7 +144,7 @@ impl SystemTimeService { b.resolution }; - actix_rt::spawn(delay_for(interval).then(move |_| { + actix_rt::spawn(sleep(interval).then(move |_| { inner.borrow_mut().current.take(); ready(()) })); @@ -195,7 +195,7 @@ mod tests { .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); - delay_for(wait_time).await; + sleep(wait_time).await; let second_time = time_service .now() @@ -217,7 +217,7 @@ mod tests { let first_time = time_service.now(); - delay_for(wait_time).await; + sleep(wait_time).await; let second_time = time_service.now(); assert!(second_time - first_time >= wait_time); diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index f1d30f19..2dc73d02 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -8,7 +8,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, time}; -use actix_rt::time::{delay_for, Delay}; +use actix_rt::time::{sleep, Sleep}; use actix_service::{IntoService, Service, Transform}; use futures_util::future::{ok, Ready}; @@ -135,7 +135,7 @@ where fn call(&mut self, request: S::Request) -> Self::Future { TimeoutServiceResponse { fut: self.service.call(request), - sleep: delay_for(self.timeout), + sleep: sleep(self.timeout), } } } @@ -146,7 +146,7 @@ where pub struct TimeoutServiceResponse { #[pin] fut: T::Future, - sleep: Delay, + sleep: Sleep, } impl Future for TimeoutServiceResponse @@ -195,7 +195,7 @@ mod tests { } fn call(&mut self, _: ()) -> Self::Future { - actix_rt::time::delay_for(self.0) + actix_rt::time::sleep(self.0) .then(|_| ok::<_, ()>(())) .boxed_local() }