aboutsummaryrefslogtreecommitdiff
path: root/src/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel.rs')
-rw-r--r--src/channel.rs67
1 files changed, 62 insertions, 5 deletions
diff --git a/src/channel.rs b/src/channel.rs
index a33a4be..c8c67b1 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -4,12 +4,14 @@ use std::borrow::Cow;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
+use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp, i32, ptr};
-use crate::grpc_sys::{
- self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args,
+use crate::{
+ grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args},
+ Deadline,
};
use libc::{self, c_char, c_int};
@@ -17,6 +19,7 @@ use crate::call::{Call, Method};
use crate::cq::CompletionQueue;
use crate::env::Environment;
use crate::error::Result;
+use crate::task::CallTag;
use crate::task::Kicker;
use crate::CallOption;
use crate::ResourceQuota;
@@ -28,7 +31,7 @@ pub use crate::grpc_sys::{
/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
fn format_user_agent_string(agent: &str) -> CString {
- let version = "0.7.1";
+ let version = "0.8.2";
let trimed_agent = agent.trim();
let val = if trimed_agent.is_empty() {
format!("grpc-rust/{}", version)
@@ -589,12 +592,66 @@ impl Channel {
}
}
- // If try_to_connect is true, the channel will try to establish a connection, potentially
- // changing the state.
+ /// If try_to_connect is true, the channel will try to establish a connection, potentially
+ /// changing the state.
pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
self.inner.check_connectivity_state(try_to_connect)
}
+ /// Blocking wait for channel state change or deadline expiration.
+ ///
+ /// `check_connectivity_state` needs to be called to get the current state. Returns false
+ /// means deadline excceeds before observing any state changes.
+ pub fn wait_for_state_change(
+ &self,
+ last_observed: ConnectivityState,
+ deadline: impl Into<Deadline>,
+ ) -> impl Future<Output = bool> {
+ let (cq_f, prom) = CallTag::action_pair();
+ let prom_box = Box::new(prom);
+ let tag = Box::into_raw(prom_box);
+ let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
+ unsafe {
+ grpcio_sys::grpc_channel_watch_connectivity_state(
+ self.inner.channel,
+ last_observed,
+ deadline.into().spec(),
+ cq_ref.as_ptr(),
+ tag as *mut _,
+ )
+ }
+ true
+ } else {
+ // It's already shutdown.
+ false
+ };
+ async move { should_wait && cq_f.await.unwrap() }
+ }
+
+ /// Wait for this channel to be connected.
+ ///
+ /// Returns false means deadline excceeds before connection is connected.
+ pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool {
+ // Fast path, it's probably connected.
+ let mut state = self.check_connectivity_state(true);
+ if ConnectivityState::GRPC_CHANNEL_READY == state {
+ return true;
+ }
+ let deadline = deadline.into();
+ loop {
+ if self.wait_for_state_change(state, deadline).await {
+ state = self.check_connectivity_state(true);
+ match state {
+ ConnectivityState::GRPC_CHANNEL_READY => return true,
+ ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false,
+ _ => (),
+ }
+ continue;
+ }
+ return false;
+ }
+ }
+
/// Create a Kicker.
pub(crate) fn create_kicker(&self) -> Result<Kicker> {
let cq_ref = self.cq.borrow()?;