diff options
Diffstat (limited to 'src/cq.rs')
-rw-r--r-- | src/cq.rs | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/src/cq.rs b/src/cq.rs new file mode 100644 index 0000000..60b6bb3 --- /dev/null +++ b/src/cq.rs @@ -0,0 +1,216 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +use std::cell::UnsafeCell; +use std::collections::VecDeque; +use std::ptr; +use std::sync::atomic::{AtomicIsize, Ordering}; +use std::sync::Arc; +use std::thread::{self, ThreadId}; + +use crate::error::{Error, Result}; +use crate::grpc_sys::{self, gpr_clock_type, grpc_completion_queue}; +use crate::task::UnfinishedWork; + +pub use crate::grpc_sys::grpc_completion_type as EventType; +pub use crate::grpc_sys::grpc_event as Event; + +/// `CompletionQueueHandle` enable notification of the completion of asynchronous actions. +pub struct CompletionQueueHandle { + cq: *mut grpc_completion_queue, + // When `ref_cnt` < 0, a shutdown is pending, completion queue should not + // accept requests anymore; when `ref_cnt` == 0, completion queue should + // be shutdown; When `ref_cnt` > 0, completion queue can accept requests + // and should not be shutdown. + ref_cnt: AtomicIsize, +} + +unsafe impl Sync for CompletionQueueHandle {} +unsafe impl Send for CompletionQueueHandle {} + +impl CompletionQueueHandle { + pub fn new() -> CompletionQueueHandle { + CompletionQueueHandle { + cq: unsafe { grpc_sys::grpc_completion_queue_create_for_next(ptr::null_mut()) }, + ref_cnt: AtomicIsize::new(1), + } + } + + fn add_ref(&self) -> Result<()> { + loop { + let cnt = self.ref_cnt.load(Ordering::SeqCst); + if cnt <= 0 { + // `shutdown` has been called, reject any requests. + return Err(Error::QueueShutdown); + } + let new_cnt = cnt + 1; + if cnt + == self + .ref_cnt + .compare_and_swap(cnt, new_cnt, Ordering::SeqCst) + { + return Ok(()); + } + } + } + + fn unref(&self) { + let shutdown = loop { + let cnt = self.ref_cnt.load(Ordering::SeqCst); + // If `shutdown` is not called, `cnt` > 0, so minus 1 to unref. + // If `shutdown` is called, `cnt` < 0, so plus 1 to unref. + let new_cnt = cnt - cnt.signum(); + if cnt + == self + .ref_cnt + .compare_and_swap(cnt, new_cnt, Ordering::SeqCst) + { + break new_cnt == 0; + } + }; + if shutdown { + unsafe { + grpc_sys::grpc_completion_queue_shutdown(self.cq); + } + } + } + + fn shutdown(&self) { + let shutdown = loop { + let cnt = self.ref_cnt.load(Ordering::SeqCst); + if cnt <= 0 { + // `shutdown` is called, skipped. + return; + } + // Make cnt negative to indicate that `shutdown` has been called. + // Because `cnt` is initialized to 1, so minus 1 to make it reach + // toward 0. That is `new_cnt = -(cnt - 1) = -cnt + 1`. + let new_cnt = -cnt + 1; + if cnt + == self + .ref_cnt + .compare_and_swap(cnt, new_cnt, Ordering::SeqCst) + { + break new_cnt == 0; + } + }; + if shutdown { + unsafe { + grpc_sys::grpc_completion_queue_shutdown(self.cq); + } + } + } +} + +impl Drop for CompletionQueueHandle { + fn drop(&mut self) { + unsafe { grpc_sys::grpc_completion_queue_destroy(self.cq) } + } +} + +pub struct CompletionQueueRef<'a> { + queue: &'a CompletionQueue, +} + +impl<'a> CompletionQueueRef<'a> { + pub fn as_ptr(&self) -> *mut grpc_completion_queue { + self.queue.handle.cq + } +} + +impl<'a> Drop for CompletionQueueRef<'a> { + fn drop(&mut self) { + self.queue.handle.unref(); + } +} + +/// `WorkQueue` stores the unfinished work of a completion queue. +/// +/// Every completion queue has a work queue, and every work queue belongs +/// to exact one completion queue. `WorkQueue` is a short path for future +/// notifications. When a future is ready to be polled, there are two way +/// to notify it. +/// 1. If it's in the same thread where the future is spawned, the future +/// will be pushed into `WorkQueue` and be polled when current call tag +/// is handled; +/// 2. If not, the future will be wrapped as a call tag and pushed into +/// completion queue and finally popped at the call to `grpc_completion_queue_next`. +pub struct WorkQueue { + id: ThreadId, + pending_work: UnsafeCell<VecDeque<UnfinishedWork>>, +} + +unsafe impl Sync for WorkQueue {} +unsafe impl Send for WorkQueue {} + +const QUEUE_CAPACITY: usize = 4096; + +impl WorkQueue { + pub fn new() -> WorkQueue { + WorkQueue { + id: std::thread::current().id(), + pending_work: UnsafeCell::new(VecDeque::with_capacity(QUEUE_CAPACITY)), + } + } + + /// Pushes an unfinished work into the inner queue. + /// + /// If the method is not called from the same thread where it's created, + /// the work will returned and no work is pushed. + pub fn push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork> { + if self.id == thread::current().id() { + unsafe { &mut *self.pending_work.get() }.push_back(work); + None + } else { + Some(work) + } + } + + /// Pops one unfinished work. + /// + /// It should only be called from the same thread where the queue is created. + /// Otherwise it leads to undefined behavior. + pub unsafe fn pop_work(&self) -> Option<UnfinishedWork> { + let queue = &mut *self.pending_work.get(); + if queue.capacity() > QUEUE_CAPACITY && queue.len() < queue.capacity() / 2 { + queue.shrink_to_fit(); + } + { &mut *self.pending_work.get() }.pop_back() + } +} + +#[derive(Clone)] +pub struct CompletionQueue { + handle: Arc<CompletionQueueHandle>, + pub(crate) worker: Arc<WorkQueue>, +} + +impl CompletionQueue { + pub fn new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue { + CompletionQueue { handle, worker } + } + + /// Blocks until an event is available, the completion queue is being shut down. + pub fn next(&self) -> Event { + unsafe { + let inf = grpc_sys::gpr_inf_future(gpr_clock_type::GPR_CLOCK_REALTIME); + grpc_sys::grpc_completion_queue_next(self.handle.cq, inf, ptr::null_mut()) + } + } + + pub fn borrow(&self) -> Result<CompletionQueueRef<'_>> { + self.handle.add_ref()?; + Ok(CompletionQueueRef { queue: self }) + } + + /// Begin destruction of a completion queue. + /// + /// Once all possible events are drained then `next()` will start to produce + /// `Event::QueueShutdown` events only. + pub fn shutdown(&self) { + self.handle.shutdown() + } + + pub fn worker_id(&self) -> ThreadId { + self.worker.id + } +} |