aboutsummaryrefslogtreecommitdiff
path: root/examples/tcp_listenfd_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/tcp_listenfd_server.rs')
-rw-r--r--examples/tcp_listenfd_server.rs209
1 files changed, 209 insertions, 0 deletions
diff --git a/examples/tcp_listenfd_server.rs b/examples/tcp_listenfd_server.rs
new file mode 100644
index 0000000..941d7f0
--- /dev/null
+++ b/examples/tcp_listenfd_server.rs
@@ -0,0 +1,209 @@
+// You can run this example from the root of the mio repo:
+// cargo run --example tcp_listenfd_server --features="os-poll net"
+// or with wasi:
+// cargo +nightly build --target wasm32-wasi --example tcp_listenfd_server --features="os-poll net"
+// wasmtime run --tcplisten 127.0.0.1:9000 --env 'LISTEN_FDS=1' target/wasm32-wasi/debug/examples/tcp_listenfd_server.wasm
+
+use mio::event::Event;
+use mio::net::{TcpListener, TcpStream};
+use mio::{Events, Interest, Poll, Registry, Token};
+use std::collections::HashMap;
+use std::io::{self, Read, Write};
+use std::str::from_utf8;
+
+// Setup some tokens to allow us to identify which event is for which socket.
+const SERVER: Token = Token(0);
+
+// Some data we'll send over the connection.
+const DATA: &[u8] = b"Hello world!\n";
+
+#[cfg(not(windows))]
+fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
+ #[cfg(unix)]
+ use std::os::unix::io::FromRawFd;
+ #[cfg(target_os = "wasi")]
+ use std::os::wasi::io::FromRawFd;
+
+ let stdlistener = unsafe { std::net::TcpListener::from_raw_fd(3) };
+ stdlistener.set_nonblocking(true).unwrap();
+ Some(stdlistener)
+}
+
+#[cfg(windows)]
+fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
+ // Windows does not support `LISTEN_FDS`
+ None
+}
+
+fn main() -> io::Result<()> {
+ env_logger::init();
+
+ std::env::var("LISTEN_FDS").expect("LISTEN_FDS environment variable unset");
+
+ // Create a poll instance.
+ let mut poll = Poll::new()?;
+ // Create storage for events.
+ let mut events = Events::with_capacity(128);
+
+ // Setup the TCP server socket.
+ let mut server = {
+ let stdlistener = get_first_listen_fd_listener().unwrap();
+ println!("Using preopened socket FD 3");
+ println!("You can connect to the server using `nc`:");
+ match stdlistener.local_addr() {
+ Ok(a) => println!(" $ nc {} {}", a.ip(), a.port()),
+ Err(_) => println!(" $ nc <IP> <PORT>"),
+ }
+ println!("You'll see our welcome message and anything you type will be printed here.");
+ TcpListener::from_std(stdlistener)
+ };
+
+ // Register the server with poll we can receive events for it.
+ poll.registry()
+ .register(&mut server, SERVER, Interest::READABLE)?;
+
+ // Map of `Token` -> `TcpStream`.
+ let mut connections = HashMap::new();
+ // Unique token for each incoming connection.
+ let mut unique_token = Token(SERVER.0 + 1);
+
+ loop {
+ poll.poll(&mut events, None)?;
+
+ for event in events.iter() {
+ match event.token() {
+ SERVER => loop {
+ // Received an event for the TCP server socket, which
+ // indicates we can accept an connection.
+ let (mut connection, address) = match server.accept() {
+ Ok((connection, address)) => (connection, address),
+ Err(ref e) if would_block(e) => {
+ // If we get a `WouldBlock` error we know our
+ // listener has no more incoming connections queued,
+ // so we can return to polling and wait for some
+ // more.
+ break;
+ }
+ Err(e) => {
+ // If it was any other kind of error, something went
+ // wrong and we terminate with an error.
+ return Err(e);
+ }
+ };
+
+ println!("Accepted connection from: {}", address);
+
+ let token = next(&mut unique_token);
+ poll.registry()
+ .register(&mut connection, token, Interest::WRITABLE)?;
+
+ connections.insert(token, connection);
+ },
+ token => {
+ // Maybe received an event for a TCP connection.
+ let done = if let Some(connection) = connections.get_mut(&token) {
+ handle_connection_event(poll.registry(), connection, event)?
+ } else {
+ // Sporadic events happen, we can safely ignore them.
+ false
+ };
+ if done {
+ if let Some(mut connection) = connections.remove(&token) {
+ poll.registry().deregister(&mut connection)?;
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+fn next(current: &mut Token) -> Token {
+ let next = current.0;
+ current.0 += 1;
+ Token(next)
+}
+
+/// Returns `true` if the connection is done.
+fn handle_connection_event(
+ registry: &Registry,
+ connection: &mut TcpStream,
+ event: &Event,
+) -> io::Result<bool> {
+ if event.is_writable() {
+ // We can (maybe) write to the connection.
+ match connection.write(DATA) {
+ // We want to write the entire `DATA` buffer in a single go. If we
+ // write less we'll return a short write error (same as
+ // `io::Write::write_all` does).
+ Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
+ Ok(_) => {
+ // After we've written something we'll reregister the connection
+ // to only respond to readable events.
+ registry.reregister(connection, event.token(), Interest::READABLE)?
+ }
+ // Would block "errors" are the OS's way of saying that the
+ // connection is not actually ready to perform this I/O operation.
+ Err(ref err) if would_block(err) => {}
+ // Got interrupted (how rude!), we'll try again.
+ Err(ref err) if interrupted(err) => {
+ return handle_connection_event(registry, connection, event)
+ }
+ // Other errors we'll consider fatal.
+ Err(err) => return Err(err),
+ }
+ }
+
+ if event.is_readable() {
+ let mut connection_closed = false;
+ let mut received_data = vec![0; 4096];
+ let mut bytes_read = 0;
+ // We can (maybe) read from the connection.
+ loop {
+ match connection.read(&mut received_data[bytes_read..]) {
+ Ok(0) => {
+ // Reading 0 bytes means the other side has closed the
+ // connection or is done writing, then so are we.
+ connection_closed = true;
+ break;
+ }
+ Ok(n) => {
+ bytes_read += n;
+ if bytes_read == received_data.len() {
+ received_data.resize(received_data.len() + 1024, 0);
+ }
+ }
+ // Would block "errors" are the OS's way of saying that the
+ // connection is not actually ready to perform this I/O operation.
+ Err(ref err) if would_block(err) => break,
+ Err(ref err) if interrupted(err) => continue,
+ // Other errors we'll consider fatal.
+ Err(err) => return Err(err),
+ }
+ }
+
+ if bytes_read != 0 {
+ let received_data = &received_data[..bytes_read];
+ if let Ok(str_buf) = from_utf8(received_data) {
+ println!("Received data: {}", str_buf.trim_end());
+ } else {
+ println!("Received (none UTF-8) data: {:?}", received_data);
+ }
+ }
+
+ if connection_closed {
+ println!("Connection closed");
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+}
+
+fn would_block(err: &io::Error) -> bool {
+ err.kind() == io::ErrorKind::WouldBlock
+}
+
+fn interrupted(err: &io::Error) -> bool {
+ err.kind() == io::ErrorKind::Interrupted
+}