mirror of https://github.com/fafhrd91/actix-net
				
				
				
			
		
			
				
	
	
		
			538 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Rust
		
	
	
	
			
		
		
	
	
			538 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Rust
		
	
	
	
#![allow(clippy::let_underscore_future, missing_docs)]
 | 
						|
 | 
						|
use std::{
 | 
						|
    net,
 | 
						|
    sync::{
 | 
						|
        atomic::{AtomicUsize, Ordering},
 | 
						|
        mpsc, Arc,
 | 
						|
    },
 | 
						|
    thread,
 | 
						|
    time::Duration,
 | 
						|
};
 | 
						|
 | 
						|
use actix_rt::{net::TcpStream, time::sleep};
 | 
						|
use actix_server::{Server, TestServer};
 | 
						|
use actix_service::fn_service;
 | 
						|
 | 
						|
fn unused_addr() -> net::SocketAddr {
 | 
						|
    TestServer::unused_addr()
 | 
						|
}
 | 
						|
 | 
						|
#[test]
 | 
						|
fn test_bind() {
 | 
						|
    let addr = unused_addr();
 | 
						|
    let (tx, rx) = mpsc::channel();
 | 
						|
 | 
						|
    let h = thread::spawn(move || {
 | 
						|
        actix_rt::System::new().block_on(async {
 | 
						|
            let srv = Server::build()
 | 
						|
                .workers(1)
 | 
						|
                .disable_signals()
 | 
						|
                .shutdown_timeout(3600)
 | 
						|
                .bind("test", addr, move || {
 | 
						|
                    fn_service(|_| async { Ok::<_, ()>(()) })
 | 
						|
                })?
 | 
						|
                .run();
 | 
						|
 | 
						|
            tx.send(srv.handle()).unwrap();
 | 
						|
            srv.await
 | 
						|
        })
 | 
						|
    });
 | 
						|
 | 
						|
    let srv = rx.recv().unwrap();
 | 
						|
 | 
						|
    thread::sleep(Duration::from_millis(500));
 | 
						|
 | 
						|
    net::TcpStream::connect(addr).unwrap();
 | 
						|
 | 
						|
    let _ = srv.stop(true);
 | 
						|
    h.join().unwrap().unwrap();
 | 
						|
}
 | 
						|
 | 
						|
#[test]
 | 
						|
fn test_listen() {
 | 
						|
    let addr = unused_addr();
 | 
						|
    let (tx, rx) = mpsc::channel();
 | 
						|
    let lst = net::TcpListener::bind(addr).unwrap();
 | 
						|
 | 
						|
    let h = thread::spawn(move || {
 | 
						|
        actix_rt::System::new().block_on(async {
 | 
						|
            let srv = Server::build()
 | 
						|
                .workers(1)
 | 
						|
                .disable_signals()
 | 
						|
                .shutdown_timeout(3600)
 | 
						|
                .listen("test", lst, move || {
 | 
						|
                    fn_service(|_| async { Ok::<_, ()>(()) })
 | 
						|
                })?
 | 
						|
                .run();
 | 
						|
 | 
						|
            tx.send(srv.handle()).unwrap();
 | 
						|
            srv.await
 | 
						|
        })
 | 
						|
    });
 | 
						|
 | 
						|
    let srv = rx.recv().unwrap();
 | 
						|
 | 
						|
    thread::sleep(Duration::from_millis(500));
 | 
						|
 | 
						|
    net::TcpStream::connect(addr).unwrap();
 | 
						|
 | 
						|
    let _ = srv.stop(true);
 | 
						|
    h.join().unwrap().unwrap();
 | 
						|
}
 | 
						|
 | 
						|
#[test]
 | 
						|
