aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/builder.rs
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2020-10-05 08:16:15 -0700
committerJoel Galenson <jgalenson@google.com>2020-10-05 08:16:15 -0700
commitf03b3ba785a6d336884bfc525046906f8c2a9904 (patch)
tree14e2bd707d8d152ea0476ec9e686deb2a2f55b34 /src/runtime/builder.rs
parent40b8b369b069afb314a9d4bb92be1bdd038979f8 (diff)
downloadtokio-f03b3ba785a6d336884bfc525046906f8c2a9904.tar.gz
Import tokio-0.2.22
Test: None Change-Id: Iea7ee5e62819c9b16dbfad05a6146775df72506a
Diffstat (limited to 'src/runtime/builder.rs')
-rw-r--r--src/runtime/builder.rs522
1 files changed, 522 insertions, 0 deletions
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
new file mode 100644
index 0000000..fad72c7
--- /dev/null
+++ b/src/runtime/builder.rs
@@ -0,0 +1,522 @@
+use crate::runtime::handle::Handle;
+use crate::runtime::shell::Shell;
+use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
+
+use std::fmt;
+#[cfg(not(loom))]
+use std::sync::Arc;
+
+/// Builds Tokio Runtime with custom configuration values.
+///
+/// Methods can be chained in order to set the configuration values. The
+/// Runtime is constructed by calling [`build`].
+///
+/// New instances of `Builder` are obtained via [`Builder::new`].
+///
+/// See function level documentation for details on the various configuration
+/// settings.
+///
+/// [`build`]: method@Self::build
+/// [`Builder::new`]: method@Self::new
+///
+/// # Examples
+///
+/// ```
+/// use tokio::runtime::Builder;
+///
+/// fn main() {
+/// // build runtime
+/// let runtime = Builder::new()
+/// .threaded_scheduler()
+/// .core_threads(4)
+/// .thread_name("my-custom-name")
+/// .thread_stack_size(3 * 1024 * 1024)
+/// .build()
+/// .unwrap();
+///
+/// // use runtime ...
+/// }
+/// ```
+pub struct Builder {
+ /// The task execution model to use.
+ kind: Kind,
+
+ /// Whether or not to enable the I/O driver
+ enable_io: bool,
+
+ /// Whether or not to enable the time driver
+ enable_time: bool,
+
+ /// The number of worker threads, used by Runtime.
+ ///
+ /// Only used when not using the current-thread executor.
+ core_threads: Option<usize>,
+
+ /// Cap on thread usage.
+ max_threads: usize,
+
+ /// Name used for threads spawned by the runtime.
+ pub(super) thread_name: String,
+
+ /// Stack size used for threads spawned by the runtime.
+ pub(super) thread_stack_size: Option<usize>,
+
+ /// Callback to run after each thread starts.
+ pub(super) after_start: Option<Callback>,
+
+ /// To run before each worker thread stops
+ pub(super) before_stop: Option<Callback>,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum Kind {
+ Shell,
+ #[cfg(feature = "rt-core")]
+ Basic,
+ #[cfg(feature = "rt-threaded")]
+ ThreadPool,
+}
+
+impl Builder {
+ /// Returns a new runtime builder initialized with default configuration
+ /// values.
+ ///
+ /// Configuration methods can be chained on the return value.
+ pub fn new() -> Builder {
+ Builder {
+ // No task execution by default
+ kind: Kind::Shell,
+
+ // I/O defaults to "off"
+ enable_io: false,
+
+ // Time defaults to "off"
+ enable_time: false,
+
+ // Default to lazy auto-detection (one thread per CPU core)
+ core_threads: None,
+
+ max_threads: 512,
+
+ // Default thread name
+ thread_name: "tokio-runtime-worker".into(),
+
+ // Do not set a stack size by default
+ thread_stack_size: None,
+
+ // No worker thread callbacks
+ after_start: None,
+ before_stop: None,
+ }
+ }
+
+ /// Enables both I/O and time drivers.
+ ///
+ /// Doing this is a shorthand for calling `enable_io` and `enable_time`
+ /// individually. If additional components are added to Tokio in the future,
+ /// `enable_all` will include these future components.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_all(&mut self) -> &mut Self {
+ #[cfg(feature = "io-driver")]
+ self.enable_io();
+ #[cfg(feature = "time")]
+ self.enable_time();
+
+ self
+ }
+
+ #[deprecated(note = "In future will be replaced by core_threads method")]
+ /// Sets the maximum number of worker threads for the `Runtime`'s thread pool.
+ ///
+ /// This must be a number between 1 and 32,768 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ pub fn num_threads(&mut self, val: usize) -> &mut Self {
+ self.core_threads = Some(val);
+ self
+ }
+
+ /// Sets the core number of worker threads for the `Runtime`'s thread pool.
+ ///
+ /// This should be a number between 1 and 32,768 though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ ///
+ /// These threads will be always active and running.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .core_threads(4)
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn core_threads(&mut self, val: usize) -> &mut Self {
+ assert_ne!(val, 0, "Core threads cannot be zero");
+ self.core_threads = Some(val);
+ self
+ }
+
+ /// Specifies limit for threads, spawned by the Runtime.
+ ///
+ /// This is number of threads to be used by Runtime, including `core_threads`
+ /// Having `max_threads` less than `core_threads` results in invalid configuration
+ /// when building multi-threaded `Runtime`, which would cause a panic.
+ ///
+ /// Similarly to the `core_threads`, this number should be between 1 and 32,768.
+ ///
+ /// The default value is 512.
+ ///
+ /// When multi-threaded runtime is not used, will act as limit on additional threads.
+ ///
+ /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for
+ /// blocking annotations) as `max_threads - core_threads`.
+ pub fn max_threads(&mut self, val: usize) -> &mut Self {
+ assert_ne!(val, 0, "Thread limit cannot be zero");
+ self.max_threads = val;
+ self
+ }
+
+ /// Sets name of threads spawned by the `Runtime`'s thread pool.
+ ///
+ /// The default name is "tokio-runtime-worker".
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let rt = runtime::Builder::new()
+ /// .thread_name("my-pool")
+ /// .build();
+ /// # }
+ /// ```
+ pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
+ self.thread_name = val.into();
+ self
+ }
+
+ /// Sets the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let rt = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .thread_stack_size(32 * 1024)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
+ self.thread_stack_size = Some(val);
+ self
+ }
+
+ /// Executes function `f` after each thread is started but before it starts
+ /// doing work.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let runtime = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .on_thread_start(|| {
+ /// println!("thread started");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Executes function `f` before each thread stops.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let runtime = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .on_thread_stop(|| {
+ /// println!("thread stopping");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.before_stop = Some(Arc::new(f));
+ self
+ }
+
+ /// Creates the configured `Runtime`.
+ ///
+ /// The returned `ThreadPool` instance is ready to spawn tasks.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Builder;
+ ///
+ /// let mut rt = Builder::new().build().unwrap();
+ ///
+ /// rt.block_on(async {
+ /// println!("Hello from the Tokio runtime");
+ /// });
+ /// ```
+ pub fn build(&mut self) -> io::Result<Runtime> {
+ match self.kind {
+ Kind::Shell => self.build_shell_runtime(),
+ #[cfg(feature = "rt-core")]
+ Kind::Basic => self.build_basic_runtime(),
+ #[cfg(feature = "rt-threaded")]
+ Kind::ThreadPool => self.build_threaded_runtime(),
+ }
+ }
+
+ fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::runtime::Kind;
+
+ let clock = time::create_clock();
+
+ // Create I/O driver
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+
+ let spawner = Spawner::Shell;
+
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_spawner = blocking_pool.spawner().clone();
+
+ Ok(Runtime {
+ kind: Kind::Shell(Shell::new(driver)),
+ handle: Handle {
+ spawner,
+ io_handle,
+ time_handle,
+ clock,
+ blocking_spawner,
+ },
+ blocking_pool,
+ })
+ }
+}
+
+cfg_io_driver! {
+ impl Builder {
+ /// Enables the I/O driver.
+ ///
+ /// Doing this enables using net, process, signal, and some I/O types on
+ /// the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .enable_io()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_io(&mut self) -> &mut Self {
+ self.enable_io = true;
+ self
+ }
+ }
+}
+
+cfg_time! {
+ impl Builder {
+ /// Enables the time driver.
+ ///
+ /// Doing this enables using `tokio::time` on the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .enable_time()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_time(&mut self) -> &mut Self {
+ self.enable_time = true;
+ self
+ }
+ }
+}
+
+cfg_rt_core! {
+ impl Builder {
+ /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread.
+ ///
+ /// The executor and all necessary drivers will all be run on the current
+ /// thread during [`block_on`] calls.
+ ///
+ /// See also [the module level documentation][1], which has a section on scheduler
+ /// types.
+ ///
+ /// [1]: index.html#runtime-configurations
+ /// [`block_on`]: Runtime::block_on
+ pub fn basic_scheduler(&mut self) -> &mut Self {
+ self.kind = Kind::Basic;
+ self
+ }
+
+ fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::runtime::{BasicScheduler, Kind};
+
+ let clock = time::create_clock();
+
+ // Create I/O driver
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+
+ // And now put a single-threaded scheduler on top of the timer. When
+ // there are no futures ready to do something, it'll let the timer or
+ // the reactor to generate some new stimuli for the futures to continue
+ // in their life.
+ let scheduler = BasicScheduler::new(driver);
+ let spawner = Spawner::Basic(scheduler.spawner().clone());
+
+ // Blocking pool
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_spawner = blocking_pool.spawner().clone();
+
+ Ok(Runtime {
+ kind: Kind::Basic(scheduler),
+ handle: Handle {
+ spawner,
+ io_handle,
+ time_handle,
+ clock,
+ blocking_spawner,
+ },
+ blocking_pool,
+ })
+ }
+ }
+}
+
+cfg_rt_threaded! {
+ impl Builder {
+ /// Sets runtime to use a multi-threaded scheduler for executing tasks.
+ ///
+ /// See also [the module level documentation][1], which has a section on scheduler
+ /// types.
+ ///
+ /// [1]: index.html#runtime-configurations
+ pub fn threaded_scheduler(&mut self) -> &mut Self {
+ self.kind = Kind::ThreadPool;
+ self
+ }
+
+ fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::loom::sys::num_cpus;
+ use crate::runtime::{Kind, ThreadPool};
+ use crate::runtime::park::Parker;
+ use std::cmp;
+
+ let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
+ assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
+
+ let clock = time::create_clock();
+
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+ let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
+ let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
+
+ // Create the blocking pool
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_spawner = blocking_pool.spawner().clone();
+
+ // Create the runtime handle
+ let handle = Handle {
+ spawner,
+ io_handle,
+ time_handle,
+ clock,
+ blocking_spawner,
+ };
+
+ // Spawn the thread pool workers
+ handle.enter(|| launch.launch());
+
+ Ok(Runtime {
+ kind: Kind::ThreadPool(scheduler),
+ handle,
+ blocking_pool,
+ })
+ }
+ }
+}
+
+impl Default for Builder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Builder")
+ .field("kind", &self.kind)
+ .field("core_threads", &self.core_threads)
+ .field("max_threads", &self.max_threads)
+ .field("thread_name", &self.thread_name)
+ .field("thread_stack_size", &self.thread_stack_size)
+ .field("after_start", &self.after_start.as_ref().map(|_| "..."))
+ .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
+ .finish()
+ }
+}