Move sockets to Accept

This commit is contained in:
fakeshadow 2021-04-25 17:07:23 +08:00
parent 40544d01ae
commit a19cc228aa
1 changed files with 48 additions and 68 deletions

View File

@ -77,6 +77,7 @@ impl AcceptLoop {
struct Accept { struct Accept {
poll: Poll, poll: Poll,
waker_rx: WakerRx, waker_rx: WakerRx,
sockets: Box<[ServerSocketInfo]>,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: Server,
next: usize, next: usize,
@ -174,7 +175,7 @@ impl Accept {
.spawn(move || { .spawn(move || {
System::set_current(sys); System::set_current(sys);
let mut sockets = socks let sockets = socks
.into_iter() .into_iter()
.map(|(token, mut lst)| { .map(|(token, mut lst)| {
// Start listening for incoming connections // Start listening for incoming connections
@ -188,7 +189,7 @@ impl Accept {
timeout: None, timeout: None,
} }
}) })
.collect::<Vec<_>>(); .collect();
let mut avail = Availability::default(); let mut avail = Availability::default();
@ -198,6 +199,7 @@ impl Accept {
let mut accept = Accept { let mut accept = Accept {
poll, poll,
waker_rx, waker_rx,
sockets,
handles, handles,
srv, srv,
next: 0, next: 0,
@ -209,49 +211,49 @@ impl Accept {
let waker = waker::from_registry(accept.poll.registry()).unwrap(); let waker = waker::from_registry(accept.poll.registry()).unwrap();
let cx = &mut Context::from_waker(&waker); let cx = &mut Context::from_waker(&waker);
accept.poll_with(&mut sockets, cx); accept.poll(cx);
}) })
.unwrap(); .unwrap();
} }
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) { fn poll(&mut self, cx: &mut Context<'_>) {
let mut events = mio::Events::with_capacity(128); let mut events = mio::Events::with_capacity(128);
// poll waker channel once and register the context/waker. // poll waker channel once and register the context/waker.
let exit = self.poll_waker(sockets, cx); let exit = self.poll_waker(cx);
if exit { if exit {
info!("Accept is stopped."); info!("Accept is stopped.");
return; return;
} }
loop { loop {
if let Err(e) = self.poll.poll(&mut events, None) { match self.poll.poll(&mut events, None) {
match e.kind() { Ok(_) => {}
io::ErrorKind::Interrupted => continue, Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e), Err(e) => panic!("Poll error: {}", e),
}
} }
for event in events.iter() { for event in events.iter() {
let token = event.token(); let token = event.token();
match token { match token {
WAKER_TOKEN => { WAKER_TOKEN => {
let exit = self.poll_waker(sockets, cx); let exit = self.poll_waker(cx);
if exit { if exit {
info!("Accept is stopped."); info!("Accept is stopped.");
return; return;
} }
} }
_ => { _ => {
println!("polling event");
let token = usize::from(token); let token = usize::from(token);
self.accept(sockets, token); self.accept(token);
} }
} }
} }
} }
} }
fn poll_waker(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) -> bool { fn poll_waker(&mut self, cx: &mut Context<'_>) -> bool {
// This is a loop because interests for command from previous version was // This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown // a loop that would try to drain the command channel. It's yet unknown
// if it's necessary/good practice to actively drain the waker queue. // if it's necessary/good practice to actively drain the waker queue.
@ -261,7 +263,7 @@ impl Accept {
self.avail.set_available(idx, true); self.avail.set_available(idx, true);
if !self.paused { if !self.paused {
self.accept_all(sockets); self.accept_all();
} }
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added to Accept
@ -270,29 +272,26 @@ impl Accept {
self.handles.push(handle); self.handles.push(handle);
if !self.paused { if !self.paused {
self.accept_all(sockets); self.accept_all();
} }
} }
Some(WakerInterest::Pause) => { Some(WakerInterest::Pause) => {
if !self.paused { if !self.paused {
self.paused = true; self.paused = true;
self.deregister_all(sockets); self.deregister_all();
} }
} }
Some(WakerInterest::Resume) => { Some(WakerInterest::Resume) => {
if self.paused { if self.paused {
self.paused = false; self.paused = false;
self.register_all();
sockets.iter_mut().for_each(|info| { self.accept_all();
self.register_logged(info);
});
self.accept_all(sockets);
} }
} }
Some(WakerInterest::Stop) | None => { Some(WakerInterest::Stop) | None => {
if !self.paused { if !self.paused {
self.deregister_all(sockets); self.deregister_all();
} }
return true; return true;
@ -303,47 +302,20 @@ impl Accept {
false false
} }
#[cfg(not(target_os = "windows"))] fn register_all(&mut self) {
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { let reg = self.poll.registry();
let token = MioToken(info.token);
self.poll
.registry()
.register(&mut info.lst, token, Interest::READABLE)
}
#[cfg(target_os = "windows")] self.sockets.iter_mut().for_each(|info| {
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { let token = MioToken(info.token);
// On windows, calling register without deregister cause an error. match reg.register(&mut info.lst, token, Interest::READABLE) {
// See https://github.com/actix/actix-web/issues/905 Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
// Calling reregister seems to fix the issue. Err(e) => error!("Can not register server socket {}", e),
let token = MioToken(info.token);
self.poll
.registry()
.register(&mut info.lst, token, Interest::READABLE)
.or_else(|_| {
self.poll
.registry()
.reregister(&mut info.lst, token, Interest::READABLE)
})
}
fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => {
error!("Can not deregister server socket {}", e)
} }
} })
} }
fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { fn deregister_all(&mut self) {
let reg = self.poll.registry();
// This is a best effort implementation with following limitation: // This is a best effort implementation with following limitation:
// //
// Every ServerSocketInfo with associate timeout will be skipped and it's timeout // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
@ -352,14 +324,17 @@ impl Accept {
// Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
// gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
// before expected timing. // before expected timing.
sockets self.sockets
.iter_mut() .iter_mut()
// Take all timeout. // Take all timeout.
// This is to prevent Accept::process_timer method re-register a socket afterwards. // This is to prevent Accept::process_timer method re-register a socket afterwards.
.map(|info| (info.timeout.take(), info)) .map(|info| (info.timeout.take(), &mut info.lst))
// Socket info with a timeout is already deregistered so skip them. // Socket info with a timeout is already deregistered so skip them.
.filter(|(timeout, _)| timeout.is_none()) .filter(|(timeout, _)| timeout.is_none())
.for_each(|(_, info)| self.deregister_logged(info)); .for_each(|(_, lst)| match reg.deregister(lst) {
Ok(_) => info!("Paused accepting connections on {}", lst.local_addr()),
Err(e) => error!("Can not deregister server socket {}", e),
});
} }
// Send connection to worker and handle error. // Send connection to worker and handle error.
@ -419,9 +394,9 @@ impl Accept {
} }
} }
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { fn accept(&mut self, token: usize) {
while self.avail.available() { while self.avail.available() {
let info = &mut sockets[token]; let info = &mut self.sockets[token];
match info.lst.accept() { match info.lst.accept() {
Ok(io) => { Ok(io) => {
@ -434,7 +409,12 @@ impl Accept {
error!("Error accepting connection: {}", e); error!("Error accepting connection: {}", e);
// deregister listener temporary // deregister listener temporary
self.deregister_logged(info); match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => {
info!("Paused accepting connections on {}", info.lst.local_addr())
}
Err(e) => error!("Can not deregister server socket {}", e),
};
// sleep after error. write the timeout to socket info as later // sleep after error. write the timeout to socket info as later
// the poll would need it mark which socket and when it's // the poll would need it mark which socket and when it's
@ -447,13 +427,13 @@ impl Accept {
} }
} }
fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { fn accept_all(&mut self) {
sockets self.sockets
.iter_mut() .iter_mut()
.map(|info| info.token) .map(|info| info.token)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter() .into_iter()
.for_each(|idx| self.accept(sockets, idx)) .for_each(|idx| self.accept(idx))
} }
#[inline(always)] #[inline(always)]