fn plain_tokio_runtime() {
 | 
						|
    let addr = unused_addr();
 | 
						|
    let (tx, rx) = mpsc::channel();
 | 
						|
 | 
						|
    let h = thread::spawn(move || {
 | 
						|
        let rt = tokio::runtime::Builder::new_current_thread()
 | 
						|
            .enable_all()
 | 
						|
            .build()
 | 
						|
            .unwrap();
 | 
						|
 | 
						|
        rt.block_on(async {
 | 
						|
            let srv = Server::build()
 | 
						|
                .workers(1)
 | 
						|
                .disable_signals()
 | 
						|
                .bind("test", addr, move || {
 | 
						|
                    fn_service(|_| async { Ok::<_, ()>(()) })
 | 
						|
                })?
 | 
						|
                .run();
 | 
						|
 | 
						|
            tx.send(srv.handle()).unwrap();
 | 
						|
 | 
						|
            srv.await
 | 
						|
        })
 | 
						|
    });
 | 
						|
 | 
						|
    let srv = rx.recv().unwrap();
 | 
						|
 | 
						|
    thread::sleep(Duration::from_millis(500));
 | 
						|
    assert!(net::TcpStream::connect(addr).is_ok());
 | 
						|
 | 
						|
    let _ = srv.stop(true);
 | 
						|
    h.join().unwrap().unwrap();
 | 
						|
}
 | 
						|
 | 
						|
#[test]
 | 
						|
#[cfg(unix)]
 | 
						|
fn test_start() {
 | 
						|
    use std::io::Read;
 | 
						|
 | 
						|
    use actix_codec::{BytesCodec, Framed};
 | 
						|
    use bytes::Bytes;
 | 
						|
    use futures_util::sink::SinkExt;
 | 
						|
 | 
						|
    let addr = unused_addr();
 | 
						|
    let (tx, rx) = mpsc::channel();
 | 
						|
 | 
						|
    let h = thread::spawn(move || {
 | 
						|
        actix_rt::System::new().block_on(async {
 | 
						|
            let srv = Server::build()
 | 
						|
                .backlog(100)
 | 
						|
                .disable_signals()
 | 
						|
                .bind("test", addr, move || {
 | 
						|
                    fn_service(|io: TcpStream| async move {
 | 
						|
                        let mut f = Framed::new(io, BytesCodec);
 | 
						|
                        f.send(Bytes::from_static(b"test")).await.unwrap();
 | 
						|
                        Ok::<_, ()>(())
 | 
						|
                    })
 | 
						|
                })?
 | 
						|
                .run();
 | 
						|
 | 
						|
            let _ = tx.send((srv.handle(), actix_rt::System::current()));
 | 
						|
 | 
						|
            srv.await
 | 
						|
        })
 | 
						|
    });
 | 
						|
 | 
						|
    let (srv, sys) = rx.recv().unwrap();
 | 
						|
 | 
						|
    let mut buf = [1u8; 4];
 | 
						|
    let mut conn = net::TcpStream::connect(addr).unwrap();
 | 
						|
    let _ = conn.read_exact(&mut buf);
 | 
						|
    assert_eq!(buf, b"test"[..]);
 | 
						|
 | 
						|
    // pause
 | 
						|
    let _ = srv.pause();
 | 
						|
    thread::sleep(Duration::from_millis(200));
 | 
						|
    let mut conn = net::TcpStream::connect(addr).unwrap();
 | 
						|
    conn.set_read_timeout(Some(Duration::from_millis(100)))
 | 
						|
        .unwrap();
 | 
						|
    let res = conn.read_exact(&mut buf);
 | 
						|
    assert!(res.is_err());
 | 
						|
 | 
						|
    // resume
 | 
						|
    let _ = srv.resume();
 | 
						|
    thread::sleep(Duration::from_millis(100));
 | 
						|
    assert!(net::TcpStream::connect(addr).is_ok());
 | 
						|
    assert!(net::TcpStream::connect(addr).is_ok());
 | 
						|
    assert!(net::TcpStream::connect(addr).is_ok());
 | 
						|
 | 
						|
    let mut buf = [0u8; 4];
 | 
						|
    let mut conn = net::TcpStream::connect(addr).unwrap();
 | 
						|
    let _ = conn.read_exact(&mut buf);
 | 
						|
    assert_eq!(buf, b"test"[..]);
 | 
						|
 | 
						|
    // stop
 | 
						|
    let _ = srv.stop(false);
 | 
						|
    sys.stop();
 | 
						|
    h.join().unwrap().unwrap();
 | 
						|
 | 
						|
    thread::sleep(Duration::from_secs(1));
 | 
						|
    assert!(net::TcpStream::connect(addr).is_err());
 | 
						|
}
 | 
						|
 | 
						|
