update ssl connect tests

This commit is contained in:
Nikolay Kim 2019-11-14 17:46:10 +06:00
parent 7b7b9a600c
commit e95f754817
2 changed files with 82 additions and 60 deletions

View File

@ -10,35 +10,39 @@ use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use actix_connect::Connect;
#[cfg(feature = "ssl")]
#[cfg(feature = "openssl")]
#[test]
fn test_string() {
let srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
service_fn(|io: Io<tokio_net::tcp::TcpStream>| {
async {
let mut framed = Framed::new(io.into_parts().0, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}
})
});
let mut conn = default_connector();
let mut conn = actix_connect::default_connector();
let addr = format!("localhost:{}", srv.port());
let con = test::call_service(&mut conn, addr.into());
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[cfg(feature = "rust-tls")]
#[cfg(feature = "rustls")]
#[test]
fn test_rustls_string() {
let srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
service_fn(|io: Io<tokio_net::tcp::TcpStream>| {
async {
let mut framed = Framed::new(io.into_parts().0, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}
})
});
let mut conn = default_connector();
let mut conn = actix_connect::default_connector();
let addr = format!("localhost:{}", srv.port());
let con = test::call_service(&mut conn, addr.into());
assert_eq!(con.peer_addr().unwrap(), srv.addr());
@ -90,36 +94,44 @@ fn test_new_service() {
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[cfg(feature = "ssl")]
#[cfg(feature = "openssl")]
#[test]
fn test_uri() {
use http::HttpTryFrom;
let srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
service_fn(|io: Io<tokio_net::tcp::TcpStream>| {
async {
let mut framed = Framed::new(io.into_parts().0, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}
})
});
let mut conn = default_connector();
let addr = Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let mut conn = actix_connect::default_connector();
let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let con = test::call_service(&mut conn, addr.into());
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[cfg(feature = "rust-tls")]
#[cfg(feature = "rustls")]
#[test]
fn test_rustls_uri() {
use http::HttpTryFrom;
let srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec)
.send(Bytes::from_static(b"test"))
.then(|_| Ok::<_, ()>(()))
service_fn(|io: Io<tokio_net::tcp::TcpStream>| {
async {
let mut framed = Framed::new(io.into_parts().0, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}
})
});
let mut conn = default_connector();
let addr = Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let mut conn = actix_connect::default_connector();
let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let con = test::call_service(&mut conn, addr.into());
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}

View File

@ -98,41 +98,49 @@ impl Arbiter {
let (arb_tx, arb_rx) = unbounded();
let arb_tx2 = arb_tx.clone();
let handle = thread::Builder::new().name(name.clone()).spawn(move || {
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
let arb = Arbiter::with_sender(arb_tx);
let handle = thread::Builder::new()
.name(name.clone())
.spawn(move || {
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
STORAGE.with(|cell| cell.borrow_mut().clear());
let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys);
System::set_current(sys);
// start arbiter controller
rt.spawn(ArbiterController {
stop: Some(stop),
rx: arb_rx,
// start arbiter controller
rt.spawn(ArbiterController {
stop: Some(stop),
rx: arb_rx,
});
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register arbiter
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::RegisterArbiter(id, arb.clone()));
// run loop
let _ = match rt.block_on(stop_rx) {
Ok(code) => code,
Err(_) => 1,
};
// unregister arbiter
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id));
})
.unwrap_or_else(|err| {
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
});
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register arbiter
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::RegisterArbiter(id, arb.clone()));
// run loop
let _ = match rt.block_on(stop_rx) {
Ok(code) => code,
Err(_) => 1,
};
// unregister arbiter
let _ = System::current()
.sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id));
}).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err));
Arbiter{sender: arb_tx2, thread_handle: Some(handle)}
Arbiter {
sender: arb_tx2,
thread_handle: Some(handle),
}
}
pub(crate) fn run_system() {
@ -268,15 +276,17 @@ impl Arbiter {
}
fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
Self{sender, thread_handle: None}
Self {
sender,
thread_handle: None,
}
}
/// Wait for the event loop to stop by joining the underlying thread (if have Some).
pub fn join(&mut self) -> thread::Result<()>{
pub fn join(&mut self) -> thread::Result<()> {
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join()
}
else {
} else {
Ok(())
}
}