aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2020-11-23 14:42:57 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2020-11-23 14:42:57 +0000
commitfc7118641dd538ab56486911dfc392958c4ad18a (patch)
tree71b66a4c98a5ac5b2d407b3273ceae8adaf1744e /src
parent29da4258ca4d24020fac090035c5ca029c4bda5a (diff)
parentee7a229247011355da67df293220acc9175448e2 (diff)
downloadgrpcio-fc7118641dd538ab56486911dfc392958c4ad18a.tar.gz
Upgrade rust/crates/grpcio to 0.7.0 am: ee7a229247
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/grpcio/+/1485656 Change-Id: Idde1ab05a4d6bbf3c40ab114ab8cd55d07f4cc70
Diffstat (limited to 'src')
-rw-r--r--src/buf.rs43
-rw-r--r--src/call/client.rs27
-rw-r--r--src/call/mod.rs124
-rw-r--r--src/call/server.rs39
-rw-r--r--src/channel.rs44
-rw-r--r--src/codec.rs29
-rw-r--r--src/env.rs30
-rw-r--r--src/lib.rs1
-rw-r--r--src/metadata.rs32
-rw-r--r--src/security/credentials.rs2
-rw-r--r--src/server.rs16
11 files changed, 317 insertions, 70 deletions
diff --git a/src/buf.rs b/src/buf.rs
index 7731fd5..d51f274 100644
--- a/src/buf.rs
+++ b/src/buf.rs
@@ -7,6 +7,11 @@ use std::fmt::{self, Debug, Formatter};
use std::io::{self, BufRead, Read};
use std::mem::{self, ManuallyDrop, MaybeUninit};
+/// Copied from grpc-sys/grpc/include/grpc/impl/codegen/slice.h. Unfortunately bindgen doesn't
+/// generate it automatically.
+const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut u8>() - 1
+ + mem::size_of::<*mut libc::c_void>();
+
/// A convenient rust wrapper for the type `grpc_slice`.
///
/// It's expected that the slice should be initialized.
@@ -58,6 +63,41 @@ impl GrpcSlice {
pub fn from_static_str(s: &'static str) -> GrpcSlice {
GrpcSlice::from_static_slice(s.as_bytes())
}
+
+ /// Checks whether the slice stores bytes inline.
+ pub fn is_inline(&self) -> bool {
+ self.0.refcount.is_null()
+ }
+
+ /// Reallocates current slice with given capacity.
+ ///
+ /// The length of returned slice is the exact same as given cap.
+ ///
+ /// ## Safety
+ ///
+ /// Caller is expected to initialize all available bytes to guarantee safety of this slice.
+ pub unsafe fn realloc(&mut self, cap: usize) -> &mut [MaybeUninit<u8>] {
+ if cap <= INLINED_SIZE {
+ // Only inlined slice can be reused safely.
+ if !self.0.refcount.is_null() {
+ *self = GrpcSlice::default();
+ }
+ self.0.data.inlined.length = cap as u8;
+ std::slice::from_raw_parts_mut(
+ self.0.data.inlined.bytes.as_mut_ptr() as *mut MaybeUninit<u8>,
+ cap,
+ )
+ } else {
+ *self = GrpcSlice(grpcio_sys::grpc_slice_malloc_large(cap));
+ let start = self.0.data.refcounted.bytes;
+ let len = self.0.data.refcounted.length;
+ std::slice::from_raw_parts_mut(start as *mut MaybeUninit<u8>, len)
+ }
+ }
+
+ pub fn as_mut_ptr(&mut self) -> *mut grpc_slice {
+ &mut self.0
+ }
}
impl Clone for GrpcSlice {
@@ -91,6 +131,9 @@ impl Drop for GrpcSlice {
}
}
+unsafe impl Send for GrpcSlice {}
+unsafe impl Sync for GrpcSlice {}
+
impl PartialEq<[u8]> for GrpcSlice {
fn eq(&self, r: &[u8]) -> bool {
// Technically, the equal function inside vtable should be used.
diff --git a/src/call/client.rs b/src/call/client.rs
index eac0db4..279e619 100644
--- a/src/call/client.rs
+++ b/src/call/client.rs
@@ -14,6 +14,7 @@ use parking_lot::Mutex;
use std::future::Future;
use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags};
+use crate::buf::GrpcSlice;
use crate::call::{check_run, Call, MessageReader, Method};
use crate::channel::Channel;
use crate::codec::{DeserializeFn, SerializeFn};
@@ -108,14 +109,13 @@ impl Call {
mut opt: CallOption,
) -> Result<ClientUnaryReceiver<Resp>> {
let call = channel.create_call(method, &opt)?;
- let mut payload = vec![];
+ let mut payload = GrpcSlice::default();
(method.req_ser())(req, &mut payload);
let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
grpc_sys::grpcwrap_call_start_unary(
call.call,
ctx,
- payload.as_ptr() as *const _,
- payload.len(),
+ payload.as_mut_ptr(),
opt.write_flags.flags,
opt.headers
.as_mut()
@@ -162,14 +162,13 @@ impl Call {
mut opt: CallOption,
) -> Result<ClientSStreamReceiver<Resp>> {
let call = channel.create_call(method, &opt)?;
- let mut payload = vec![];
+ let mut payload = GrpcSlice::default();
(method.req_ser())(req, &mut payload);
let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
grpc_sys::grpcwrap_call_start_server_streaming(
call.call,
ctx,
- payload.as_ptr() as _,
- payload.len(),
+ payload.as_mut_ptr(),
opt.write_flags.flags,
opt.headers
.as_mut()
@@ -331,6 +330,19 @@ impl<Req> StreamingCallSink<Req> {
}
}
+ /// By default it always sends messages with their configured buffer hint. But when the
+ /// `enhance_batch` is enabled, messages will be batched together as many as possible.
+ /// The rules are listed as below:
+ /// - All messages except the last one will be sent with `buffer_hint` set to true.
+ /// - The last message will also be sent with `buffer_hint` set to true unless any message is
+ /// offered with buffer hint set to false.
+ ///
+ /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
+ /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
+ pub fn enhance_batch(&mut self, flag: bool) {
+ self.sink_base.enhance_buffer_strategy = flag;
+ }
+
pub fn cancel(&mut self) {
let call = self.call.lock();
call.call.cancel()
@@ -373,7 +385,8 @@ impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> {
let mut call = self.call.lock();
call.check_alive()?;
}
- Pin::new(&mut self.sink_base).poll_ready(cx)
+ let t = &mut *self;
+ Pin::new(&mut t.sink_base).poll_flush(cx, &mut t.call)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
diff --git a/src/call/mod.rs b/src/call/mod.rs
index 03e520a..a06e003 100644
--- a/src/call/mod.rs
+++ b/src/call/mod.rs
@@ -16,15 +16,12 @@ use futures::task::{Context, Poll};
use libc::c_void;
use parking_lot::Mutex;
-use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader};
+use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice};
use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
use crate::error::{Error, Result};
use crate::grpc_sys::grpc_status_code::*;
use crate::task::{self, BatchFuture, BatchType, CallTag};
-// By default buffers in `SinkBase` will be shrink to 4K size.
-const BUF_SHRINK_SIZE: usize = 4 * 1024;
-
/// An gRPC status code structure.
/// This type contains constants for all gRPC status codes.
#[derive(PartialEq, Eq, Clone, Copy)]
@@ -296,7 +293,7 @@ impl Call {
/// Send a message asynchronously.
pub fn start_send_message(
&mut self,
- msg: &[u8],
+ msg: &mut GrpcSlice,
write_flags: u32,
initial_meta: bool,
) -> Result<BatchFuture> {
@@ -306,8 +303,7 @@ impl Call {
grpc_sys::grpcwrap_call_send_message(
self.call,
ctx,
- msg.as_ptr() as _,
- msg.len(),
+ msg.as_mut_ptr(),
write_flags,
i,
tag,
@@ -350,20 +346,21 @@ impl Call {
&mut self,
status: &RpcStatus,
send_empty_metadata: bool,
- payload: &Option<Vec<u8>>,
+ payload: &mut Option<GrpcSlice>,
write_flags: u32,
) -> Result<BatchFuture> {
let _cq_ref = self.cq.borrow()?;
let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
- let (payload_ptr, payload_len) = payload
- .as_ref()
- .map_or((ptr::null(), 0), |b| (b.as_ptr(), b.len()));
let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
let details_ptr = status
.details
.as_ref()
.map_or_else(ptr::null, |s| s.as_ptr() as _);
let details_len = status.details.as_ref().map_or(0, String::len);
+ let payload_p = match payload {
+ Some(p) => p.as_mut_ptr(),
+ None => ptr::null_mut(),
+ };
grpc_sys::grpcwrap_call_send_status_from_server(
self.call,
ctx,
@@ -372,8 +369,7 @@ impl Call {
details_len,
ptr::null_mut(),
send_empty_metadata,
- payload_ptr as _,
- payload_len,
+ payload_p,
write_flags,
tag,
)
@@ -407,8 +403,7 @@ impl Call {
details_len,
ptr::null_mut(),
1,
- ptr::null(),
- 0,
+ ptr::null_mut(),
0,
tag_ptr as *mut c_void,
)
@@ -628,17 +623,30 @@ impl WriteFlags {
/// A helper struct for constructing Sink object for batch requests.
struct SinkBase {
+ // Batch job to be executed in `poll_ready`.
batch_f: Option<BatchFuture>,
- buf: Vec<u8>,
send_metadata: bool,
+ // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch
+ // messages as much as possible.
+ enhance_buffer_strategy: bool,
+ // Buffer used to store the data to be sent, send out the last data in this round of `start_send`.
+ buffer: GrpcSlice,
+ // Write flags used to control the data to be sent in `buffer`.
+ buf_flags: Option<WriteFlags>,
+ // Used to records whether a message in which `buffer_hint` is false exists.
+ // Note: only used in enhanced buffer strategy.
+ last_buf_hint: bool,
}
impl SinkBase {
fn new(send_metadata: bool) -> SinkBase {
SinkBase {
batch_f: None,
- buf: Vec::new(),
+ buffer: GrpcSlice::default(),
+ buf_flags: None,
+ last_buf_hint: true,
send_metadata,
+ enhance_buffer_strategy: false,
}
}
@@ -646,29 +654,35 @@ impl SinkBase {
&mut self,
call: &mut C,
t: &T,
- mut flags: WriteFlags,
+ flags: WriteFlags,
ser: SerializeFn<T>,
) -> Result<()> {
- // `start_send` is supposed to be called after `poll_ready` returns ready.
- assert!(self.batch_f.is_none());
+ // temporary fix: buffer hint with send meta will not send out any metadata.
+ // note: only the first message can enter this code block.
+ if self.send_metadata {
+ ser(t, &mut self.buffer);
+ self.buf_flags = Some(flags);
+ self.start_send_buffer_message(false, call)?;
+ self.send_metadata = false;
+ return Ok(());
+ }
- self.buf.clear();
- ser(t, &mut self.buf);
- if flags.get_buffer_hint() && self.send_metadata {
- // temporary fix: buffer hint with send meta will not send out any metadata.
- flags = flags.buffer_hint(false);
+ // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
+ // that this is not the last message.
+ if self.buf_flags.is_some() {
+ self.start_send_buffer_message(true, call)?;
}
- let write_f = call.call(|c| {
- c.call
- .start_send_message(&self.buf, flags.flags, self.send_metadata)
- })?;
- // NOTE: Content of `self.buf` is copied into grpc internal.
- if self.buf.capacity() > BUF_SHRINK_SIZE {
- self.buf.truncate(BUF_SHRINK_SIZE);
- self.buf.shrink_to_fit();
+
+ ser(t, &mut self.buffer);
+ let hint = flags.get_buffer_hint();
+ self.last_buf_hint &= hint;
+ self.buf_flags = Some(flags);
+
+ // If sink disable batch, start sending the message in buffer immediately.
+ if !self.enhance_buffer_strategy {
+ self.start_send_buffer_message(hint, call)?;
}
- self.batch_f = Some(write_f);
- self.send_metadata = false;
+
Ok(())
}
@@ -683,4 +697,44 @@ impl SinkBase {
self.batch_f.take();
Poll::Ready(Ok(()))
}
+
+ #[inline]
+ fn poll_flush<C: ShareCallHolder>(
+ &mut self,
+ cx: &mut Context,
+ call: &mut C,
+ ) -> Poll<Result<()>> {
+ if self.batch_f.is_some() {
+ ready!(self.poll_ready(cx)?);
+ }
+ if self.buf_flags.is_some() {
+ self.start_send_buffer_message(self.last_buf_hint, call)?;
+ ready!(self.poll_ready(cx)?);
+ }
+ self.last_buf_hint = true;
+ Poll::Ready(Ok(()))
+ }
+
+ #[inline]
+ fn start_send_buffer_message<C: ShareCallHolder>(
+ &mut self,
+ buffer_hint: bool,
+ call: &mut C,
+ ) -> Result<()> {
+ // `start_send` is supposed to be called after `poll_ready` returns ready.
+ assert!(self.batch_f.is_none());
+
+ let mut flags = self.buf_flags.clone().unwrap();
+ flags = flags.buffer_hint(buffer_hint);
+ let write_f = call.call(|c| {
+ c.call
+ .start_send_message(&mut self.buffer, flags.flags, self.send_metadata)
+ })?;
+ self.batch_f = Some(write_f);
+ if !self.buffer.is_inline() {
+ self.buffer = GrpcSlice::default();
+ }
+ self.buf_flags.take();
+ Ok(())
+ }
}
diff --git a/src/call/server.rs b/src/call/server.rs
index 8875d6d..add9874 100644
--- a/src/call/server.rs
+++ b/src/call/server.rs
@@ -17,6 +17,7 @@ use parking_lot::Mutex;
use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
use crate::auth_context::AuthContext;
+use crate::buf::GrpcSlice;
use crate::call::{
BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
};
@@ -322,7 +323,7 @@ macro_rules! impl_unary_sink {
$t {
call: Some(call),
write_flags: 0,
- ser: ser,
+ ser,
}
}
@@ -335,8 +336,8 @@ macro_rules! impl_unary_sink {
}
fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
- let data = t.as_ref().map(|t| {
- let mut buf = vec![];
+ let mut data = t.as_ref().map(|t| {
+ let mut buf = GrpcSlice::default();
(self.ser)(t, &mut buf);
buf
});
@@ -344,7 +345,7 @@ macro_rules! impl_unary_sink {
let write_flags = self.write_flags;
let res = self.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(&status, true, &data, write_flags)
+ .start_send_status_from_server(&status, true, &mut data, write_flags)
});
let (cq_f, err) = match res {
@@ -354,8 +355,8 @@ macro_rules! impl_unary_sink {
$rt {
call: self.call.take().unwrap(),
- cq_f: cq_f,
- err: err,
+ cq_f,
+ err,
}
}
}
@@ -420,10 +421,23 @@ macro_rules! impl_stream_sink {
status: RpcStatus::ok(),
flushed: false,
closed: false,
- ser: ser,
+ ser,
}
}
+ /// By default it always sends messages with their configured buffer hint. But when the
+ /// `enhance_batch` is enabled, messages will be batched together as many as possible.
+ /// The rules are listed as below:
+ /// - All messages except the last one will be sent with `buffer_hint` set to true.
+ /// - The last message will also be sent with `buffer_hint` set to true unless any message is
+ /// offered with buffer hint set to false.
+ ///
+ /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
+ /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
+ pub fn enhance_batch(&mut self, flag: bool) {
+ self.base.enhance_buffer_strategy = flag;
+ }
+
pub fn set_status(&mut self, status: RpcStatus) {
assert!(self.flush_f.is_none());
self.status = status;
@@ -434,7 +448,7 @@ macro_rules! impl_stream_sink {
let send_metadata = self.base.send_metadata;
let res = self.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(&status, send_metadata, &None, 0)
+ .start_send_status_from_server(&status, send_metadata, &mut None, 0)
});
let (fail_f, err) = match res {
@@ -444,8 +458,8 @@ macro_rules! impl_stream_sink {
$ft {
call: self.call.take().unwrap(),
- fail_f: fail_f,
- err: err,
+ fail_f,
+ err,
}
}
}
@@ -487,7 +501,8 @@ macro_rules! impl_stream_sink {
if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
return Poll::Ready(Err(Error::RemoteStopped));
}
- Pin::new(&mut self.base).poll_ready(cx)
+ let t = &mut *self;
+ Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap())
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
@@ -499,7 +514,7 @@ macro_rules! impl_stream_sink {
let status = &t.status;
let flush_f = t.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(status, send_metadata, &None, 0)
+ .start_send_status_from_server(status, send_metadata, &mut None, 0)
})?;
t.flush_f = Some(flush_f);
}
diff --git a/src/channel.rs b/src/channel.rs
index 8fb7fc8..bdf95ce 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -28,7 +28,7 @@ pub use crate::grpc_sys::{
/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
fn format_user_agent_string(agent: &str) -> CString {
- let version = "0.6.0";
+ let version = "0.7.0";
let trimed_agent = agent.trim();
let val = if trimed_agent.is_empty() {
format!("grpc-rust/{}", version)
@@ -111,7 +111,7 @@ impl ChannelBuilder {
self
}
- /// Set maximum message length that the channel can receive. `usize::MAX` means unlimited.
+ /// Set maximum message length that the channel can receive. `-1` means unlimited.
pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
self.options.insert(
Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
@@ -391,7 +391,7 @@ impl ChannelBuilder {
}
/// Build `ChannelArgs` from the current configuration.
- #[allow(clippy::identity_conversion)]
+ #[allow(clippy::useless_conversion)]
pub fn build_args(&self) -> ChannelArgs {
let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
for (i, (k, v)) in self.options.iter().enumerate() {
@@ -442,6 +442,28 @@ impl ChannelBuilder {
let channel =
unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) };
+ unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
+ }
+
+ /// Build an insecure [`Channel`] taking over an established connection from
+ /// a file descriptor. The target string given is purely informative to
+ /// describe the endpoint of the connection. Takes ownership of the given
+ /// file descriptor and will close it when the connection is closed.
+ ///
+ /// This function is available on posix systems only.
+ ///
+ /// # Safety
+ ///
+ /// The file descriptor must correspond to a connected stream socket. After
+ /// this call, the socket must not be accessed (read / written / closed)
+ /// by other code.
+ #[cfg(unix)]
+ pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel {
+ let args = self.prepare_connect_args();
+ let target = CString::new(target).unwrap();
+ let target_ptr = target.as_ptr();
+ let channel = grpc_sys::grpc_insecure_channel_create_from_fd(target_ptr, fd, args.args);
+
Channel::new(self.env.pick_cq(), self.env, channel)
}
}
@@ -489,7 +511,7 @@ mod secure_channel {
)
};
- Channel::new(self.env.pick_cq(), self.env, channel)
+ unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
}
}
}
@@ -548,7 +570,19 @@ unsafe impl Send for Channel {}
unsafe impl Sync for Channel {}
impl Channel {
- fn new(cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel) -> Channel {
+ /// Create a new channel. Avoid using this directly and use
+ /// [`ChannelBuilder`] to build a [`Channel`] instead.
+ ///
+ /// # Safety
+ ///
+ /// The given grpc_channel must correspond to an instantiated grpc core
+ /// channel. Takes exclusive ownership of the channel and will close it after
+ /// use.
+ pub unsafe fn new(
+ cq: CompletionQueue,
+ env: Arc<Environment>,
+ channel: *mut grpc_channel,
+ ) -> Channel {
Channel {
inner: Arc::new(ChannelInner { _env: env, channel }),
cq,
diff --git a/src/codec.rs b/src/codec.rs
index 4a84489..e0214b7 100644
--- a/src/codec.rs
+++ b/src/codec.rs
@@ -1,10 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+use crate::buf::GrpcSlice;
use crate::call::MessageReader;
use crate::error::Result;
pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>;
-pub type SerializeFn<T> = fn(&T, &mut Vec<u8>);
+pub type SerializeFn<T> = fn(&T, &mut GrpcSlice);
/// Defines how to serialize and deserialize between the specialized type and byte slice.
pub struct Marshaller<T> {
@@ -26,14 +27,21 @@ pub struct Marshaller<T> {
#[cfg(feature = "protobuf-codec")]
pub mod pb_codec {
- use protobuf::{CodedInputStream, Message};
+ use protobuf::{CodedInputStream, CodedOutputStream, Message};
use super::MessageReader;
+ use crate::buf::GrpcSlice;
use crate::error::Result;
#[inline]
- pub fn ser<T: Message>(t: &T, buf: &mut Vec<u8>) {
- t.write_to_vec(buf).unwrap()
+ pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) {
+ let cap = t.compute_size();
+ unsafe {
+ let bytes = buf.realloc(cap as usize);
+ let raw_bytes = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
+ let mut s = CodedOutputStream::bytes(raw_bytes);
+ t.write_to_with_cached_sizes(&mut s).unwrap();
+ }
}
#[inline]
@@ -47,15 +55,22 @@ pub mod pb_codec {
#[cfg(feature = "prost-codec")]
pub mod pr_codec {
- use bytes::buf::BufMut;
use prost::Message;
use super::MessageReader;
+ use crate::buf::GrpcSlice;
use crate::error::Result;
#[inline]
- pub fn ser<M: Message, B: BufMut>(msg: &M, buf: &mut B) {
- msg.encode(buf).expect("Writing message to buffer failed");
+ pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) {
+ let size = msg.encoded_len();
+ unsafe {
+ let bytes = buf.realloc(size);
+ let mut b = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
+ msg.encode(&mut b)
+ .expect("Writing message to buffer failed");
+ debug_assert!(b.is_empty());
+ }
}
#[inline]
diff --git a/src/env.rs b/src/env.rs
index 8bad45e..5c2e199 100644
--- a/src/env.rs
+++ b/src/env.rs
@@ -38,6 +38,8 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
pub struct EnvBuilder {
cq_count: usize,
name_prefix: Option<String>,
+ after_start: Option<Arc<dyn Fn() + Send + Sync>>,
+ before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
}
impl EnvBuilder {
@@ -46,6 +48,8 @@ impl EnvBuilder {
EnvBuilder {
cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
name_prefix: None,
+ after_start: None,
+ before_stop: None,
}
}
@@ -67,6 +71,18 @@ impl EnvBuilder {
self
}
+ /// Execute function `f` after each thread is started but before it starts doing work.
+ pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
+ self.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Execute function `f` before each thread stops.
+ pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
+ self.before_stop = Some(Arc::new(f));
+ self
+ }
+
/// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
pub fn build(self) -> Environment {
unsafe {
@@ -81,7 +97,19 @@ impl EnvBuilder {
if let Some(ref prefix) = self.name_prefix {
builder = builder.name(format!("{}-{}", prefix, i));
}
- let handle = builder.spawn(move || poll_queue(tx_i)).unwrap();
+ let after_start = self.after_start.clone();
+ let before_stop = self.before_stop.clone();
+ let handle = builder
+ .spawn(move || {
+ if let Some(f) = after_start {
+ f();
+ }
+ poll_queue(tx_i);
+ if let Some(f) = before_stop {
+ f();
+ }
+ })
+ .unwrap();
handles.push(handle);
}
for _ in 0..self.cq_count {
diff --git a/src/lib.rs b/src/lib.rs
index 321e1f2..2bac988 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -43,6 +43,7 @@ mod security;
mod server;
mod task;
+pub use crate::buf::GrpcSlice;
pub use crate::call::client::{
CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver,
ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, StreamingCallSink,
diff --git a/src/metadata.rs b/src/metadata.rs
index ef49bcb..746b593 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -1,7 +1,8 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
-use crate::grpc_sys::{self, grpc_metadata_array};
+use crate::grpc_sys::{self, grpc_metadata, grpc_metadata_array};
use std::borrow::Cow;
+use std::mem::ManuallyDrop;
use std::{mem, slice, str};
use crate::error::{Error, Result};
@@ -184,6 +185,35 @@ impl Metadata {
index: 0,
}
}
+
+ /// Decomposes a Metadata array into its raw components.
+ ///
+ /// Returns the raw pointer to the underlying data, the length of the vector (in elements),
+ /// and the allocated capacity of the data (in elements). These are the same arguments in
+ /// the same order as the arguments to from_raw_parts.
+ ///
+ /// After calling this function, the caller is responsible for the memory previously managed
+ /// by the Metadata. The only way to do this is to convert the raw pointer, length, and
+ /// capacity back into a Metadata with the from_raw_parts function, allowing the destructor
+ /// to perform the cleanup.
+ pub fn into_raw_parts(self) -> (*mut grpc_metadata, usize, usize) {
+ let s = ManuallyDrop::new(self);
+ (s.0.metadata, s.0.count, s.0.capacity)
+ }
+
+ /// Creates a Metadata directly from the raw components of another vector.
+ ///
+ /// ## Safety
+ ///
+ /// The operation is safe only if the three arguments are returned from `into_raw_parts`
+ /// and only convert once.
+ pub unsafe fn from_raw_parts(p: *mut grpc_metadata, len: usize, cap: usize) -> Metadata {
+ Metadata(grpc_metadata_array {
+ count: len,
+ capacity: cap,
+ metadata: p,
+ })
+ }
}
impl Clone for Metadata {
diff --git a/src/security/credentials.rs b/src/security/credentials.rs
index 8d835ee..7d73009 100644
--- a/src/security/credentials.rs
+++ b/src/security/credentials.rs
@@ -352,7 +352,7 @@ impl ChannelCredentials {
unsafe {
grpc_sys::grpc_init();
}
- let creds = unsafe { grpc_sys::grpc_google_default_credentials_create() };
+ let creds = unsafe { grpc_sys::grpc_google_default_credentials_create(ptr::null_mut()) };
if creds.is_null() {
Err(Error::GoogleAuthenticationFailed)
} else {
diff --git a/src/server.rs b/src/server.rs
index 3dd8bf3..8cb6a87 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -561,13 +561,27 @@ impl Server {
pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> {
self.core.binders.iter().map(|b| (&b.host, b.port))
}
+
+ /// Add an rpc channel for an established connection represented as a file
+ /// descriptor. Takes ownership of the file descriptor, closing it when
+ /// channel is closed.
+ ///
+ /// # Safety
+ ///
+ /// The file descriptor must correspond to a connected stream socket. After
+ /// this call, the socket must not be accessed (read / written / closed)
+ /// by other code.
+ #[cfg(unix)]
+ pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) {
+ grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd)
+ }
}
impl Drop for Server {
fn drop(&mut self) {
// if the server is not shutdown completely, destroy a server will core.
// TODO: don't wait here
- let f = if self.core.shutdown.load(Ordering::SeqCst) {
+ let f = if !self.core.shutdown.load(Ordering::SeqCst) {
Some(self.shutdown())
} else {
None