#[actix_rt::test]
 | 
						|
async fn test_max_concurrent_connections() {
 | 
						|
    // Note:
 | 
						|
    // A TCP listener would accept connects based on it's backlog setting.
 | 
						|
    //
 | 
						|
    // The limit test on the other hand is only for concurrent TCP stream limiting a work
 | 
						|
    // thread accept.
 | 
						|
 | 
						|
    use tokio::io::AsyncWriteExt;
 | 
						|
 | 
						|
    let addr = unused_addr();
 | 
						|
    let (tx, rx) = mpsc::channel();
 | 
						|
 | 
						|
    let counter = Arc::new(AtomicUsize::new(0));
 | 
						|
    let counter_clone = counter.clone();
 | 
						|
 | 
						|
    let max_conn = 3;
 | 
						|
 | 
						|
    let h = thread::spawn(move || {
 | 
						|
        actix_rt::System::new().block_on(async {
 | 
						|
            let srv = Server::build()
 | 
						|
                // Set a relative higher backlog.
 | 
						|
                .backlog(12)
 | 
						|
                // max connection for a worker is 3.
 | 
						|
                .max_concurrent_connections(max_conn)
 | 
						|
                .workers(1)
 | 
						|
                .disable_signals()
 | 
						|
                .bind("test", addr, move || {
 | 
						|
                    let counter = counter.clone();
 | 
						|
                    fn_service(move |_io: TcpStream| {
 | 
						|
                        let counter = counter.clone();
 | 
						|
                        async move {
 | 
						|
                            counter.fetch_add(1, Ordering::SeqCst);
 | 
						|
                            sleep(Duration::from_secs(20)).await;
 | 
						|
                            counter.fetch_sub(1, Ordering::SeqCst);
 | 
						|
                            Ok::<(), ()>(())
 | 
						|
                        }
 | 
						|
                    })
 | 
						|
                })?
 | 
						|
                .run();
 | 
						|
 | 
						|
            let _ = tx.send((srv.handle(), actix_rt::System::current()));
 | 
						|
 | 
						|
            srv.await
 | 
						|
        })
 | 
						|
    });
 | 
						|
 | 
						|
    let (srv, sys) = rx.recv().unwrap();
 | 
						|
 | 
						|
    let mut conns = vec![];
 | 
						|
 | 
						|
    for _ in 0..12 {
 | 
						|
        let conn = tokio::net::TcpStream::connect(addr).await.unwrap();
 | 
						|
        conns.push(conn);
 | 
						|
    }
 | 
						|
 | 
						|
    sleep(Duration::from_secs(5)).await;
 | 
						|
 | 
						|
    // counter would remain at 3 even with 12 successful connection.
 | 
						|
    // and 9 of them remain in backlog.
 | 
						|
    assert_eq!(max_conn, counter_clone.load(Ordering::SeqCst));
 | 
						|
 | 
						|
    for mut conn in conns {
 | 
						|
        conn.shutdown().await.unwrap();
 | 
						|
    }
 | 
						|
 | 
						|
    srv.stop(false).await;
 | 
						|
    sys.stop();
 | 
						|
    h.join().unwrap().unwrap();
 | 
						|
}
 | 
						|
 | 
						|
// TODO: race-y failures detected due to integer underflow when calling Counter::total
 | 
						|
#[actix_rt::test]
 | 
						|
