aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-06-21 12:42:49 -0700
committerJoel Galenson <jgalenson@google.com>2021-06-21 12:42:49 -0700
commit8e8acbd61c224df38a367a9cdd30075966860bf9 (patch)
tree657ca28347ca6d074f482a3191123b68375bce20 /src
parent3adce0c03a69c88fc538ff74a98a8a74019819d5 (diff)
downloadgrpcio-8e8acbd61c224df38a367a9cdd30075966860bf9.tar.gz
Test: make Change-Id: I05fd19472fbf556926a0145466f45235e089ab9f
Diffstat (limited to 'src')
-rw-r--r--src/call/mod.rs123
-rw-r--r--src/call/server.rs12
-rw-r--r--src/channel.rs2
-rw-r--r--src/error.rs11
-rw-r--r--src/lib.rs21
-rw-r--r--src/metadata.rs19
-rw-r--r--src/server.rs2
-rw-r--r--src/task/promise.rs4
8 files changed, 147 insertions, 47 deletions
diff --git a/src/call/mod.rs b/src/call/mod.rs
index 7f1582f..3674623 100644
--- a/src/call/mod.rs
+++ b/src/call/mod.rs
@@ -8,8 +8,8 @@ use std::pin::Pin;
use std::sync::Arc;
use std::{ptr, slice};
-use crate::cq::CompletionQueue;
use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
+use crate::{cq::CompletionQueue, Metadata, MetadataBuilder};
use futures::future::Future;
use futures::ready;
use futures::task::{Context, Poll};
@@ -156,10 +156,15 @@ impl<Req, Resp> Method<Req, Resp> {
#[derive(Debug, Clone)]
pub struct RpcStatus {
/// gRPC status code. `Ok` indicates success, all other values indicate an error.
- pub status: RpcStatusCode,
+ code: RpcStatusCode,
- /// Optional detail string.
- pub details: Option<String>,
+ /// error message.
+ message: String,
+
+ /// Additional details for rich error model.
+ ///
+ /// See also https://grpc.io/docs/guides/error/#richer-error-model.
+ details: Vec<u8>,
}
impl Display for RpcStatus {
@@ -170,16 +175,54 @@ impl Display for RpcStatus {
impl RpcStatus {
/// Create a new [`RpcStatus`].
- pub fn new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus {
+ pub fn new<T: Into<RpcStatusCode>>(code: T) -> RpcStatus {
+ RpcStatus::with_message(code, String::new())
+ }
+
+ /// Create a new [`RpcStatus`] with given message.
+ pub fn with_message<T: Into<RpcStatusCode>>(code: T, message: String) -> RpcStatus {
+ RpcStatus::with_details(code, message, vec![])
+ }
+
+ /// Create a new [`RpcStats`] with code, message and details.
+ ///
+ /// If using rich error model, `details` should be binary message that sets `code` and
+ /// `message` to the same value. Or you can use `into` method to do automatical
+ /// transformation if using `grpcio_proto::google::rpc::Status`.
+ pub fn with_details<T: Into<RpcStatusCode>>(
+ code: T,
+ message: String,
+ details: Vec<u8>,
+ ) -> RpcStatus {
RpcStatus {
- status: code.into(),
+ code: code.into(),
+ message,
details,
}
}
/// Create a new [`RpcStatus`] that status code is Ok.
pub fn ok() -> RpcStatus {
- RpcStatus::new(RpcStatusCode::OK, None)
+ RpcStatus::new(RpcStatusCode::OK)
+ }
+
+ /// Return the instance's error code.
+ #[inline]
+ pub fn code(&self) -> RpcStatusCode {
+ self.code
+ }
+
+ /// Return the instance's error message.
+ #[inline]
+ pub fn message(&self) -> &str {
+ &self.message
+ }
+
+ /// Return the (binary) error details.
+ ///
+ /// Usually it contains a serialized `google.rpc.Status` proto.
+ pub fn details(&self) -> &[u8] {
+ &self.details
}
}
@@ -216,21 +259,26 @@ impl BatchContext {
grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx)
});
- let details = if status == RpcStatusCode::OK {
- None
+ if status == RpcStatusCode::OK {
+ RpcStatus::ok()
} else {
unsafe {
- let mut details_len = 0;
+ let mut msg_len = 0;
let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details(
self.ctx,
- &mut details_len,
+ &mut msg_len,
);
- let details_slice = slice::from_raw_parts(details_ptr as *const _, details_len);
- Some(String::from_utf8_lossy(details_slice).into_owned())
+ let msg_slice = slice::from_raw_parts(details_ptr as *const _, msg_len);
+ let message = String::from_utf8_lossy(msg_slice).into_owned();
+ let m_ptr =
+ grpc_sys::grpcwrap_batch_context_recv_status_on_client_trailing_metadata(
+ self.ctx,
+ );
+ let metadata = &*(m_ptr as *const Metadata);
+ let details = metadata.search_binary_error_details().to_vec();
+ RpcStatus::with_details(status, message, details)
}
- };
-
- RpcStatus::new(status, details)
+ }
}
/// Fetch the response bytes of the rpc call.
@@ -352,22 +400,31 @@ impl Call {
let _cq_ref = self.cq.borrow()?;
let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
- let details_ptr = status
- .details
- .as_ref()
- .map_or_else(ptr::null, |s| s.as_ptr() as _);
- let details_len = status.details.as_ref().map_or(0, String::len);
+ let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK {
+ (ptr::null(), 0)
+ } else {
+ (status.message.as_ptr(), status.message.len())
+ };
let payload_p = match payload {
Some(p) => p.as_mut_ptr(),
None => ptr::null_mut(),
};
+ let mut trailing_metadata = if status.details.is_empty() {
+ None
+ } else {
+ let mut builder = MetadataBuilder::new();
+ builder.set_binary_error_details(&status.details);
+ Some(builder.build())
+ };
grpc_sys::grpcwrap_call_send_status_from_server(
self.call,
ctx,
- status.status.into(),
- details_ptr,
- details_len,
- ptr::null_mut(),
+ status.code().into(),
+ msg_ptr as _,
+ msg_len,
+ trailing_metadata
+ .as_mut()
+ .map_or_else(ptr::null_mut, |m| m as *mut _ as _),
send_empty_metadata,
payload_p,
write_flags,
@@ -390,17 +447,17 @@ impl Call {
let (batch_ptr, tag_ptr) = box_batch_tag(tag);
let code = unsafe {
- let details_ptr = status
- .details
- .as_ref()
- .map_or_else(ptr::null, |s| s.as_ptr() as _);
- let details_len = status.details.as_ref().map_or(0, String::len);
+ let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK {
+ (ptr::null(), 0)
+ } else {
+ (status.message.as_ptr(), status.message.len())
+ };
grpc_sys::grpcwrap_call_send_status_from_server(
call_ptr,
batch_ptr,
- status.status.into(),
- details_ptr,
- details_len,
+ status.code().into(),
+ msg_ptr as _,
+ msg_len,
ptr::null_mut(),
1,
ptr::null_mut(),
diff --git a/src/call/server.rs b/src/call/server.rs
index 0fed656..58c63bf 100644
--- a/src/call/server.rs
+++ b/src/call/server.rs
@@ -252,7 +252,7 @@ impl UnaryRequestContext {
return execute(self.request, cq, reader, handler, checker);
}
- let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned()));
+ let status = RpcStatus::with_message(RpcStatusCode::INTERNAL, "No payload".to_owned());
self.request.call(cq.clone()).abort(&status)
}
}
@@ -703,9 +703,9 @@ pub fn execute_unary<P, Q, F>(
let request = match de(payload) {
Ok(f) => f,
Err(e) => {
- let status = RpcStatus::new(
+ let status = RpcStatus::with_message(
RpcStatusCode::INTERNAL,
- Some(format!("Failed to deserialize response message: {:?}", e)),
+ format!("Failed to deserialize response message: {:?}", e),
);
call.abort(&status);
return;
@@ -749,9 +749,9 @@ pub fn execute_server_streaming<P, Q, F>(
let request = match de(payload) {
Ok(t) => t,
Err(e) => {
- let status = RpcStatus::new(
+ let status = RpcStatus::with_message(
RpcStatusCode::INTERNAL,
- Some(format!("Failed to deserialize response message: {:?}", e)),
+ format!("Failed to deserialize response message: {:?}", e),
);
call.abort(&status);
return;
@@ -786,7 +786,7 @@ pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
let ctx = ctx;
let mut call = ctx.call(cq);
accept_call!(call);
- call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None))
+ call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED))
}
// Helper function to call handler.
diff --git a/src/channel.rs b/src/channel.rs
index c8c67b1..3b2e10c 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -31,7 +31,7 @@ pub use crate::grpc_sys::{
/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
fn format_user_agent_string(agent: &str) -> CString {
- let version = "0.8.2";
+ let version = "0.9.0";
let trimed_agent = agent.trim();
let val = if trimed_agent.is_empty() {
format!("grpc-rust/{}", version)
diff --git a/src/error.rs b/src/error.rs
index f12ffa4..7c180a9 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -38,10 +38,13 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
- Error::RpcFailure(RpcStatus { status, details }) => match details {
- Some(details) => write!(fmt, "RpcFailure: {} {}", status, details),
- None => write!(fmt, "RpcFailure: {}", status),
- },
+ Error::RpcFailure(s) => {
+ if s.message().is_empty() {
+ write!(fmt, "RpcFailure: {}", s.code())
+ } else {
+ write!(fmt, "RpcFailure: {} {}", s.code(), s.message())
+ }
+ }
other_error => write!(fmt, "{:?}", other_error),
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 2bb0c11..4727331 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -80,3 +80,24 @@ pub use crate::security::{
pub use crate::server::{
CheckResult, Server, ServerBuilder, ServerChecker, Service, ServiceBuilder, ShutdownFuture,
};
+
+/// A shortcut for implementing a service method by returning `UNIMPLEMENTED` status code.
+///
+/// Compiler will provide a default implementations for all methods to invoke this macro, so
+/// you usually won't call it directly. If you really need to, just call it like:
+/// ```ignored
+/// fn method(&self, ctx: grpcio::RpcContext, req: Request, resp: UnarySink<Response>) {
+/// unimplemented_call!(ctx, resp);
+/// }
+/// ```
+#[macro_export]
+macro_rules! unimplemented_call {
+ ($ctx:ident, $sink:ident) => {{
+ let f = async move {
+ let _ = $sink
+ .fail($crate::RpcStatus::new($crate::RpcStatusCode::UNIMPLEMENTED))
+ .await;
+ };
+ $ctx.spawn(f)
+ }};
+}
diff --git a/src/metadata.rs b/src/metadata.rs
index 893f6e2..caaebc8 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -7,6 +7,8 @@ use std::{mem, slice, str};
use crate::error::{Error, Result};
+const BINARY_ERROR_DETAILS_KEY: &str = "grpc-status-details-bin";
+
fn normalize_key(key: &str, binary: bool) -> Result<Cow<'_, str>> {
if key.is_empty() {
return Err(Error::InvalidMetadata(
@@ -107,6 +109,13 @@ impl MetadataBuilder {
Ok(self.add_metadata(&key, value))
}
+ /// Set binary error details to support rich error model.
+ ///
+ /// See also https://grpc.io/docs/guides/error/#richer-error-model.
+ pub(crate) fn set_binary_error_details(&mut self, value: &[u8]) -> &mut MetadataBuilder {
+ self.add_metadata(BINARY_ERROR_DETAILS_KEY, value)
+ }
+
/// Create `Metadata` with configured entries.
pub fn build(mut self) -> Metadata {
unsafe {
@@ -214,6 +223,16 @@ impl Metadata {
metadata: p,
})
}
+
+ /// Search for binary error details.
+ pub(crate) fn search_binary_error_details(&self) -> &[u8] {
+ for (k, v) in self.iter() {
+ if k == BINARY_ERROR_DETAILS_KEY {
+ return v;
+ }
+ }
+ &[]
+ }
}
impl Clone for Metadata {
diff --git a/src/server.rs b/src/server.rs
index a612b13..c8e28df 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -68,7 +68,7 @@ where
/// Given a host and port, creates a string of the form "host:port" or
/// "[host]:port", depending on whether the host is an IPv6 literal.
fn join_host_port(host: &str, port: u16) -> String {
- if host.starts_with("unix:") {
+ if host.starts_with("unix:") | host.starts_with("unix-abstract:") {
format!("{}\0", host)
} else if let Ok(ip) = host.parse::<IpAddr>() {
format!("{}\0", SocketAddr::new(ip, port))
diff --git a/src/task/promise.rs b/src/task/promise.rs
index 50add06..2d826d4 100644
--- a/src/task/promise.rs
+++ b/src/task/promise.rs
@@ -57,7 +57,7 @@ impl Batch {
let mut guard = self.inner.lock();
if succeed {
let status = self.ctx.rpc_status();
- if status.status == RpcStatusCode::OK {
+ if status.code() == RpcStatusCode::OK {
guard.set_result(Ok(None))
} else {
guard.set_result(Err(Error::RpcFailure(status)))
@@ -73,7 +73,7 @@ impl Batch {
let task = {
let mut guard = self.inner.lock();
let status = self.ctx.rpc_status();
- if status.status == RpcStatusCode::OK {
+ if status.code() == RpcStatusCode::OK {
guard.set_result(Ok(self.ctx.recv_message()))
} else {
guard.set_result(Err(Error::RpcFailure(status)))