diff options
Diffstat (limited to 'src/channel.rs')
-rw-r--r-- | src/channel.rs | 67 |
1 files changed, 62 insertions, 5 deletions
diff --git a/src/channel.rs b/src/channel.rs index a33a4be..c8c67b1 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -4,12 +4,14 @@ use std::borrow::Cow; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::ffi::{CStr, CString}; +use std::future::Future; use std::sync::Arc; use std::time::Duration; use std::{cmp, i32, ptr}; -use crate::grpc_sys::{ - self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args, +use crate::{ + grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args}, + Deadline, }; use libc::{self, c_char, c_int}; @@ -17,6 +19,7 @@ use crate::call::{Call, Method}; use crate::cq::CompletionQueue; use crate::env::Environment; use crate::error::Result; +use crate::task::CallTag; use crate::task::Kicker; use crate::CallOption; use crate::ResourceQuota; @@ -28,7 +31,7 @@ pub use crate::grpc_sys::{ /// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents fn format_user_agent_string(agent: &str) -> CString { - let version = "0.7.1"; + let version = "0.8.2"; let trimed_agent = agent.trim(); let val = if trimed_agent.is_empty() { format!("grpc-rust/{}", version) @@ -589,12 +592,66 @@ impl Channel { } } - // If try_to_connect is true, the channel will try to establish a connection, potentially - // changing the state. + /// If try_to_connect is true, the channel will try to establish a connection, potentially + /// changing the state. pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState { self.inner.check_connectivity_state(try_to_connect) } + /// Blocking wait for channel state change or deadline expiration. + /// + /// `check_connectivity_state` needs to be called to get the current state. Returns false + /// means deadline excceeds before observing any state changes. + pub fn wait_for_state_change( + &self, + last_observed: ConnectivityState, + deadline: impl Into<Deadline>, + ) -> impl Future<Output = bool> { + let (cq_f, prom) = CallTag::action_pair(); + let prom_box = Box::new(prom); + let tag = Box::into_raw(prom_box); + let should_wait = if let Ok(cq_ref) = self.cq.borrow() { + unsafe { + grpcio_sys::grpc_channel_watch_connectivity_state( + self.inner.channel, + last_observed, + deadline.into().spec(), + cq_ref.as_ptr(), + tag as *mut _, + ) + } + true + } else { + // It's already shutdown. + false + }; + async move { should_wait && cq_f.await.unwrap() } + } + + /// Wait for this channel to be connected. + /// + /// Returns false means deadline excceeds before connection is connected. + pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool { + // Fast path, it's probably connected. + let mut state = self.check_connectivity_state(true); + if ConnectivityState::GRPC_CHANNEL_READY == state { + return true; + } + let deadline = deadline.into(); + loop { + if self.wait_for_state_change(state, deadline).await { + state = self.check_connectivity_state(true); + match state { + ConnectivityState::GRPC_CHANNEL_READY => return true, + ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false, + _ => (), + } + continue; + } + return false; + } + } + /// Create a Kicker. pub(crate) fn create_kicker(&self) -> Result<Kicker> { let cq_ref = self.cq.borrow()?; |