mirror of https://github.com/fafhrd91/actix-net
				
				
				
			store the thread's handle with arbiter (#62)
This commit is contained in:
		
							parent
							
								
									f895c7d186
								
							
						
					
					
						commit
						7b7b9a600c
					
				|  | @ -6,6 +6,10 @@ | ||||||
| 
 | 
 | ||||||
| * Fix arbiter's thread panic message. | * Fix arbiter's thread panic message. | ||||||
| 
 | 
 | ||||||
|  | ### Added | ||||||
|  | 
 | ||||||
|  | * Allow to join arbiter's thread. #60 | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| ## [0.2.5] - 2019-09-02 | ## [0.2.5] - 2019-09-02 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -41,11 +41,20 @@ impl fmt::Debug for ArbiterCommand { | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Clone)] | #[derive(Debug)] | ||||||
| /// Arbiters provide an asynchronous execution environment for actors, functions
 | /// Arbiters provide an asynchronous execution environment for actors, functions
 | ||||||
| /// and futures. When an Arbiter is created, they spawn a new OS thread, and
 | /// and futures. When an Arbiter is created, they spawn a new OS thread, and
 | ||||||
| /// host an event loop. Some Arbiter functions execute on the current thread.
 | /// host an event loop. Some Arbiter functions execute on the current thread.
 | ||||||
| pub struct Arbiter(UnboundedSender<ArbiterCommand>); | pub struct Arbiter { | ||||||
|  |     sender: UnboundedSender<ArbiterCommand>, | ||||||
|  |     thread_handle: Option<thread::JoinHandle<()>>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Clone for Arbiter { | ||||||
|  |     fn clone(&self) -> Self { | ||||||
|  |         Self::with_sender(self.sender.clone()) | ||||||
|  |     } | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| impl Default for Arbiter { | impl Default for Arbiter { | ||||||
|     fn default() -> Self { |     fn default() -> Self { | ||||||
|  | @ -57,7 +66,7 @@ impl Arbiter { | ||||||
|     pub(crate) fn new_system() -> Self { |     pub(crate) fn new_system() -> Self { | ||||||
|         let (tx, rx) = unbounded(); |         let (tx, rx) = unbounded(); | ||||||
| 
 | 
 | ||||||
|         let arb = Arbiter(tx); |         let arb = Arbiter::with_sender(tx); | ||||||
|         ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); |         ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); | ||||||
|         RUNNING.with(|cell| cell.set(false)); |         RUNNING.with(|cell| cell.set(false)); | ||||||
|         STORAGE.with(|cell| cell.borrow_mut().clear()); |         STORAGE.with(|cell| cell.borrow_mut().clear()); | ||||||
|  | @ -77,7 +86,7 @@ impl Arbiter { | ||||||
| 
 | 
 | ||||||
|     /// Stop arbiter from continuing it's event loop.
 |     /// Stop arbiter from continuing it's event loop.
 | ||||||
|     pub fn stop(&self) { |     pub fn stop(&self) { | ||||||
|         let _ = self.0.unbounded_send(ArbiterCommand::Stop); |         let _ = self.sender.unbounded_send(ArbiterCommand::Stop); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// Spawn new thread and run event loop in spawned thread.
 |     /// Spawn new thread and run event loop in spawned thread.
 | ||||||
|  | @ -89,9 +98,9 @@ impl Arbiter { | ||||||
|         let (arb_tx, arb_rx) = unbounded(); |         let (arb_tx, arb_rx) = unbounded(); | ||||||
|         let arb_tx2 = arb_tx.clone(); |         let arb_tx2 = arb_tx.clone(); | ||||||
| 
 | 
 | ||||||
|         let _ = thread::Builder::new().name(name.clone()).spawn(move || { |         let handle = thread::Builder::new().name(name.clone()).spawn(move || { | ||||||
|             let mut rt = Builder::new().build_rt().expect("Can not create Runtime"); |             let mut rt = Builder::new().build_rt().expect("Can not create Runtime"); | ||||||
|             let arb = Arbiter(arb_tx); |             let arb = Arbiter::with_sender(arb_tx); | ||||||
| 
 | 
 | ||||||
|             let (stop, stop_rx) = channel(); |             let (stop, stop_rx) = channel(); | ||||||
|             RUNNING.with(|cell| cell.set(true)); |             RUNNING.with(|cell| cell.set(true)); | ||||||
|  | @ -121,9 +130,9 @@ impl Arbiter { | ||||||
|             let _ = System::current() |             let _ = System::current() | ||||||
|                 .sys() |                 .sys() | ||||||
|                 .unbounded_send(SystemCommand::UnregisterArbiter(id)); |                 .unbounded_send(SystemCommand::UnregisterArbiter(id)); | ||||||
|         }); |         }).unwrap_or_else(|err| panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)); | ||||||
| 
 | 
 | ||||||
|         Arbiter(arb_tx2) |         Arbiter{sender: arb_tx2, thread_handle: Some(handle)} | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub(crate) fn run_system() { |     pub(crate) fn run_system() { | ||||||
|  | @ -178,7 +187,7 @@ impl Arbiter { | ||||||
|         F: Future<Output = ()> + Send + Unpin + 'static, |         F: Future<Output = ()> + Send + Unpin + 'static, | ||||||
|     { |     { | ||||||
|         let _ = self |         let _ = self | ||||||
|             .0 |             .sender | ||||||
|             .unbounded_send(ArbiterCommand::Execute(Box::new(future))); |             .unbounded_send(ArbiterCommand::Execute(Box::new(future))); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -189,7 +198,7 @@ impl Arbiter { | ||||||
|         F: FnOnce() + Send + 'static, |         F: FnOnce() + Send + 'static, | ||||||
|     { |     { | ||||||
|         let _ = self |         let _ = self | ||||||
|             .0 |             .sender | ||||||
|             .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { |             .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { | ||||||
|                 f(); |                 f(); | ||||||
|             }))); |             }))); | ||||||
|  | @ -205,7 +214,7 @@ impl Arbiter { | ||||||
|     { |     { | ||||||
|         let (tx, rx) = channel(); |         let (tx, rx) = channel(); | ||||||
|         let _ = self |         let _ = self | ||||||
|             .0 |             .sender | ||||||
|             .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { |             .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { | ||||||
|                 if !tx.is_canceled() { |                 if !tx.is_canceled() { | ||||||
|                     let _ = tx.send(f()); |                     let _ = tx.send(f()); | ||||||
|  | @ -257,6 +266,20 @@ impl Arbiter { | ||||||
|             f(item) |             f(item) | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self { | ||||||
|  |         Self{sender, thread_handle: None} | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Wait for the event loop to stop by joining the underlying thread (if have Some).
 | ||||||
|  |     pub fn join(&mut self) -> thread::Result<()>{ | ||||||
|  |         if let Some(thread_handle) = self.thread_handle.take() { | ||||||
|  |             thread_handle.join() | ||||||
|  |         } | ||||||
|  |         else { | ||||||
|  |             Ok(()) | ||||||
|  |         } | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| struct ArbiterController { | struct ArbiterController { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue