diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2020-10-14 15:21:00 +0200 |
---|---|---|
committer | Jeff Vander Stoep <jeffv@google.com> | 2020-10-14 15:21:00 +0200 |
commit | 761577d44d5d8104fb718fbab47ca87353e75f9f (patch) | |
tree | 5647c69210cf8d9a287ac689476f45513371aefc /src/env.rs | |
parent | 352363d43b57e1a7c239098b358919d113aeb81a (diff) | |
download | grpcio-761577d44d5d8104fb718fbab47ca87353e75f9f.tar.gz |
Import grpcio 0.6.0
And add metadata files using the following command:
get_rust_pkg.py --add3prf -v grpcio-0.6.0 -o grpcio
Test: none
Change-Id: I53cc0feb5c9d24eacb62331b968cab4ec85f60a6
Diffstat (limited to 'src/env.rs')
-rw-r--r-- | src/env.rs | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/src/env.rs b/src/env.rs new file mode 100644 index 0000000..8bad45e --- /dev/null +++ b/src/env.rs @@ -0,0 +1,174 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::sync::Arc; +use std::thread::{Builder as ThreadBuilder, JoinHandle}; + +use crate::grpc_sys; + +use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue}; +use crate::task::CallTag; + +// event loop +fn poll_queue(tx: mpsc::Sender<CompletionQueue>) { + let cq = Arc::new(CompletionQueueHandle::new()); + let worker_info = Arc::new(WorkQueue::new()); + let cq = CompletionQueue::new(cq, worker_info); + tx.send(cq.clone()).expect("send back completion queue"); + loop { + let e = cq.next(); + match e.type_ { + EventType::GRPC_QUEUE_SHUTDOWN => break, + // timeout should not happen in theory. + EventType::GRPC_QUEUE_TIMEOUT => continue, + EventType::GRPC_OP_COMPLETE => {} + } + + let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) }; + + tag.resolve(&cq, e.success != 0); + while let Some(work) = unsafe { cq.worker.pop_work() } { + work.finish(); + } + } +} + +/// [`Environment`] factory in order to configure the properties. +pub struct EnvBuilder { + cq_count: usize, + name_prefix: Option<String>, +} + +impl EnvBuilder { + /// Initialize a new [`EnvBuilder`]. + pub fn new() -> EnvBuilder { + EnvBuilder { + cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize }, + name_prefix: None, + } + } + + /// Set the number of completion queues and polling threads. Each thread polls + /// one completion queue. + /// + /// # Panics + /// + /// This method will panic if `count` is 0. + pub fn cq_count(mut self, count: usize) -> EnvBuilder { + assert!(count > 0); + self.cq_count = count; + self + } + + /// Set the thread name prefix of each polling thread. + pub fn name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder { + self.name_prefix = Some(prefix.into()); + self + } + + /// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library. + pub fn build(self) -> Environment { + unsafe { + grpc_sys::grpc_init(); + } + let mut cqs = Vec::with_capacity(self.cq_count); + let mut handles = Vec::with_capacity(self.cq_count); + let (tx, rx) = mpsc::channel(); + for i in 0..self.cq_count { + let tx_i = tx.clone(); + let mut builder = ThreadBuilder::new(); + if let Some(ref prefix) = self.name_prefix { + builder = builder.name(format!("{}-{}", prefix, i)); + } + let handle = builder.spawn(move || poll_queue(tx_i)).unwrap(); + handles.push(handle); + } + for _ in 0..self.cq_count { + cqs.push(rx.recv().unwrap()); + } + + Environment { + cqs, + idx: AtomicUsize::new(0), + _handles: handles, + } + } +} + +/// An object that used to control concurrency and start gRPC event loop. +pub struct Environment { + cqs: Vec<CompletionQueue>, + idx: AtomicUsize, + _handles: Vec<JoinHandle<()>>, +} + +impl Environment { + /// Initialize gRPC and create a thread pool to poll completion queue. The thread pool size + /// and the number of completion queue is specified by `cq_count`. Each thread polls one + /// completion queue. + /// + /// # Panics + /// + /// This method will panic if `cq_count` is 0. + pub fn new(cq_count: usize) -> Environment { + assert!(cq_count > 0); + EnvBuilder::new() + .name_prefix("grpc-poll") + .cq_count(cq_count) + .build() + } + + /// Get all the created completion queues. + pub fn completion_queues(&self) -> &[CompletionQueue] { + self.cqs.as_slice() + } + + /// Pick an arbitrary completion queue. + pub fn pick_cq(&self) -> CompletionQueue { + let idx = self.idx.fetch_add(1, Ordering::Relaxed); + self.cqs[idx % self.cqs.len()].clone() + } +} + +impl Drop for Environment { + fn drop(&mut self) { + for cq in self.completion_queues() { + // it's safe to shutdown more than once. + cq.shutdown() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_basic_loop() { + let mut env = Environment::new(2); + + let q1 = env.pick_cq(); + let q2 = env.pick_cq(); + let q3 = env.pick_cq(); + let cases = vec![(&q1, &q3, true), (&q1, &q2, false)]; + for (lq, rq, is_eq) in cases { + let lq_ref = lq.borrow().unwrap(); + let rq_ref = rq.borrow().unwrap(); + if is_eq { + assert_eq!(lq_ref.as_ptr(), rq_ref.as_ptr()); + } else { + assert_ne!(lq_ref.as_ptr(), rq_ref.as_ptr()); + } + } + + assert_eq!(env.completion_queues().len(), 2); + for cq in env.completion_queues() { + cq.shutdown(); + } + + for handle in env._handles.drain(..) { + handle.join().unwrap(); + } + } +} |