diff options
author | Haibo Huang <hhb@google.com> | 2020-11-23 14:42:57 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2020-11-23 14:42:57 +0000 |
commit | fc7118641dd538ab56486911dfc392958c4ad18a (patch) | |
tree | 71b66a4c98a5ac5b2d407b3273ceae8adaf1744e /src | |
parent | 29da4258ca4d24020fac090035c5ca029c4bda5a (diff) | |
parent | ee7a229247011355da67df293220acc9175448e2 (diff) | |
download | grpcio-fc7118641dd538ab56486911dfc392958c4ad18a.tar.gz |
Upgrade rust/crates/grpcio to 0.7.0 am: ee7a229247
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/grpcio/+/1485656
Change-Id: Idde1ab05a4d6bbf3c40ab114ab8cd55d07f4cc70
Diffstat (limited to 'src')
-rw-r--r-- | src/buf.rs | 43 | ||||
-rw-r--r-- | src/call/client.rs | 27 | ||||
-rw-r--r-- | src/call/mod.rs | 124 | ||||
-rw-r--r-- | src/call/server.rs | 39 | ||||
-rw-r--r-- | src/channel.rs | 44 | ||||
-rw-r--r-- | src/codec.rs | 29 | ||||
-rw-r--r-- | src/env.rs | 30 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/metadata.rs | 32 | ||||
-rw-r--r-- | src/security/credentials.rs | 2 | ||||
-rw-r--r-- | src/server.rs | 16 |
11 files changed, 317 insertions, 70 deletions
@@ -7,6 +7,11 @@ use std::fmt::{self, Debug, Formatter}; use std::io::{self, BufRead, Read}; use std::mem::{self, ManuallyDrop, MaybeUninit}; +/// Copied from grpc-sys/grpc/include/grpc/impl/codegen/slice.h. Unfortunately bindgen doesn't +/// generate it automatically. +const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut u8>() - 1 + + mem::size_of::<*mut libc::c_void>(); + /// A convenient rust wrapper for the type `grpc_slice`. /// /// It's expected that the slice should be initialized. @@ -58,6 +63,41 @@ impl GrpcSlice { pub fn from_static_str(s: &'static str) -> GrpcSlice { GrpcSlice::from_static_slice(s.as_bytes()) } + + /// Checks whether the slice stores bytes inline. + pub fn is_inline(&self) -> bool { + self.0.refcount.is_null() + } + + /// Reallocates current slice with given capacity. + /// + /// The length of returned slice is the exact same as given cap. + /// + /// ## Safety + /// + /// Caller is expected to initialize all available bytes to guarantee safety of this slice. + pub unsafe fn realloc(&mut self, cap: usize) -> &mut [MaybeUninit<u8>] { + if cap <= INLINED_SIZE { + // Only inlined slice can be reused safely. + if !self.0.refcount.is_null() { + *self = GrpcSlice::default(); + } + self.0.data.inlined.length = cap as u8; + std::slice::from_raw_parts_mut( + self.0.data.inlined.bytes.as_mut_ptr() as *mut MaybeUninit<u8>, + cap, + ) + } else { + *self = GrpcSlice(grpcio_sys::grpc_slice_malloc_large(cap)); + let start = self.0.data.refcounted.bytes; + let len = self.0.data.refcounted.length; + std::slice::from_raw_parts_mut(start as *mut MaybeUninit<u8>, len) + } + } + + pub fn as_mut_ptr(&mut self) -> *mut grpc_slice { + &mut self.0 + } } impl Clone for GrpcSlice { @@ -91,6 +131,9 @@ impl Drop for GrpcSlice { } } +unsafe impl Send for GrpcSlice {} +unsafe impl Sync for GrpcSlice {} + impl PartialEq<[u8]> for GrpcSlice { fn eq(&self, r: &[u8]) -> bool { // Technically, the equal function inside vtable should be used. diff --git a/src/call/client.rs b/src/call/client.rs index eac0db4..279e619 100644 --- a/src/call/client.rs +++ b/src/call/client.rs @@ -14,6 +14,7 @@ use parking_lot::Mutex; use std::future::Future; use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags}; +use crate::buf::GrpcSlice; use crate::call::{check_run, Call, MessageReader, Method}; use crate::channel::Channel; use crate::codec::{DeserializeFn, SerializeFn}; @@ -108,14 +109,13 @@ impl Call { mut opt: CallOption, ) -> Result<ClientUnaryReceiver<Resp>> { let call = channel.create_call(method, &opt)?; - let mut payload = vec![]; + let mut payload = GrpcSlice::default(); (method.req_ser())(req, &mut payload); let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_unary( call.call, ctx, - payload.as_ptr() as *const _, - payload.len(), + payload.as_mut_ptr(), opt.write_flags.flags, opt.headers .as_mut() @@ -162,14 +162,13 @@ impl Call { mut opt: CallOption, ) -> Result<ClientSStreamReceiver<Resp>> { let call = channel.create_call(method, &opt)?; - let mut payload = vec![]; + let mut payload = GrpcSlice::default(); (method.req_ser())(req, &mut payload); let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_server_streaming( call.call, ctx, - payload.as_ptr() as _, - payload.len(), + payload.as_mut_ptr(), opt.write_flags.flags, opt.headers .as_mut() @@ -331,6 +330,19 @@ impl<Req> StreamingCallSink<Req> { } } + /// By default it always sends messages with their configured buffer hint. But when the + /// `enhance_batch` is enabled, messages will be batched together as many as possible. + /// The rules are listed as below: + /// - All messages except the last one will be sent with `buffer_hint` set to true. + /// - The last message will also be sent with `buffer_hint` set to true unless any message is + /// offered with buffer hint set to false. + /// + /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of + /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core. + pub fn enhance_batch(&mut self, flag: bool) { + self.sink_base.enhance_buffer_strategy = flag; + } + pub fn cancel(&mut self) { let call = self.call.lock(); call.call.cancel() @@ -373,7 +385,8 @@ impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> { let mut call = self.call.lock(); call.check_alive()?; } - Pin::new(&mut self.sink_base).poll_ready(cx) + let t = &mut *self; + Pin::new(&mut t.sink_base).poll_flush(cx, &mut t.call) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { diff --git a/src/call/mod.rs b/src/call/mod.rs index 03e520a..a06e003 100644 --- a/src/call/mod.rs +++ b/src/call/mod.rs @@ -16,15 +16,12 @@ use futures::task::{Context, Poll}; use libc::c_void; use parking_lot::Mutex; -use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader}; +use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice}; use crate::codec::{DeserializeFn, Marshaller, SerializeFn}; use crate::error::{Error, Result}; use crate::grpc_sys::grpc_status_code::*; use crate::task::{self, BatchFuture, BatchType, CallTag}; -// By default buffers in `SinkBase` will be shrink to 4K size. -const BUF_SHRINK_SIZE: usize = 4 * 1024; - /// An gRPC status code structure. /// This type contains constants for all gRPC status codes. #[derive(PartialEq, Eq, Clone, Copy)] @@ -296,7 +293,7 @@ impl Call { /// Send a message asynchronously. pub fn start_send_message( &mut self, - msg: &[u8], + msg: &mut GrpcSlice, write_flags: u32, initial_meta: bool, ) -> Result<BatchFuture> { @@ -306,8 +303,7 @@ impl Call { grpc_sys::grpcwrap_call_send_message( self.call, ctx, - msg.as_ptr() as _, - msg.len(), + msg.as_mut_ptr(), write_flags, i, tag, @@ -350,20 +346,21 @@ impl Call { &mut self, status: &RpcStatus, send_empty_metadata: bool, - payload: &Option<Vec<u8>>, + payload: &mut Option<GrpcSlice>, write_flags: u32, ) -> Result<BatchFuture> { let _cq_ref = self.cq.borrow()?; let send_empty_metadata = if send_empty_metadata { 1 } else { 0 }; - let (payload_ptr, payload_len) = payload - .as_ref() - .map_or((ptr::null(), 0), |b| (b.as_ptr(), b.len())); let f = check_run(BatchType::Finish, |ctx, tag| unsafe { let details_ptr = status .details .as_ref() .map_or_else(ptr::null, |s| s.as_ptr() as _); let details_len = status.details.as_ref().map_or(0, String::len); + let payload_p = match payload { + Some(p) => p.as_mut_ptr(), + None => ptr::null_mut(), + }; grpc_sys::grpcwrap_call_send_status_from_server( self.call, ctx, @@ -372,8 +369,7 @@ impl Call { details_len, ptr::null_mut(), send_empty_metadata, - payload_ptr as _, - payload_len, + payload_p, write_flags, tag, ) @@ -407,8 +403,7 @@ impl Call { details_len, ptr::null_mut(), 1, - ptr::null(), - 0, + ptr::null_mut(), 0, tag_ptr as *mut c_void, ) @@ -628,17 +623,30 @@ impl WriteFlags { /// A helper struct for constructing Sink object for batch requests. struct SinkBase { + // Batch job to be executed in `poll_ready`. batch_f: Option<BatchFuture>, - buf: Vec<u8>, send_metadata: bool, + // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch + // messages as much as possible. + enhance_buffer_strategy: bool, + // Buffer used to store the data to be sent, send out the last data in this round of `start_send`. + buffer: GrpcSlice, + // Write flags used to control the data to be sent in `buffer`. + buf_flags: Option<WriteFlags>, + // Used to records whether a message in which `buffer_hint` is false exists. + // Note: only used in enhanced buffer strategy. + last_buf_hint: bool, } impl SinkBase { fn new(send_metadata: bool) -> SinkBase { SinkBase { batch_f: None, - buf: Vec::new(), + buffer: GrpcSlice::default(), + buf_flags: None, + last_buf_hint: true, send_metadata, + enhance_buffer_strategy: false, } } @@ -646,29 +654,35 @@ impl SinkBase { &mut self, call: &mut C, t: &T, - mut flags: WriteFlags, + flags: WriteFlags, ser: SerializeFn<T>, ) -> Result<()> { - // `start_send` is supposed to be called after `poll_ready` returns ready. - assert!(self.batch_f.is_none()); + // temporary fix: buffer hint with send meta will not send out any metadata. + // note: only the first message can enter this code block. + if self.send_metadata { + ser(t, &mut self.buffer); + self.buf_flags = Some(flags); + self.start_send_buffer_message(false, call)?; + self.send_metadata = false; + return Ok(()); + } - self.buf.clear(); - ser(t, &mut self.buf); - if flags.get_buffer_hint() && self.send_metadata { - // temporary fix: buffer hint with send meta will not send out any metadata. - flags = flags.buffer_hint(false); + // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate + // that this is not the last message. + if self.buf_flags.is_some() { + self.start_send_buffer_message(true, call)?; } - let write_f = call.call(|c| { - c.call - .start_send_message(&self.buf, flags.flags, self.send_metadata) - })?; - // NOTE: Content of `self.buf` is copied into grpc internal. - if self.buf.capacity() > BUF_SHRINK_SIZE { - self.buf.truncate(BUF_SHRINK_SIZE); - self.buf.shrink_to_fit(); + + ser(t, &mut self.buffer); + let hint = flags.get_buffer_hint(); + self.last_buf_hint &= hint; + self.buf_flags = Some(flags); + + // If sink disable batch, start sending the message in buffer immediately. + if !self.enhance_buffer_strategy { + self.start_send_buffer_message(hint, call)?; } - self.batch_f = Some(write_f); - self.send_metadata = false; + Ok(()) } @@ -683,4 +697,44 @@ impl SinkBase { self.batch_f.take(); Poll::Ready(Ok(())) } + + #[inline] + fn poll_flush<C: ShareCallHolder>( + &mut self, + cx: &mut Context, + call: &mut C, + ) -> Poll<Result<()>> { + if self.batch_f.is_some() { + ready!(self.poll_ready(cx)?); + } + if self.buf_flags.is_some() { + self.start_send_buffer_message(self.last_buf_hint, call)?; + ready!(self.poll_ready(cx)?); + } + self.last_buf_hint = true; + Poll::Ready(Ok(())) + } + + #[inline] + fn start_send_buffer_message<C: ShareCallHolder>( + &mut self, + buffer_hint: bool, + call: &mut C, + ) -> Result<()> { + // `start_send` is supposed to be called after `poll_ready` returns ready. + assert!(self.batch_f.is_none()); + + let mut flags = self.buf_flags.clone().unwrap(); + flags = flags.buffer_hint(buffer_hint); + let write_f = call.call(|c| { + c.call + .start_send_message(&mut self.buffer, flags.flags, self.send_metadata) + })?; + self.batch_f = Some(write_f); + if !self.buffer.is_inline() { + self.buffer = GrpcSlice::default(); + } + self.buf_flags.take(); + Ok(()) + } } diff --git a/src/call/server.rs b/src/call/server.rs index 8875d6d..add9874 100644 --- a/src/call/server.rs +++ b/src/call/server.rs @@ -17,6 +17,7 @@ use parking_lot::Mutex; use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags}; use crate::auth_context::AuthContext; +use crate::buf::GrpcSlice; use crate::call::{ BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase, }; @@ -322,7 +323,7 @@ macro_rules! impl_unary_sink { $t { call: Some(call), write_flags: 0, - ser: ser, + ser, } } @@ -335,8 +336,8 @@ macro_rules! impl_unary_sink { } fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt { - let data = t.as_ref().map(|t| { - let mut buf = vec![]; + let mut data = t.as_ref().map(|t| { + let mut buf = GrpcSlice::default(); (self.ser)(t, &mut buf); buf }); @@ -344,7 +345,7 @@ macro_rules! impl_unary_sink { let write_flags = self.write_flags; let res = self.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(&status, true, &data, write_flags) + .start_send_status_from_server(&status, true, &mut data, write_flags) }); let (cq_f, err) = match res { @@ -354,8 +355,8 @@ macro_rules! impl_unary_sink { $rt { call: self.call.take().unwrap(), - cq_f: cq_f, - err: err, + cq_f, + err, } } } @@ -420,10 +421,23 @@ macro_rules! impl_stream_sink { status: RpcStatus::ok(), flushed: false, closed: false, - ser: ser, + ser, } } + /// By default it always sends messages with their configured buffer hint. But when the + /// `enhance_batch` is enabled, messages will be batched together as many as possible. + /// The rules are listed as below: + /// - All messages except the last one will be sent with `buffer_hint` set to true. + /// - The last message will also be sent with `buffer_hint` set to true unless any message is + /// offered with buffer hint set to false. + /// + /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of + /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core. + pub fn enhance_batch(&mut self, flag: bool) { + self.base.enhance_buffer_strategy = flag; + } + pub fn set_status(&mut self, status: RpcStatus) { assert!(self.flush_f.is_none()); self.status = status; @@ -434,7 +448,7 @@ macro_rules! impl_stream_sink { let send_metadata = self.base.send_metadata; let res = self.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(&status, send_metadata, &None, 0) + .start_send_status_from_server(&status, send_metadata, &mut None, 0) }); let (fail_f, err) = match res { @@ -444,8 +458,8 @@ macro_rules! impl_stream_sink { $ft { call: self.call.take().unwrap(), - fail_f: fail_f, - err: err, + fail_f, + err, } } } @@ -487,7 +501,8 @@ macro_rules! impl_stream_sink { if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? { return Poll::Ready(Err(Error::RemoteStopped)); } - Pin::new(&mut self.base).poll_ready(cx) + let t = &mut *self; + Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap()) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { @@ -499,7 +514,7 @@ macro_rules! impl_stream_sink { let status = &t.status; let flush_f = t.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(status, send_metadata, &None, 0) + .start_send_status_from_server(status, send_metadata, &mut None, 0) })?; t.flush_f = Some(flush_f); } diff --git a/src/channel.rs b/src/channel.rs index 8fb7fc8..bdf95ce 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -28,7 +28,7 @@ pub use crate::grpc_sys::{ /// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents fn format_user_agent_string(agent: &str) -> CString { - let version = "0.6.0"; + let version = "0.7.0"; let trimed_agent = agent.trim(); let val = if trimed_agent.is_empty() { format!("grpc-rust/{}", version) @@ -111,7 +111,7 @@ impl ChannelBuilder { self } - /// Set maximum message length that the channel can receive. `usize::MAX` means unlimited. + /// Set maximum message length that the channel can receive. `-1` means unlimited. pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder { self.options.insert( Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), @@ -391,7 +391,7 @@ impl ChannelBuilder { } /// Build `ChannelArgs` from the current configuration. - #[allow(clippy::identity_conversion)] + #[allow(clippy::useless_conversion)] pub fn build_args(&self) -> ChannelArgs { let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) }; for (i, (k, v)) in self.options.iter().enumerate() { @@ -442,6 +442,28 @@ impl ChannelBuilder { let channel = unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) }; + unsafe { Channel::new(self.env.pick_cq(), self.env, channel) } + } + + /// Build an insecure [`Channel`] taking over an established connection from + /// a file descriptor. The target string given is purely informative to + /// describe the endpoint of the connection. Takes ownership of the given + /// file descriptor and will close it when the connection is closed. + /// + /// This function is available on posix systems only. + /// + /// # Safety + /// + /// The file descriptor must correspond to a connected stream socket. After + /// this call, the socket must not be accessed (read / written / closed) + /// by other code. + #[cfg(unix)] + pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel { + let args = self.prepare_connect_args(); + let target = CString::new(target).unwrap(); + let target_ptr = target.as_ptr(); + let channel = grpc_sys::grpc_insecure_channel_create_from_fd(target_ptr, fd, args.args); + Channel::new(self.env.pick_cq(), self.env, channel) } } @@ -489,7 +511,7 @@ mod secure_channel { ) }; - Channel::new(self.env.pick_cq(), self.env, channel) + unsafe { Channel::new(self.env.pick_cq(), self.env, channel) } } } } @@ -548,7 +570,19 @@ unsafe impl Send for Channel {} unsafe impl Sync for Channel {} impl Channel { - fn new(cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel) -> Channel { + /// Create a new channel. Avoid using this directly and use + /// [`ChannelBuilder`] to build a [`Channel`] instead. + /// + /// # Safety + /// + /// The given grpc_channel must correspond to an instantiated grpc core + /// channel. Takes exclusive ownership of the channel and will close it after + /// use. + pub unsafe fn new( + cq: CompletionQueue, + env: Arc<Environment>, + channel: *mut grpc_channel, + ) -> Channel { Channel { inner: Arc::new(ChannelInner { _env: env, channel }), cq, diff --git a/src/codec.rs b/src/codec.rs index 4a84489..e0214b7 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -1,10 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use crate::buf::GrpcSlice; use crate::call::MessageReader; use crate::error::Result; pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>; -pub type SerializeFn<T> = fn(&T, &mut Vec<u8>); +pub type SerializeFn<T> = fn(&T, &mut GrpcSlice); /// Defines how to serialize and deserialize between the specialized type and byte slice. pub struct Marshaller<T> { @@ -26,14 +27,21 @@ pub struct Marshaller<T> { #[cfg(feature = "protobuf-codec")] pub mod pb_codec { - use protobuf::{CodedInputStream, Message}; + use protobuf::{CodedInputStream, CodedOutputStream, Message}; use super::MessageReader; + use crate::buf::GrpcSlice; use crate::error::Result; #[inline] - pub fn ser<T: Message>(t: &T, buf: &mut Vec<u8>) { - t.write_to_vec(buf).unwrap() + pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) { + let cap = t.compute_size(); + unsafe { + let bytes = buf.realloc(cap as usize); + let raw_bytes = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); + let mut s = CodedOutputStream::bytes(raw_bytes); + t.write_to_with_cached_sizes(&mut s).unwrap(); + } } #[inline] @@ -47,15 +55,22 @@ pub mod pb_codec { #[cfg(feature = "prost-codec")] pub mod pr_codec { - use bytes::buf::BufMut; use prost::Message; use super::MessageReader; + use crate::buf::GrpcSlice; use crate::error::Result; #[inline] - pub fn ser<M: Message, B: BufMut>(msg: &M, buf: &mut B) { - msg.encode(buf).expect("Writing message to buffer failed"); + pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) { + let size = msg.encoded_len(); + unsafe { + let bytes = buf.realloc(size); + let mut b = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); + msg.encode(&mut b) + .expect("Writing message to buffer failed"); + debug_assert!(b.is_empty()); + } } #[inline] @@ -38,6 +38,8 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) { pub struct EnvBuilder { cq_count: usize, name_prefix: Option<String>, + after_start: Option<Arc<dyn Fn() + Send + Sync>>, + before_stop: Option<Arc<dyn Fn() + Send + Sync>>, } impl EnvBuilder { @@ -46,6 +48,8 @@ impl EnvBuilder { EnvBuilder { cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize }, name_prefix: None, + after_start: None, + before_stop: None, } } @@ -67,6 +71,18 @@ impl EnvBuilder { self } + /// Execute function `f` after each thread is started but before it starts doing work. + pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder { + self.after_start = Some(Arc::new(f)); + self + } + + /// Execute function `f` before each thread stops. + pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder { + self.before_stop = Some(Arc::new(f)); + self + } + /// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library. pub fn build(self) -> Environment { unsafe { @@ -81,7 +97,19 @@ impl EnvBuilder { if let Some(ref prefix) = self.name_prefix { builder = builder.name(format!("{}-{}", prefix, i)); } - let handle = builder.spawn(move || poll_queue(tx_i)).unwrap(); + let after_start = self.after_start.clone(); + let before_stop = self.before_stop.clone(); + let handle = builder + .spawn(move || { + if let Some(f) = after_start { + f(); + } + poll_queue(tx_i); + if let Some(f) = before_stop { + f(); + } + }) + .unwrap(); handles.push(handle); } for _ in 0..self.cq_count { @@ -43,6 +43,7 @@ mod security; mod server; mod task; +pub use crate::buf::GrpcSlice; pub use crate::call::client::{ CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver, ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, StreamingCallSink, diff --git a/src/metadata.rs b/src/metadata.rs index ef49bcb..746b593 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -1,7 +1,8 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use crate::grpc_sys::{self, grpc_metadata_array}; +use crate::grpc_sys::{self, grpc_metadata, grpc_metadata_array}; use std::borrow::Cow; +use std::mem::ManuallyDrop; use std::{mem, slice, str}; use crate::error::{Error, Result}; @@ -184,6 +185,35 @@ impl Metadata { index: 0, } } + + /// Decomposes a Metadata array into its raw components. + /// + /// Returns the raw pointer to the underlying data, the length of the vector (in elements), + /// and the allocated capacity of the data (in elements). These are the same arguments in + /// the same order as the arguments to from_raw_parts. + /// + /// After calling this function, the caller is responsible for the memory previously managed + /// by the Metadata. The only way to do this is to convert the raw pointer, length, and + /// capacity back into a Metadata with the from_raw_parts function, allowing the destructor + /// to perform the cleanup. + pub fn into_raw_parts(self) -> (*mut grpc_metadata, usize, usize) { + let s = ManuallyDrop::new(self); + (s.0.metadata, s.0.count, s.0.capacity) + } + + /// Creates a Metadata directly from the raw components of another vector. + /// + /// ## Safety + /// + /// The operation is safe only if the three arguments are returned from `into_raw_parts` + /// and only convert once. + pub unsafe fn from_raw_parts(p: *mut grpc_metadata, len: usize, cap: usize) -> Metadata { + Metadata(grpc_metadata_array { + count: len, + capacity: cap, + metadata: p, + }) + } } impl Clone for Metadata { diff --git a/src/security/credentials.rs b/src/security/credentials.rs index 8d835ee..7d73009 100644 --- a/src/security/credentials.rs +++ b/src/security/credentials.rs @@ -352,7 +352,7 @@ impl ChannelCredentials { unsafe { grpc_sys::grpc_init(); } - let creds = unsafe { grpc_sys::grpc_google_default_credentials_create() }; + let creds = unsafe { grpc_sys::grpc_google_default_credentials_create(ptr::null_mut()) }; if creds.is_null() { Err(Error::GoogleAuthenticationFailed) } else { diff --git a/src/server.rs b/src/server.rs index 3dd8bf3..8cb6a87 100644 --- a/src/server.rs +++ b/src/server.rs @@ -561,13 +561,27 @@ impl Server { pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> { self.core.binders.iter().map(|b| (&b.host, b.port)) } + + /// Add an rpc channel for an established connection represented as a file + /// descriptor. Takes ownership of the file descriptor, closing it when + /// channel is closed. + /// + /// # Safety + /// + /// The file descriptor must correspond to a connected stream socket. After + /// this call, the socket must not be accessed (read / written / closed) + /// by other code. + #[cfg(unix)] + pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) { + grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd) + } } impl Drop for Server { fn drop(&mut self) { // if the server is not shutdown completely, destroy a server will core. // TODO: don't wait here - let f = if self.core.shutdown.load(Ordering::SeqCst) { + let f = if !self.core.shutdown.load(Ordering::SeqCst) { Some(self.shutdown()) } else { None |