use crate::{Interest, Token}; use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP}; use log::error; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, i32, io, ptr}; /// Unique id for use as `SelectorId`. #[cfg(debug_assertions)] static NEXT_ID: AtomicUsize = AtomicUsize::new(1); #[derive(Debug)] pub struct Selector { #[cfg(debug_assertions)] id: usize, ep: RawFd, #[cfg(debug_assertions)] has_waker: AtomicBool, } impl Selector { pub fn new() -> io::Result { // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API < // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform, // so we use it instead. #[cfg(target_os = "android")] let flag = libc::O_CLOEXEC; #[cfg(not(target_os = "android"))] let flag = libc::EPOLL_CLOEXEC; syscall!(epoll_create1(flag)).map(|ep| Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), ep, #[cfg(debug_assertions)] has_waker: AtomicBool::new(false), }) } pub fn try_clone(&self) -> io::Result { syscall!(dup(self.ep)).map(|ep| Selector { // It's the same selector, so we use the same id. #[cfg(debug_assertions)] id: self.id, ep, #[cfg(debug_assertions)] has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits // architectures. The magic number is the same constant used by libuv. #[cfg(target_pointer_width = "32")] const MAX_SAFE_TIMEOUT: u128 = 1789569; #[cfg(not(target_pointer_width = "32"))] const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128; let timeout = timeout .map(|to| cmp::min(to.as_millis(), MAX_SAFE_TIMEOUT) as libc::c_int) .unwrap_or(-1); events.clear(); syscall!(epoll_wait( self.ep, events.as_mut_ptr(), events.capacity() as i32, timeout, )) .map(|n_events| { // This is safe because `epoll_wait` ensures that `n_events` are // assigned. unsafe { events.set_len(n_events as usize) }; }) } pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { let mut event = libc::epoll_event { events: interests_to_epoll(interests), u64: usize::from(token) as u64, }; syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ()) } pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { let mut event = libc::epoll_event { events: interests_to_epoll(interests), u64: usize::from(token) as u64, }; syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ()) } pub fn deregister(&self, fd: RawFd) -> io::Result<()> { syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ()) } #[cfg(debug_assertions)] pub fn register_waker(&self) -> bool { self.has_waker.swap(true, Ordering::AcqRel) } } cfg_io_source! { impl Selector { #[cfg(debug_assertions)] pub fn id(&self) -> usize { self.id } } } impl AsRawFd for Selector { fn as_raw_fd(&self) -> RawFd { self.ep } } impl Drop for Selector { fn drop(&mut self) { if let Err(err) = syscall!(close(self.ep)) { error!("error closing epoll: {}", err); } } } fn interests_to_epoll(interests: Interest) -> u32 { let mut kind = EPOLLET; if interests.is_readable() { kind = kind | EPOLLIN | EPOLLRDHUP; } if interests.is_writable() { kind |= EPOLLOUT; } kind as u32 } pub type Event = libc::epoll_event; pub type Events = Vec; pub mod event { use std::fmt; use crate::sys::Event; use crate::Token; pub fn token(event: &Event) -> Token { Token(event.u64 as usize) } pub fn is_readable(event: &Event) -> bool { (event.events as libc::c_int & libc::EPOLLIN) != 0 || (event.events as libc::c_int & libc::EPOLLPRI) != 0 } pub fn is_writable(event: &Event) -> bool { (event.events as libc::c_int & libc::EPOLLOUT) != 0 } pub fn is_error(event: &Event) -> bool { (event.events as libc::c_int & libc::EPOLLERR) != 0 } pub fn is_read_closed(event: &Event) -> bool { // Both halves of the socket have closed event.events as libc::c_int & libc::EPOLLHUP != 0 // Socket has received FIN or called shutdown(SHUT_RD) || (event.events as libc::c_int & libc::EPOLLIN != 0 && event.events as libc::c_int & libc::EPOLLRDHUP != 0) } pub fn is_write_closed(event: &Event) -> bool { // Both halves of the socket have closed event.events as libc::c_int & libc::EPOLLHUP != 0 // Unix pipe write end has closed || (event.events as libc::c_int & libc::EPOLLOUT != 0 && event.events as libc::c_int & libc::EPOLLERR != 0) // The other side (read end) of a Unix pipe has closed. || event.events as libc::c_int == libc::EPOLLERR } pub fn is_priority(event: &Event) -> bool { (event.events as libc::c_int & libc::EPOLLPRI) != 0 } pub fn is_aio(_: &Event) -> bool { // Not supported in the kernel, only in libc. false } pub fn is_lio(_: &Event) -> bool { // Not supported. false } pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { #[allow(clippy::trivially_copy_pass_by_ref)] fn check_events(got: &u32, want: &libc::c_int) -> bool { (*got as libc::c_int & want) != 0 } debug_detail!( EventsDetails(u32), check_events, libc::EPOLLIN, libc::EPOLLPRI, libc::EPOLLOUT, libc::EPOLLRDNORM, libc::EPOLLRDBAND, libc::EPOLLWRNORM, libc::EPOLLWRBAND, libc::EPOLLMSG, libc::EPOLLERR, libc::EPOLLHUP, libc::EPOLLET, libc::EPOLLRDHUP, libc::EPOLLONESHOT, #[cfg(any(target_os = "linux", target_os = "solaris"))] libc::EPOLLEXCLUSIVE, #[cfg(any(target_os = "android", target_os = "linux"))] libc::EPOLLWAKEUP, libc::EPOLL_CLOEXEC, ); // Can't reference fields in packed structures. let e_u64 = event.u64; f.debug_struct("epoll_event") .field("events", &EventsDetails(event.events)) .field("u64", &e_u64) .finish() } } #[cfg(target_os = "android")] #[test] fn assert_close_on_exec_flag() { // This assertion need to be true for Selector::new. assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC); }