diff options
author | Joel Galenson <jgalenson@google.com> | 2021-06-21 12:42:49 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-06-21 12:42:49 -0700 |
commit | 8e8acbd61c224df38a367a9cdd30075966860bf9 (patch) | |
tree | 657ca28347ca6d074f482a3191123b68375bce20 /src | |
parent | 3adce0c03a69c88fc538ff74a98a8a74019819d5 (diff) | |
download | grpcio-8e8acbd61c224df38a367a9cdd30075966860bf9.tar.gz |
Upgrade rust/crates/grpcio to 0.9.0android-s-beta-5android-s-beta-4android-s-beta-3android-s-beta-5android-s-beta-4
Test: make
Change-Id: I05fd19472fbf556926a0145466f45235e089ab9f
Diffstat (limited to 'src')
-rw-r--r-- | src/call/mod.rs | 123 | ||||
-rw-r--r-- | src/call/server.rs | 12 | ||||
-rw-r--r-- | src/channel.rs | 2 | ||||
-rw-r--r-- | src/error.rs | 11 | ||||
-rw-r--r-- | src/lib.rs | 21 | ||||
-rw-r--r-- | src/metadata.rs | 19 | ||||
-rw-r--r-- | src/server.rs | 2 | ||||
-rw-r--r-- | src/task/promise.rs | 4 |
8 files changed, 147 insertions, 47 deletions
diff --git a/src/call/mod.rs b/src/call/mod.rs index 7f1582f..3674623 100644 --- a/src/call/mod.rs +++ b/src/call/mod.rs @@ -8,8 +8,8 @@ use std::pin::Pin; use std::sync::Arc; use std::{ptr, slice}; -use crate::cq::CompletionQueue; use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context}; +use crate::{cq::CompletionQueue, Metadata, MetadataBuilder}; use futures::future::Future; use futures::ready; use futures::task::{Context, Poll}; @@ -156,10 +156,15 @@ impl<Req, Resp> Method<Req, Resp> { #[derive(Debug, Clone)] pub struct RpcStatus { /// gRPC status code. `Ok` indicates success, all other values indicate an error. - pub status: RpcStatusCode, + code: RpcStatusCode, - /// Optional detail string. - pub details: Option<String>, + /// error message. + message: String, + + /// Additional details for rich error model. + /// + /// See also https://grpc.io/docs/guides/error/#richer-error-model. + details: Vec<u8>, } impl Display for RpcStatus { @@ -170,16 +175,54 @@ impl Display for RpcStatus { impl RpcStatus { /// Create a new [`RpcStatus`]. - pub fn new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus { + pub fn new<T: Into<RpcStatusCode>>(code: T) -> RpcStatus { + RpcStatus::with_message(code, String::new()) + } + + /// Create a new [`RpcStatus`] with given message. + pub fn with_message<T: Into<RpcStatusCode>>(code: T, message: String) -> RpcStatus { + RpcStatus::with_details(code, message, vec![]) + } + + /// Create a new [`RpcStats`] with code, message and details. + /// + /// If using rich error model, `details` should be binary message that sets `code` and + /// `message` to the same value. Or you can use `into` method to do automatical + /// transformation if using `grpcio_proto::google::rpc::Status`. + pub fn with_details<T: Into<RpcStatusCode>>( + code: T, + message: String, + details: Vec<u8>, + ) -> RpcStatus { RpcStatus { - status: code.into(), + code: code.into(), + message, details, } } /// Create a new [`RpcStatus`] that status code is Ok. pub fn ok() -> RpcStatus { - RpcStatus::new(RpcStatusCode::OK, None) + RpcStatus::new(RpcStatusCode::OK) + } + + /// Return the instance's error code. + #[inline] + pub fn code(&self) -> RpcStatusCode { + self.code + } + + /// Return the instance's error message. + #[inline] + pub fn message(&self) -> &str { + &self.message + } + + /// Return the (binary) error details. + /// + /// Usually it contains a serialized `google.rpc.Status` proto. + pub fn details(&self) -> &[u8] { + &self.details } } @@ -216,21 +259,26 @@ impl BatchContext { grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx) }); - let details = if status == RpcStatusCode::OK { - None + if status == RpcStatusCode::OK { + RpcStatus::ok() } else { unsafe { - let mut details_len = 0; + let mut msg_len = 0; let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details( self.ctx, - &mut details_len, + &mut msg_len, ); - let details_slice = slice::from_raw_parts(details_ptr as *const _, details_len); - Some(String::from_utf8_lossy(details_slice).into_owned()) + let msg_slice = slice::from_raw_parts(details_ptr as *const _, msg_len); + let message = String::from_utf8_lossy(msg_slice).into_owned(); + let m_ptr = + grpc_sys::grpcwrap_batch_context_recv_status_on_client_trailing_metadata( + self.ctx, + ); + let metadata = &*(m_ptr as *const Metadata); + let details = metadata.search_binary_error_details().to_vec(); + RpcStatus::with_details(status, message, details) } - }; - - RpcStatus::new(status, details) + } } /// Fetch the response bytes of the rpc call. @@ -352,22 +400,31 @@ impl Call { let _cq_ref = self.cq.borrow()?; let send_empty_metadata = if send_empty_metadata { 1 } else { 0 }; let f = check_run(BatchType::Finish, |ctx, tag| unsafe { - let details_ptr = status - .details - .as_ref() - .map_or_else(ptr::null, |s| s.as_ptr() as _); - let details_len = status.details.as_ref().map_or(0, String::len); + let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK { + (ptr::null(), 0) + } else { + (status.message.as_ptr(), status.message.len()) + }; let payload_p = match payload { Some(p) => p.as_mut_ptr(), None => ptr::null_mut(), }; + let mut trailing_metadata = if status.details.is_empty() { + None + } else { + let mut builder = MetadataBuilder::new(); + builder.set_binary_error_details(&status.details); + Some(builder.build()) + }; grpc_sys::grpcwrap_call_send_status_from_server( self.call, ctx, - status.status.into(), - details_ptr, - details_len, - ptr::null_mut(), + status.code().into(), + msg_ptr as _, + msg_len, + trailing_metadata + .as_mut() + .map_or_else(ptr::null_mut, |m| m as *mut _ as _), send_empty_metadata, payload_p, write_flags, @@ -390,17 +447,17 @@ impl Call { let (batch_ptr, tag_ptr) = box_batch_tag(tag); let code = unsafe { - let details_ptr = status - .details - .as_ref() - .map_or_else(ptr::null, |s| s.as_ptr() as _); - let details_len = status.details.as_ref().map_or(0, String::len); + let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK { + (ptr::null(), 0) + } else { + (status.message.as_ptr(), status.message.len()) + }; grpc_sys::grpcwrap_call_send_status_from_server( call_ptr, batch_ptr, - status.status.into(), - details_ptr, - details_len, + status.code().into(), + msg_ptr as _, + msg_len, ptr::null_mut(), 1, ptr::null_mut(), diff --git a/src/call/server.rs b/src/call/server.rs index 0fed656..58c63bf 100644 --- a/src/call/server.rs +++ b/src/call/server.rs @@ -252,7 +252,7 @@ impl UnaryRequestContext { return execute(self.request, cq, reader, handler, checker); } - let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned())); + let status = RpcStatus::with_message(RpcStatusCode::INTERNAL, "No payload".to_owned()); self.request.call(cq.clone()).abort(&status) } } @@ -703,9 +703,9 @@ pub fn execute_unary<P, Q, F>( let request = match de(payload) { Ok(f) => f, Err(e) => { - let status = RpcStatus::new( + let status = RpcStatus::with_message( RpcStatusCode::INTERNAL, - Some(format!("Failed to deserialize response message: {:?}", e)), + format!("Failed to deserialize response message: {:?}", e), ); call.abort(&status); return; @@ -749,9 +749,9 @@ pub fn execute_server_streaming<P, Q, F>( let request = match de(payload) { Ok(t) => t, Err(e) => { - let status = RpcStatus::new( + let status = RpcStatus::with_message( RpcStatusCode::INTERNAL, - Some(format!("Failed to deserialize response message: {:?}", e)), + format!("Failed to deserialize response message: {:?}", e), ); call.abort(&status); return; @@ -786,7 +786,7 @@ pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) { let ctx = ctx; let mut call = ctx.call(cq); accept_call!(call); - call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None)) + call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED)) } // Helper function to call handler. diff --git a/src/channel.rs b/src/channel.rs index c8c67b1..3b2e10c 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -31,7 +31,7 @@ pub use crate::grpc_sys::{ /// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents fn format_user_agent_string(agent: &str) -> CString { - let version = "0.8.2"; + let version = "0.9.0"; let trimed_agent = agent.trim(); let val = if trimed_agent.is_empty() { format!("grpc-rust/{}", version) diff --git a/src/error.rs b/src/error.rs index f12ffa4..7c180a9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -38,10 +38,13 @@ pub enum Error { impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Error::RpcFailure(RpcStatus { status, details }) => match details { - Some(details) => write!(fmt, "RpcFailure: {} {}", status, details), - None => write!(fmt, "RpcFailure: {}", status), - }, + Error::RpcFailure(s) => { + if s.message().is_empty() { + write!(fmt, "RpcFailure: {}", s.code()) + } else { + write!(fmt, "RpcFailure: {} {}", s.code(), s.message()) + } + } other_error => write!(fmt, "{:?}", other_error), } } @@ -80,3 +80,24 @@ pub use crate::security::{ pub use crate::server::{ CheckResult, Server, ServerBuilder, ServerChecker, Service, ServiceBuilder, ShutdownFuture, }; + +/// A shortcut for implementing a service method by returning `UNIMPLEMENTED` status code. +/// +/// Compiler will provide a default implementations for all methods to invoke this macro, so +/// you usually won't call it directly. If you really need to, just call it like: +/// ```ignored +/// fn method(&self, ctx: grpcio::RpcContext, req: Request, resp: UnarySink<Response>) { +/// unimplemented_call!(ctx, resp); +/// } +/// ``` +#[macro_export] +macro_rules! unimplemented_call { + ($ctx:ident, $sink:ident) => {{ + let f = async move { + let _ = $sink + .fail($crate::RpcStatus::new($crate::RpcStatusCode::UNIMPLEMENTED)) + .await; + }; + $ctx.spawn(f) + }}; +} diff --git a/src/metadata.rs b/src/metadata.rs index 893f6e2..caaebc8 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -7,6 +7,8 @@ use std::{mem, slice, str}; use crate::error::{Error, Result}; +const BINARY_ERROR_DETAILS_KEY: &str = "grpc-status-details-bin"; + fn normalize_key(key: &str, binary: bool) -> Result<Cow<'_, str>> { if key.is_empty() { return Err(Error::InvalidMetadata( @@ -107,6 +109,13 @@ impl MetadataBuilder { Ok(self.add_metadata(&key, value)) } + /// Set binary error details to support rich error model. + /// + /// See also https://grpc.io/docs/guides/error/#richer-error-model. + pub(crate) fn set_binary_error_details(&mut self, value: &[u8]) -> &mut MetadataBuilder { + self.add_metadata(BINARY_ERROR_DETAILS_KEY, value) + } + /// Create `Metadata` with configured entries. pub fn build(mut self) -> Metadata { unsafe { @@ -214,6 +223,16 @@ impl Metadata { metadata: p, }) } + + /// Search for binary error details. + pub(crate) fn search_binary_error_details(&self) -> &[u8] { + for (k, v) in self.iter() { + if k == BINARY_ERROR_DETAILS_KEY { + return v; + } + } + &[] + } } impl Clone for Metadata { diff --git a/src/server.rs b/src/server.rs index a612b13..c8e28df 100644 --- a/src/server.rs +++ b/src/server.rs @@ -68,7 +68,7 @@ where /// Given a host and port, creates a string of the form "host:port" or /// "[host]:port", depending on whether the host is an IPv6 literal. fn join_host_port(host: &str, port: u16) -> String { - if host.starts_with("unix:") { + if host.starts_with("unix:") | host.starts_with("unix-abstract:") { format!("{}\0", host) } else if let Ok(ip) = host.parse::<IpAddr>() { format!("{}\0", SocketAddr::new(ip, port)) diff --git a/src/task/promise.rs b/src/task/promise.rs index 50add06..2d826d4 100644 --- a/src/task/promise.rs +++ b/src/task/promise.rs @@ -57,7 +57,7 @@ impl Batch { let mut guard = self.inner.lock(); if succeed { let status = self.ctx.rpc_status(); - if status.status == RpcStatusCode::OK { + if status.code() == RpcStatusCode::OK { guard.set_result(Ok(None)) } else { guard.set_result(Err(Error::RpcFailure(status))) @@ -73,7 +73,7 @@ impl Batch { let task = { let mut guard = self.inner.lock(); let status = self.ctx.rpc_status(); - if status.status == RpcStatusCode::OK { + if status.code() == RpcStatusCode::OK { guard.set_result(Ok(self.ctx.recv_message())) } else { guard.set_result(Err(Error::RpcFailure(status))) |