async fn test_service_restart() {
 | 
						|
    use std::task::{Context, Poll};
 | 
						|
 | 
						|
    use actix_service::{fn_factory, Service};
 | 
						|
    use futures_core::future::LocalBoxFuture;
 | 
						|
    use tokio::io::AsyncWriteExt;
 | 
						|
 | 
						|
    struct TestService(Arc<AtomicUsize>);
 | 
						|
 | 
						|
    impl Service<TcpStream> for TestService {
 | 
						|
        type Response = ();
 | 
						|
        type Error = ();
 | 
						|
        type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
 | 
						|
 | 
						|
        fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
 | 
						|
            let TestService(ref counter) = self;
 | 
						|
            let c = counter.fetch_add(1, Ordering::SeqCst);
 | 
						|
            // Force the service to restart on first readiness check.
 | 
						|
            if c > 0 {
 | 
						|
                Poll::Ready(Ok(()))
 | 
						|
            } else {
 | 
						|
                Poll::Ready(Err(()))
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        fn call(&self, _: TcpStream) -> Self::Future {
 | 
						|
            Box::pin(async { Ok(()) })
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    let addr1 = unused_addr();
 | 
						|
    let addr2 = unused_addr();
 | 
						|
    let (tx, rx) = mpsc::channel();
 | 
						|
    let num = Arc::new(AtomicUsize::new(0));
 | 
						|
    let num2 = Arc::new(AtomicUsize::new(0));
 | 
						|
 | 
						|
    let num_clone = num.clone();
 | 
						|
    let num2_clone = num2.clone();
 | 
						|
 | 
						|
    let h = thread::spawn(move || {
 | 
						|
        let num = num.clone();
 | 
						|
        actix_rt::System::new().block_on(async {
 | 
						|
            let srv = Server::build()
 | 
						|
                .backlog(1)
 | 
						|
                .disable_signals()
 | 
						|
                .bind("addr1", addr1, move || {
 | 
						|
                    let num = num.clone();
 | 
						|
                    fn_factory(move || {
 | 
						|
                        let num = num.clone();
 | 
						|
                        async move { Ok::<_, ()>(TestService(num)) }
 | 
						|
                    })
 | 
						|
                })?
 | 
						|
                .bind("addr2", addr2, move || {
 | 
						|
                    let num2 = num2.clone();
 | 
						|
                    fn_factory(move || {
 | 
						|
                        let num2 = num2.clone();
 | 
						|
                        async move { Ok::<_, ()>(TestService(num2)) }
 | 
						|
                    })
 | 
						|
                })?
 | 
						|
                .workers(1)
 | 
						|
                .run();
 | 
						|
 | 
						|
            let _ = tx.send(srv.handle());
 | 
						|
            srv.await
 | 
						|
        })
 | 
						|
    });
 | 
						|
 | 
						|
    let srv = rx.recv().unwrap();
 | 
						|
 | 
						|
    for _ in 0..5 {
 | 
						|
        TcpStream::connect(addr1)
 | 
						|
            .await
 | 
						|
            .unwrap()
 | 
						|
            .shutdown()
 | 
						|
            .await
 | 
						|
            .unwrap();
 | 
						|
        TcpStream::connect(addr2)
 | 
						|
            .await
 | 
						|
            .unwrap()
 | 
						|
            .shutdown()
 | 
						|
            .await
 | 
						|
            .unwrap();
 | 
						|
    }
 | 
						|
 | 
						|
    sleep(Duration::from_secs(3)).await;
 | 
						|
 | 
						|
    assert!(num_clone.load(Ordering::SeqCst) > 5);
 | 
						|
    assert!(num2_clone.load(Ordering::SeqCst) > 5);
 | 
						|
 | 
						|
    let _ = srv.stop(false);
 | 
						|
    h.join().unwrap().unwrap();
 | 
						|
}
 | 
						|
 | 
						|
#[ignore] // non-deterministic on CI
 | 
						|
#[actix_rt::test]
 | 
						|
async fn worker_restart() {
 | 
						|
    use actix_service::{Service, ServiceFactory};
 | 
						|
    use futures_core::future::LocalBoxFuture;
 | 
						|
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
						|
 | 
						|
    struct TestServiceFactory(Arc<AtomicUsize>);
 | 
						|
 | 
						|
    impl ServiceFactory<TcpStream> for TestServiceFactory {
 | 
						|
        type Response = ();
 | 
						|
        type Error = ();
 | 
						|
        type Config = ();
 | 
						|
        type Service = TestService;
 | 
						|
        type InitError = ();
 | 
						|
        type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
 | 
						|
 | 
						|
        fn new_service(&self, _: Self::Config) -> Self::Future {
 | 
						|
            let counter = self.0.fetch_add(1, Ordering::Relaxed);
 | 
						|
 | 
						|
            Box::pin(async move { Ok(TestService(counter)) })
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    struct TestService(usize);
 | 
						|
 | 
						|
    impl Service<TcpStream> for TestService {
 | 
						|
        type Response = ();
 | 
						|
        type Error = ();
 | 
						|
        type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
 | 
						|
 | 
						|
        actix_service::always_ready!();
 | 
						|
 | 
						|
        fn call(&self, stream: TcpStream) -> Self::Future {
 | 
						|
            let counter = self.0;
 | 
						|
 | 
						|
            let mut stream = stream.into_std().unwrap();
 | 
						|
            use std::io::Write;
 | 
						|
            let str = counter.to_string();
 | 
						|
            let buf = str.as_bytes();
 | 
						|
 | 
						|
            let mut written = 0;
 | 
						|
 | 
						|
            while written < buf.len() {
 | 
						|
                if let Ok(n) = stream.write(&buf[written..]) {
 | 
						|
                    written += n;
 | 
						|
                }
 | 
						|
            }
 | 
						|
            stream.flush().unwrap();
 | 
						|
            stream.shutdown(net::Shutdown::Write).unwrap();
 | 
						|
 | 
						|
            // force worker 2 to restart service once.
 | 
						|
            if counter == 2 {
 | 
						|
                panic!("panic on purpose")
 | 
						|
            } else {
 | 
						|
                Box::pin(async { Ok(()) })
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    let addr = unused_addr();
 | 
						|
    let (tx, rx) = mpsc::channel();
 | 
						|
 | 
						|
    let counter = Arc::new(AtomicUsize::new(1));
 | 
						|
    let h = thread::spawn(move || {
 | 
						|
        let counter = counter.clone();
 | 
						|
        actix_rt::System::new().block_on(async {
 | 
						|
            let srv = Server::build()
 | 
						|
                .disable_signals()
 | 
						|
                .bind("addr", addr, move || TestServiceFactory(counter.clone()))?
 | 
						|
                .workers(2)
 | 
						|
                .run();
 | 
						|
 | 
						|
            let _ = tx.send(srv.handle());
 | 
						|
 | 
						|
            srv.await
 | 
						|
        })
 | 
						|
    });
 | 
						|
 | 
						|
    let srv = rx.recv().unwrap();
 | 
						|
 | 
						|
    sleep(Duration::from_secs(3)).await;
 | 
						|
 | 
						|
    let mut buf = [0; 8];
 | 
						|
 | 
						|
    // worker 1 would not restart and return it's id consistently.
 | 
						|
    let mut stream = TcpStream::connect(addr).await.unwrap();
 | 
						|
    let n = stream.read(&mut buf).await.unwrap();
 | 
						|
    let id = String::from_utf8_lossy(&buf[0..n]);
 | 
						|
    assert_eq!("1", id);
 | 
						|
    stream.shutdown().await.unwrap();
 | 
						|
 | 
						|
    // worker 2 dead after return response.
 | 
						|
    let mut stream = TcpStream::connect(addr).await.unwrap();
 | 
						|
    let n = stream.read(&mut buf).await.unwrap();
 | 
						|
    let id = String::from_utf8_lossy(&buf[0..n]);
 | 
						|
    assert_eq!("2", id);
 | 
						|
    stream.shutdown().await.unwrap();
 | 
						|
 | 
						|
    // request to worker 1
 | 
						|
    let mut stream = TcpStream::connect(addr).await.unwrap();
 | 
						|
    let n = stream.read(&mut buf).await.unwrap();
 | 
						|
    let id = String::from_utf8_lossy(&buf[0..n]);
 | 
						|
    assert_eq!("1", id);
 | 
						|
    stream.shutdown().await.unwrap();
 | 
						|
 | 
						|
    // TODO: Remove sleep if it can pass CI.
 | 
						|
    sleep(Duration::from_secs(3)).await;
 | 
						|
 | 
						|
    // worker 2 restarting and work goes to worker 1.
 | 
						|
    let mut stream = TcpStream::connect(addr).await.unwrap();
 | 
						|
    let n = stream.read(&mut buf).await.unwrap();
 | 
						|
    let id = String::from_utf8_lossy(&buf[0..n]);
 | 
						|
    assert_eq!("1", id);
 | 
						|
    stream.shutdown().await.unwrap();
 | 
						|
 | 
						|
    // TODO: Remove sleep if it can pass CI.
 | 
						|
    sleep(Duration::from_secs(3)).await;
 | 
						|
 | 
						|
    // worker 2 restarted but worker 1 was still the next to accept connection.
 | 
						|
    let mut stream = TcpStream::connect(addr).await.unwrap();
 | 
						|
    let n = stream.read(&mut buf).await.unwrap();
 | 
						|
    let id = String::from_utf8_lossy(&buf[0..n]);
 | 
						|
    assert_eq!("1", id);
 | 
						|
    stream.shutdown().await.unwrap();
 | 
						|
 | 
						|
    // TODO: Remove sleep if it can pass CI.
 | 
						|
    sleep(Duration::from_secs(3)).await;
 | 
						|
 | 
						|
    // worker 2 accept connection again but it's id is 3.
 | 
						|
    let mut stream = TcpStream::connect(addr).await.unwrap();
 | 
						|
    let n = stream.read(&mut buf).await.unwrap();
 | 
						|
    let id = String::from_utf8_lossy(&buf[0..n]);
 | 
						|
    assert_eq!("3", id);
 | 
						|
    stream.shutdown().await.unwrap();
 | 
						|
 | 
						|
    let _ = srv.stop(false);
 | 
						|
    h.join().unwrap().unwrap();
 | 
						|
}
 | 
						|
 | 
						|
#[test]
 | 
						|
fn no_runtime_on_init() {
 | 
						|
    use std::{thread::sleep, time::Duration};
 | 
						|
 | 
						|
    let addr = unused_addr();
 | 
						|
    let counter = Arc::new(AtomicUsize::new(0));
 | 
						|
 | 
						|
    let mut srv = Server::build()
 | 
						|
        .workers(2)
 | 
						|
        .disable_signals()
 | 
						|
        .bind("test", addr, {
 | 
						|
            let counter = counter.clone();
 | 
						|
            move || {
 | 
						|
                counter.fetch_add(1, Ordering::SeqCst);
 | 
						|
                fn_service(|_| async { Ok::<_, ()>(()) })
 | 
						|
            }
 | 
						|
        })
 | 
						|
        .unwrap()
 | 
						|
        .run();
 | 
						|
 | 
						|
    fn is_send<T: Send>(_: &T) {}
 | 
						|
    is_send(&srv);
 | 
						|
    is_send(&srv.handle());
 | 
						|
 | 
						|
    sleep(Duration::from_millis(1_000));
 | 
						|
    assert_eq!(counter.load(Ordering::SeqCst), 0);
 | 
						|
 | 
						|
    let rt = tokio::runtime::Builder::new_current_thread()
 | 
						|
        .enable_all()
 | 
						|
        .build()
 | 
						|
        .unwrap();
 | 
						|
 | 
						|
    rt.block_on(async move {
 | 
						|
        let _ = futures_util::poll!(&mut srv);
 | 
						|
 | 
						|
        // available after the first poll
 | 
						|
        sleep(Duration::from_millis(500));
 | 
						|
        assert_eq!(counter.load(Ordering::SeqCst), 2);
 | 
						|
 | 
						|
        let _ = srv.handle().stop(true);
 | 
						|
        srv.await
 | 
						|
    })
 | 
						|
    .unwrap();
 | 
						|
}
 |