aboutsummaryrefslogtreecommitdiff
path: root/src/env.rs
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2020-10-14 15:21:00 +0200
committerJeff Vander Stoep <jeffv@google.com>2020-10-14 15:21:00 +0200
commit761577d44d5d8104fb718fbab47ca87353e75f9f (patch)
tree5647c69210cf8d9a287ac689476f45513371aefc /src/env.rs
parent352363d43b57e1a7c239098b358919d113aeb81a (diff)
downloadgrpcio-761577d44d5d8104fb718fbab47ca87353e75f9f.tar.gz
Import grpcio 0.6.0
And add metadata files using the following command: get_rust_pkg.py --add3prf -v grpcio-0.6.0 -o grpcio Test: none Change-Id: I53cc0feb5c9d24eacb62331b968cab4ec85f60a6
Diffstat (limited to 'src/env.rs')
-rw-r--r--src/env.rs174
1 files changed, 174 insertions, 0 deletions
diff --git a/src/env.rs b/src/env.rs
new file mode 100644
index 0000000..8bad45e
--- /dev/null
+++ b/src/env.rs
@@ -0,0 +1,174 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc;
+use std::sync::Arc;
+use std::thread::{Builder as ThreadBuilder, JoinHandle};
+
+use crate::grpc_sys;
+
+use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
+use crate::task::CallTag;
+
+// event loop
+fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
+ let cq = Arc::new(CompletionQueueHandle::new());
+ let worker_info = Arc::new(WorkQueue::new());
+ let cq = CompletionQueue::new(cq, worker_info);
+ tx.send(cq.clone()).expect("send back completion queue");
+ loop {
+ let e = cq.next();
+ match e.type_ {
+ EventType::GRPC_QUEUE_SHUTDOWN => break,
+ // timeout should not happen in theory.
+ EventType::GRPC_QUEUE_TIMEOUT => continue,
+ EventType::GRPC_OP_COMPLETE => {}
+ }
+
+ let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
+
+ tag.resolve(&cq, e.success != 0);
+ while let Some(work) = unsafe { cq.worker.pop_work() } {
+ work.finish();
+ }
+ }
+}
+
+/// [`Environment`] factory in order to configure the properties.
+pub struct EnvBuilder {
+ cq_count: usize,
+ name_prefix: Option<String>,
+}
+
+impl EnvBuilder {
+ /// Initialize a new [`EnvBuilder`].
+ pub fn new() -> EnvBuilder {
+ EnvBuilder {
+ cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
+ name_prefix: None,
+ }
+ }
+
+ /// Set the number of completion queues and polling threads. Each thread polls
+ /// one completion queue.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `count` is 0.
+ pub fn cq_count(mut self, count: usize) -> EnvBuilder {
+ assert!(count > 0);
+ self.cq_count = count;
+ self
+ }
+
+ /// Set the thread name prefix of each polling thread.
+ pub fn name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder {
+ self.name_prefix = Some(prefix.into());
+ self
+ }
+
+ /// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
+ pub fn build(self) -> Environment {
+ unsafe {
+ grpc_sys::grpc_init();
+ }
+ let mut cqs = Vec::with_capacity(self.cq_count);
+ let mut handles = Vec::with_capacity(self.cq_count);
+ let (tx, rx) = mpsc::channel();
+ for i in 0..self.cq_count {
+ let tx_i = tx.clone();
+ let mut builder = ThreadBuilder::new();
+ if let Some(ref prefix) = self.name_prefix {
+ builder = builder.name(format!("{}-{}", prefix, i));
+ }
+ let handle = builder.spawn(move || poll_queue(tx_i)).unwrap();
+ handles.push(handle);
+ }
+ for _ in 0..self.cq_count {
+ cqs.push(rx.recv().unwrap());
+ }
+
+ Environment {
+ cqs,
+ idx: AtomicUsize::new(0),
+ _handles: handles,
+ }
+ }
+}
+
+/// An object that used to control concurrency and start gRPC event loop.
+pub struct Environment {
+ cqs: Vec<CompletionQueue>,
+ idx: AtomicUsize,
+ _handles: Vec<JoinHandle<()>>,
+}
+
+impl Environment {
+ /// Initialize gRPC and create a thread pool to poll completion queue. The thread pool size
+ /// and the number of completion queue is specified by `cq_count`. Each thread polls one
+ /// completion queue.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `cq_count` is 0.
+ pub fn new(cq_count: usize) -> Environment {
+ assert!(cq_count > 0);
+ EnvBuilder::new()
+ .name_prefix("grpc-poll")
+ .cq_count(cq_count)
+ .build()
+ }
+
+ /// Get all the created completion queues.
+ pub fn completion_queues(&self) -> &[CompletionQueue] {
+ self.cqs.as_slice()
+ }
+
+ /// Pick an arbitrary completion queue.
+ pub fn pick_cq(&self) -> CompletionQueue {
+ let idx = self.idx.fetch_add(1, Ordering::Relaxed);
+ self.cqs[idx % self.cqs.len()].clone()
+ }
+}
+
+impl Drop for Environment {
+ fn drop(&mut self) {
+ for cq in self.completion_queues() {
+ // it's safe to shutdown more than once.
+ cq.shutdown()
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_basic_loop() {
+ let mut env = Environment::new(2);
+
+ let q1 = env.pick_cq();
+ let q2 = env.pick_cq();
+ let q3 = env.pick_cq();
+ let cases = vec![(&q1, &q3, true), (&q1, &q2, false)];
+ for (lq, rq, is_eq) in cases {
+ let lq_ref = lq.borrow().unwrap();
+ let rq_ref = rq.borrow().unwrap();
+ if is_eq {
+ assert_eq!(lq_ref.as_ptr(), rq_ref.as_ptr());
+ } else {
+ assert_ne!(lq_ref.as_ptr(), rq_ref.as_ptr());
+ }
+ }
+
+ assert_eq!(env.completion_queues().len(), 2);
+ for cq in env.completion_queues() {
+ cq.shutdown();
+ }
+
+ for handle in env._handles.drain(..) {
+ handle.join().unwrap();
+ }
+ }
+}