aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2020-11-23 15:24:33 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2020-11-23 15:24:33 +0000
commitdfd0d76aa3a057d800eff424b09709d94339b216 (patch)
tree71b66a4c98a5ac5b2d407b3273ceae8adaf1744e
parent94a500da3d781ba92187384edd52487977538c91 (diff)
parentfc7118641dd538ab56486911dfc392958c4ad18a (diff)
downloadgrpcio-dfd0d76aa3a057d800eff424b09709d94339b216.tar.gz
Upgrade rust/crates/grpcio to 0.7.0 am: ee7a229247 am: fc7118641d
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/grpcio/+/1485656 Change-Id: I9c4916532bba1d203d8048bc18d3945716bae282
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--.github/workflows/ci.yml14
-rw-r--r--.travis.yml61
-rw-r--r--Android.bp15
-rw-r--r--CHANGELOG.md10
-rw-r--r--Cargo.toml6
-rw-r--r--Cargo.toml.orig9
-rw-r--r--METADATA8
-rwxr-xr-xscripts/reset-submodule.cmd5
-rw-r--r--src/buf.rs43
-rw-r--r--src/call/client.rs27
-rw-r--r--src/call/mod.rs124
-rw-r--r--src/call/server.rs39
-rw-r--r--src/channel.rs44
-rw-r--r--src/codec.rs29
-rw-r--r--src/env.rs30
-rw-r--r--src/lib.rs1
-rw-r--r--src/metadata.rs32
-rw-r--r--src/security/credentials.rs2
-rw-r--r--src/server.rs16
20 files changed, 370 insertions, 147 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 90abfbc..43ff69f 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "19a53b5cc9612511dabb365dae05286e807668e5"
+ "sha1": "4112a2249ebaa7a354cf59179979e3b4b08a077f"
}
}
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e1faa0b..1c42285 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -11,24 +11,21 @@ on:
env:
RUST_BACKTRACE: 1
RUSTFLAGS: "--deny=warnings"
- GRPC_VERSION: 1.29.1
+ TEST_BIND: 1
jobs:
- Linux-Format-PKG-Test:
- name: Linux-Format-PKG-Test
+ Linux-Format:
+ name: Linux-Format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: sudo apt-get install -y clang-tidy-9
- run: sudo update-alternatives --install /usr/bin/clang-tidy clang-tidy /usr/bin/clang-tidy-9 100
- - run: which go && go version && which cargo && cargo version && clang --version && openssl version
+ - run: which go && go version && which cargo && cargo version && clang --version && openssl version && which cmake && cmake --version
- run: scripts/reset-submodule.cmd
- run: cargo fmt --all -- --check
- run: cargo clippy --all -- -D clippy::all && cargo clippy --all --no-default-features --features prost-codec -- -D clippy::all
- run: scripts/lint-grpc-sys.sh && git diff-index --quiet HEAD
- - run: git clone -b v$GRPC_VERSION https://github.com/grpc/grpc
- - run: cd grpc && git submodule update --init && sudo make install_c
- - run: GRPCIO_SYS_USE_PKG_CONFIG=1 RUSTFLAGS="-A unused-attributes" cargo test --all
Linux-Stable:
name: Linux-Stable
@@ -37,7 +34,8 @@ jobs:
- uses: actions/checkout@v2
- run: which go && go version && which cargo && cargo version && clang --version && openssl version
- run: scripts/reset-submodule.cmd
- - run: scripts/generate-bindings.sh && git diff --exit-code HEAD
+ - run: env TEST_BIND=0 scripts/generate-bindings.sh && git diff --exit-code HEAD
+ - run: scripts/generate-bindings.sh
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features protobuf-codec
- run: cargo build --no-default-features --features prost-codec
diff --git a/.travis.yml b/.travis.yml
index 0974f0c..822e0f1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
# Travis is only used to test ARM64 Linux
-dist: xenial
+dist: focal
sudo: false
language: rust
git:
@@ -11,15 +11,11 @@ rust: stable
env:
global:
- RUST_BACKTRACE=1
- - RUSTFLAGS="--deny=warnings"
-
-install:
- - if ! which go 2>/dev/null; then
- wget https://dl.google.com/go/go1.13.8.linux-arm64.tar.gz;
- tar -xf go1.13.8.linux-arm64.tar.gz -C $HOME;
- export PATH="$HOME/go/bin:$PATH";
- export GO_ROOT=$HOME/go;
- fi
+ # absl deadlock detection performs poorly on arm, so we build it release
+ # mode to skip the check. Enabling debug-assertions to get safer test
+ # results.
+ - RUSTFLAGS="--deny=warnings -C debug-assertions"
+ - TEST_BIND=1
addons:
apt:
@@ -33,51 +29,20 @@ jobs:
arch: arm64
before_script:
- scripts/reset-submodule.cmd
- - export GRPC_VERSION=1.29.1
- - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin"
- - GRPC_HEADER="$HOME/.cache/include/grpc/grpc.h"
- - if [[ ! -f "$GRPC_HEADER" ]] ; then
- (
- git clone -b v$GRPC_VERSION https://github.com/grpc/grpc &&
- cd grpc &&
- git submodule update --init &&
- env prefix=$HOME/.cache make install_c
- );
- fi
- - eval "$(gimme stable)"
- - export CPLUS_INCLUDE_PATH="$HOME/.cache/include"
- - export LD_LIBRARY_PATH="$HOME/.cache/lib"
- - export DYLD_LIBRARY_PATH="$HOME/.cache/lib"
- - export LIBRARY_PATH="$HOME/.cache/lib"
- - export PKG_CONFIG_PATH="$HOME/.cache/lib/pkgconfig"
- script:
- - which go && go version
- - GRPCIO_SYS_USE_PKG_CONFIG=1 RUSTFLAGS="-A unused-attributes" cargo test --all
-
- - os: linux
- arch: arm64
- before_script:
- - scripts/reset-submodule.cmd
- - export GRPC_VERSION=1.29.1
+ - export GRPC_VERSION=1.33.1
- export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin"
- - GRPC_HEADER="$HOME/.cache/include/grpc/grpc.h"
- - sudo apt-get update && sudo apt-get -y install libssl-dev;
+ - sudo apt-get update && sudo apt-get -y install libssl-dev
+ - which cmake && cmake --version && openssl version
- eval "$(gimme stable)"
- - export CPLUS_INCLUDE_PATH="$HOME/.cache/include"
- - export LD_LIBRARY_PATH="$HOME/.cache/lib"
- - export DYLD_LIBRARY_PATH="$HOME/.cache/lib"
- - export LIBRARY_PATH="$HOME/.cache/lib"
- - export PKG_CONFIG_PATH="$HOME/.cache/lib/pkgconfig"
script:
- - which go && go version
- if [[ $TRAVIS_OS_NAME == "linux" ]] && [[ $TRAVIS_RUST_VERSION == "stable" ]]; then
rustup component add rustfmt && cargo fmt --all -- --check;
- scripts/generate-bindings.sh && git diff --exit-code HEAD;
+ env TEST_BIND=0 scripts/generate-bindings.sh && git diff --exit-code HEAD;
fi
+ - ./scripts/generate-bindings.sh
- cargo build --no-default-features
- cargo build --no-default-features --features protobuf-codec
- cargo build --no-default-features --features prost-codec
- cargo build
- - cargo test --all
- - cargo test --features "openssl" --all
- - cargo test --features "openssl-vendored" --all \ No newline at end of file
+ - cargo test --release --all
+ - cargo test --release --features "openssl-vendored" --all
diff --git a/Android.bp b/Android.bp
index 6c7c996..eae450c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -21,12 +21,14 @@ rust_library {
}
// dependent_library ["feature_list"]
-// aho-corasick-0.7.14 "default,std"
+// aho-corasick-0.7.15 "default,std"
// bindgen-0.51.1
// bitflags-1.2.1 "default"
-// cc-1.0.61
+// boringssl-src-0.1.0
+// cc-1.0.62
// cexpr-0.3.6
// cfg-if-0.1.10
+// cfg-if-1.0.0
// clang-sys-0.28.1 "clang_6_0,gte_clang_3_6,gte_clang_3_7,gte_clang_3_8,gte_clang_3_9,gte_clang_4_0,gte_clang_5_0,gte_clang_6_0,libloading,runtime"
// cmake-0.1.44
// futures-0.3.7 "alloc,async-await,default,executor,futures-executor,std"
@@ -39,18 +41,19 @@ rust_library {
// futures-task-0.3.7 "alloc,once_cell,std"
// futures-util-0.3.7 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
// glob-0.3.0
-// grpcio-sys-0.6.0 "default"
+// grpcio-sys-0.7.1 "default"
+// instant-0.1.8
// lazy_static-1.4.0
// libc-0.2.80 "default,std"
// libloading-0.5.2
// libz-sys-1.1.2 "default,libc,static,stock-zlib"
-// lock_api-0.3.4
+// lock_api-0.4.1
// log-0.4.11
// memchr-2.3.4 "default,std,use_std"
// nom-4.2.3 "alloc,default,std,verbose-errors"
// once_cell-1.4.1 "std"
-// parking_lot-0.10.2 "default"
-// parking_lot_core-0.7.2
+// parking_lot-0.11.0 "default"
+// parking_lot_core-0.8.0
// peeking_take_while-0.1.2
// pin-project-1.0.1
// pin-project-internal-1.0.1
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d4c0b52..65a0e8c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,13 @@
+# 0.7.0 - 2020-11-02
+
+- Add blocking callback to `EnvBuilder` (#474)
+- Enhance sinks to make them batchable (#469)
+- Remove `rustfmt_skip` attribute since it is unstable (#479)
+- Use `grpc_slice` to reduce memory copy (#481)
+- Fix the bug that server cannot shutdown itself when drop (#484)
+- Add methods for channels from file descriptors (#488)
+- Update gRPC C core to 1.33.1 (#492)
+
# 0.6.0 - 2020-06-12
- Switch to std::future (#447)
diff --git a/Cargo.toml b/Cargo.toml
index 75bf8c1..822e052 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
[package]
edition = "2018"
name = "grpcio"
-version = "0.6.0"
+version = "0.7.0"
authors = ["The TiKV Project Developers"]
autoexamples = false
description = "The rust language implementation of gRPC, base on the gRPC c core library."
@@ -36,7 +36,7 @@ optional = true
version = "0.3"
[dependencies.grpcio-sys]
-version = "0.6.0"
+version = "0.7"
[dependencies.libc]
version = "0.2"
@@ -45,7 +45,7 @@ version = "0.2"
version = "0.4"
[dependencies.parking_lot]
-version = "0.10"
+version = "0.11"
[dependencies.prost]
version = "0.6"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 7775f30..18457aa 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,6 +1,6 @@
[package]
name = "grpcio"
-version = "0.6.0"
+version = "0.7.0"
edition = "2018"
authors = ["The TiKV Project Developers"]
license = "Apache-2.0"
@@ -17,14 +17,14 @@ autoexamples = false
all-features = true
[dependencies]
-grpcio-sys = { path = "grpc-sys", version = "0.6.0" }
+grpcio-sys = { path = "grpc-sys", version = "0.7" }
libc = "0.2"
futures = "0.3"
protobuf = { version = "2.0", optional = true }
prost = { version = "0.6", optional = true }
bytes = { version = "0.5", optional = true }
log = "0.4"
-parking_lot = "0.10"
+parking_lot = "0.11"
[workspace]
members = ["proto", "benchmark", "compiler", "interop", "tests-and-examples"]
@@ -45,5 +45,4 @@ debug = true
travis-ci = { repository = "tikv/grpc-rs" }
[patch.crates-io]
-grpcio-compiler = { path = "compiler", version = "0.6.0", default-features = false }
-protobuf-build = { git = "https://github.com/tikv/protobuf-build.git" }
+grpcio-compiler = { path = "compiler", version = "0.7.0", default-features = false }
diff --git a/METADATA b/METADATA
index a82c991..17985c5 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/grpcio/grpcio-0.6.0.crate"
+ value: "https://static.crates.io/crates/grpcio/grpcio-0.7.0.crate"
}
- version: "0.6.0"
+ version: "0.7.0"
license_type: NOTICE
last_upgrade_date {
year: 2020
- month: 10
- day: 14
+ month: 11
+ day: 8
}
}
diff --git a/scripts/reset-submodule.cmd b/scripts/reset-submodule.cmd
index f873a75..bac6a35 100755
--- a/scripts/reset-submodule.cmd
+++ b/scripts/reset-submodule.cmd
@@ -1,8 +1,9 @@
git submodule update --init grpc-sys/grpc
cd grpc-sys/grpc
-git submodule update --init third_party/boringssl-with-bazel
git submodule update --init third_party/cares/cares
git submodule update --init third_party/abseil-cpp
+git submodule update --init third_party/re2
+rm -rf third_party/boringssl-with-bazel/*
cd third_party/zlib
-git clean -f
+git clean -df
git reset --hard
diff --git a/src/buf.rs b/src/buf.rs
index 7731fd5..d51f274 100644
--- a/src/buf.rs
+++ b/src/buf.rs
@@ -7,6 +7,11 @@ use std::fmt::{self, Debug, Formatter};
use std::io::{self, BufRead, Read};
use std::mem::{self, ManuallyDrop, MaybeUninit};
+/// Copied from grpc-sys/grpc/include/grpc/impl/codegen/slice.h. Unfortunately bindgen doesn't
+/// generate it automatically.
+const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut u8>() - 1
+ + mem::size_of::<*mut libc::c_void>();
+
/// A convenient rust wrapper for the type `grpc_slice`.
///
/// It's expected that the slice should be initialized.
@@ -58,6 +63,41 @@ impl GrpcSlice {
pub fn from_static_str(s: &'static str) -> GrpcSlice {
GrpcSlice::from_static_slice(s.as_bytes())
}
+
+ /// Checks whether the slice stores bytes inline.
+ pub fn is_inline(&self) -> bool {
+ self.0.refcount.is_null()
+ }
+
+ /// Reallocates current slice with given capacity.
+ ///
+ /// The length of returned slice is the exact same as given cap.
+ ///
+ /// ## Safety
+ ///
+ /// Caller is expected to initialize all available bytes to guarantee safety of this slice.
+ pub unsafe fn realloc(&mut self, cap: usize) -> &mut [MaybeUninit<u8>] {
+ if cap <= INLINED_SIZE {
+ // Only inlined slice can be reused safely.
+ if !self.0.refcount.is_null() {
+ *self = GrpcSlice::default();
+ }
+ self.0.data.inlined.length = cap as u8;
+ std::slice::from_raw_parts_mut(
+ self.0.data.inlined.bytes.as_mut_ptr() as *mut MaybeUninit<u8>,
+ cap,
+ )
+ } else {
+ *self = GrpcSlice(grpcio_sys::grpc_slice_malloc_large(cap));
+ let start = self.0.data.refcounted.bytes;
+ let len = self.0.data.refcounted.length;
+ std::slice::from_raw_parts_mut(start as *mut MaybeUninit<u8>, len)
+ }
+ }
+
+ pub fn as_mut_ptr(&mut self) -> *mut grpc_slice {
+ &mut self.0
+ }
}
impl Clone for GrpcSlice {
@@ -91,6 +131,9 @@ impl Drop for GrpcSlice {
}
}
+unsafe impl Send for GrpcSlice {}
+unsafe impl Sync for GrpcSlice {}
+
impl PartialEq<[u8]> for GrpcSlice {
fn eq(&self, r: &[u8]) -> bool {
// Technically, the equal function inside vtable should be used.
diff --git a/src/call/client.rs b/src/call/client.rs
index eac0db4..279e619 100644
--- a/src/call/client.rs
+++ b/src/call/client.rs
@@ -14,6 +14,7 @@ use parking_lot::Mutex;
use std::future::Future;
use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags};
+use crate::buf::GrpcSlice;
use crate::call::{check_run, Call, MessageReader, Method};
use crate::channel::Channel;
use crate::codec::{DeserializeFn, SerializeFn};
@@ -108,14 +109,13 @@ impl Call {
mut opt: CallOption,
) -> Result<ClientUnaryReceiver<Resp>> {
let call = channel.create_call(method, &opt)?;
- let mut payload = vec![];
+ let mut payload = GrpcSlice::default();
(method.req_ser())(req, &mut payload);
let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
grpc_sys::grpcwrap_call_start_unary(
call.call,
ctx,
- payload.as_ptr() as *const _,
- payload.len(),
+ payload.as_mut_ptr(),
opt.write_flags.flags,
opt.headers
.as_mut()
@@ -162,14 +162,13 @@ impl Call {
mut opt: CallOption,
) -> Result<ClientSStreamReceiver<Resp>> {
let call = channel.create_call(method, &opt)?;
- let mut payload = vec![];
+ let mut payload = GrpcSlice::default();
(method.req_ser())(req, &mut payload);
let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
grpc_sys::grpcwrap_call_start_server_streaming(
call.call,
ctx,
- payload.as_ptr() as _,
- payload.len(),
+ payload.as_mut_ptr(),
opt.write_flags.flags,
opt.headers
.as_mut()
@@ -331,6 +330,19 @@ impl<Req> StreamingCallSink<Req> {
}
}
+ /// By default it always sends messages with their configured buffer hint. But when the
+ /// `enhance_batch` is enabled, messages will be batched together as many as possible.
+ /// The rules are listed as below:
+ /// - All messages except the last one will be sent with `buffer_hint` set to true.
+ /// - The last message will also be sent with `buffer_hint` set to true unless any message is
+ /// offered with buffer hint set to false.
+ ///
+ /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
+ /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
+ pub fn enhance_batch(&mut self, flag: bool) {
+ self.sink_base.enhance_buffer_strategy = flag;
+ }
+
pub fn cancel(&mut self) {
let call = self.call.lock();
call.call.cancel()
@@ -373,7 +385,8 @@ impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> {
let mut call = self.call.lock();
call.check_alive()?;
}
- Pin::new(&mut self.sink_base).poll_ready(cx)
+ let t = &mut *self;
+ Pin::new(&mut t.sink_base).poll_flush(cx, &mut t.call)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
diff --git a/src/call/mod.rs b/src/call/mod.rs
index 03e520a..a06e003 100644
--- a/src/call/mod.rs
+++ b/src/call/mod.rs
@@ -16,15 +16,12 @@ use futures::task::{Context, Poll};
use libc::c_void;
use parking_lot::Mutex;
-use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader};
+use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice};
use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
use crate::error::{Error, Result};
use crate::grpc_sys::grpc_status_code::*;
use crate::task::{self, BatchFuture, BatchType, CallTag};
-// By default buffers in `SinkBase` will be shrink to 4K size.
-const BUF_SHRINK_SIZE: usize = 4 * 1024;
-
/// An gRPC status code structure.
/// This type contains constants for all gRPC status codes.
#[derive(PartialEq, Eq, Clone, Copy)]
@@ -296,7 +293,7 @@ impl Call {
/// Send a message asynchronously.
pub fn start_send_message(
&mut self,
- msg: &[u8],
+ msg: &mut GrpcSlice,
write_flags: u32,
initial_meta: bool,
) -> Result<BatchFuture> {
@@ -306,8 +303,7 @@ impl Call {
grpc_sys::grpcwrap_call_send_message(
self.call,
ctx,
- msg.as_ptr() as _,
- msg.len(),
+ msg.as_mut_ptr(),
write_flags,
i,
tag,
@@ -350,20 +346,21 @@ impl Call {
&mut self,
status: &RpcStatus,
send_empty_metadata: bool,
- payload: &Option<Vec<u8>>,
+ payload: &mut Option<GrpcSlice>,
write_flags: u32,
) -> Result<BatchFuture> {
let _cq_ref = self.cq.borrow()?;
let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
- let (payload_ptr, payload_len) = payload
- .as_ref()
- .map_or((ptr::null(), 0), |b| (b.as_ptr(), b.len()));
let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
let details_ptr = status
.details
.as_ref()
.map_or_else(ptr::null, |s| s.as_ptr() as _);
let details_len = status.details.as_ref().map_or(0, String::len);
+ let payload_p = match payload {
+ Some(p) => p.as_mut_ptr(),
+ None => ptr::null_mut(),
+ };
grpc_sys::grpcwrap_call_send_status_from_server(
self.call,
ctx,
@@ -372,8 +369,7 @@ impl Call {
details_len,
ptr::null_mut(),
send_empty_metadata,
- payload_ptr as _,
- payload_len,
+ payload_p,
write_flags,
tag,
)
@@ -407,8 +403,7 @@ impl Call {
details_len,
ptr::null_mut(),
1,
- ptr::null(),
- 0,
+ ptr::null_mut(),
0,
tag_ptr as *mut c_void,
)
@@ -628,17 +623,30 @@ impl WriteFlags {
/// A helper struct for constructing Sink object for batch requests.
struct SinkBase {
+ // Batch job to be executed in `poll_ready`.
batch_f: Option<BatchFuture>,
- buf: Vec<u8>,
send_metadata: bool,
+ // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch
+ // messages as much as possible.
+ enhance_buffer_strategy: bool,
+ // Buffer used to store the data to be sent, send out the last data in this round of `start_send`.
+ buffer: GrpcSlice,
+ // Write flags used to control the data to be sent in `buffer`.
+ buf_flags: Option<WriteFlags>,
+ // Used to records whether a message in which `buffer_hint` is false exists.
+ // Note: only used in enhanced buffer strategy.
+ last_buf_hint: bool,
}
impl SinkBase {
fn new(send_metadata: bool) -> SinkBase {
SinkBase {
batch_f: None,
- buf: Vec::new(),
+ buffer: GrpcSlice::default(),
+ buf_flags: None,
+ last_buf_hint: true,
send_metadata,
+ enhance_buffer_strategy: false,
}
}
@@ -646,29 +654,35 @@ impl SinkBase {
&mut self,
call: &mut C,
t: &T,
- mut flags: WriteFlags,
+ flags: WriteFlags,
ser: SerializeFn<T>,
) -> Result<()> {
- // `start_send` is supposed to be called after `poll_ready` returns ready.
- assert!(self.batch_f.is_none());
+ // temporary fix: buffer hint with send meta will not send out any metadata.
+ // note: only the first message can enter this code block.
+ if self.send_metadata {
+ ser(t, &mut self.buffer);
+ self.buf_flags = Some(flags);
+ self.start_send_buffer_message(false, call)?;
+ self.send_metadata = false;
+ return Ok(());
+ }
- self.buf.clear();
- ser(t, &mut self.buf);
- if flags.get_buffer_hint() && self.send_metadata {
- // temporary fix: buffer hint with send meta will not send out any metadata.
- flags = flags.buffer_hint(false);
+ // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
+ // that this is not the last message.
+ if self.buf_flags.is_some() {
+ self.start_send_buffer_message(true, call)?;
}
- let write_f = call.call(|c| {
- c.call
- .start_send_message(&self.buf, flags.flags, self.send_metadata)
- })?;
- // NOTE: Content of `self.buf` is copied into grpc internal.
- if self.buf.capacity() > BUF_SHRINK_SIZE {
- self.buf.truncate(BUF_SHRINK_SIZE);
- self.buf.shrink_to_fit();
+
+ ser(t, &mut self.buffer);
+ let hint = flags.get_buffer_hint();
+ self.last_buf_hint &= hint;
+ self.buf_flags = Some(flags);
+
+ // If sink disable batch, start sending the message in buffer immediately.
+ if !self.enhance_buffer_strategy {
+ self.start_send_buffer_message(hint, call)?;
}
- self.batch_f = Some(write_f);
- self.send_metadata = false;
+
Ok(())
}
@@ -683,4 +697,44 @@ impl SinkBase {
self.batch_f.take();
Poll::Ready(Ok(()))
}
+
+ #[inline]
+ fn poll_flush<C: ShareCallHolder>(
+ &mut self,
+ cx: &mut Context,
+ call: &mut C,
+ ) -> Poll<Result<()>> {
+ if self.batch_f.is_some() {
+ ready!(self.poll_ready(cx)?);
+ }
+ if self.buf_flags.is_some() {
+ self.start_send_buffer_message(self.last_buf_hint, call)?;
+ ready!(self.poll_ready(cx)?);
+ }
+ self.last_buf_hint = true;
+ Poll::Ready(Ok(()))
+ }
+
+ #[inline]
+ fn start_send_buffer_message<C: ShareCallHolder>(
+ &mut self,
+ buffer_hint: bool,
+ call: &mut C,
+ ) -> Result<()> {
+ // `start_send` is supposed to be called after `poll_ready` returns ready.
+ assert!(self.batch_f.is_none());
+
+ let mut flags = self.buf_flags.clone().unwrap();
+ flags = flags.buffer_hint(buffer_hint);
+ let write_f = call.call(|c| {
+ c.call
+ .start_send_message(&mut self.buffer, flags.flags, self.send_metadata)
+ })?;
+ self.batch_f = Some(write_f);
+ if !self.buffer.is_inline() {
+ self.buffer = GrpcSlice::default();
+ }
+ self.buf_flags.take();
+ Ok(())
+ }
}
diff --git a/src/call/server.rs b/src/call/server.rs
index 8875d6d..add9874 100644
--- a/src/call/server.rs
+++ b/src/call/server.rs
@@ -17,6 +17,7 @@ use parking_lot::Mutex;
use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
use crate::auth_context::AuthContext;
+use crate::buf::GrpcSlice;
use crate::call::{
BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
};
@@ -322,7 +323,7 @@ macro_rules! impl_unary_sink {
$t {
call: Some(call),
write_flags: 0,
- ser: ser,
+ ser,
}
}
@@ -335,8 +336,8 @@ macro_rules! impl_unary_sink {
}
fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
- let data = t.as_ref().map(|t| {
- let mut buf = vec![];
+ let mut data = t.as_ref().map(|t| {
+ let mut buf = GrpcSlice::default();
(self.ser)(t, &mut buf);
buf
});
@@ -344,7 +345,7 @@ macro_rules! impl_unary_sink {
let write_flags = self.write_flags;
let res = self.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(&status, true, &data, write_flags)
+ .start_send_status_from_server(&status, true, &mut data, write_flags)
});
let (cq_f, err) = match res {
@@ -354,8 +355,8 @@ macro_rules! impl_unary_sink {
$rt {
call: self.call.take().unwrap(),
- cq_f: cq_f,
- err: err,
+ cq_f,
+ err,
}
}
}
@@ -420,10 +421,23 @@ macro_rules! impl_stream_sink {
status: RpcStatus::ok(),
flushed: false,
closed: false,
- ser: ser,
+ ser,
}
}
+ /// By default it always sends messages with their configured buffer hint. But when the
+ /// `enhance_batch` is enabled, messages will be batched together as many as possible.
+ /// The rules are listed as below:
+ /// - All messages except the last one will be sent with `buffer_hint` set to true.
+ /// - The last message will also be sent with `buffer_hint` set to true unless any message is
+ /// offered with buffer hint set to false.
+ ///
+ /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
+ /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
+ pub fn enhance_batch(&mut self, flag: bool) {
+ self.base.enhance_buffer_strategy = flag;
+ }
+
pub fn set_status(&mut self, status: RpcStatus) {
assert!(self.flush_f.is_none());
self.status = status;
@@ -434,7 +448,7 @@ macro_rules! impl_stream_sink {
let send_metadata = self.base.send_metadata;
let res = self.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(&status, send_metadata, &None, 0)
+ .start_send_status_from_server(&status, send_metadata, &mut None, 0)
});
let (fail_f, err) = match res {
@@ -444,8 +458,8 @@ macro_rules! impl_stream_sink {
$ft {
call: self.call.take().unwrap(),
- fail_f: fail_f,
- err: err,
+ fail_f,
+ err,
}
}
}
@@ -487,7 +501,8 @@ macro_rules! impl_stream_sink {
if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
return Poll::Ready(Err(Error::RemoteStopped));
}
- Pin::new(&mut self.base).poll_ready(cx)
+ let t = &mut *self;
+ Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap())
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
@@ -499,7 +514,7 @@ macro_rules! impl_stream_sink {
let status = &t.status;
let flush_f = t.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(status, send_metadata, &None, 0)
+ .start_send_status_from_server(status, send_metadata, &mut None, 0)
})?;
t.flush_f = Some(flush_f);
}
diff --git a/src/channel.rs b/src/channel.rs
index 8fb7fc8..bdf95ce 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -28,7 +28,7 @@ pub use crate::grpc_sys::{
/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
fn format_user_agent_string(agent: &str) -> CString {
- let version = "0.6.0";
+ let version = "0.7.0";
let trimed_agent = agent.trim();
let val = if trimed_agent.is_empty() {
format!("grpc-rust/{}", version)
@@ -111,7 +111,7 @@ impl ChannelBuilder {
self
}
- /// Set maximum message length that the channel can receive. `usize::MAX` means unlimited.
+ /// Set maximum message length that the channel can receive. `-1` means unlimited.
pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
self.options.insert(
Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
@@ -391,7 +391,7 @@ impl ChannelBuilder {
}
/// Build `ChannelArgs` from the current configuration.
- #[allow(clippy::identity_conversion)]
+ #[allow(clippy::useless_conversion)]
pub fn build_args(&self) -> ChannelArgs {
let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
for (i, (k, v)) in self.options.iter().enumerate() {
@@ -442,6 +442,28 @@ impl ChannelBuilder {
let channel =
unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) };
+ unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
+ }
+
+ /// Build an insecure [`Channel`] taking over an established connection from
+ /// a file descriptor. The target string given is purely informative to
+ /// describe the endpoint of the connection. Takes ownership of the given
+ /// file descriptor and will close it when the connection is closed.
+ ///
+ /// This function is available on posix systems only.
+ ///
+ /// # Safety
+ ///
+ /// The file descriptor must correspond to a connected stream socket. After
+ /// this call, the socket must not be accessed (read / written / closed)
+ /// by other code.
+ #[cfg(unix)]
+ pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel {
+ let args = self.prepare_connect_args();
+ let target = CString::new(target).unwrap();
+ let target_ptr = target.as_ptr();
+ let channel = grpc_sys::grpc_insecure_channel_create_from_fd(target_ptr, fd, args.args);
+
Channel::new(self.env.pick_cq(), self.env, channel)
}
}
@@ -489,7 +511,7 @@ mod secure_channel {
)
};
- Channel::new(self.env.pick_cq(), self.env, channel)
+ unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
}
}
}
@@ -548,7 +570,19 @@ unsafe impl Send for Channel {}
unsafe impl Sync for Channel {}
impl Channel {
- fn new(cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel) -> Channel {
+ /// Create a new channel. Avoid using this directly and use
+ /// [`ChannelBuilder`] to build a [`Channel`] instead.
+ ///
+ /// # Safety
+ ///
+ /// The given grpc_channel must correspond to an instantiated grpc core
+ /// channel. Takes exclusive ownership of the channel and will close it after
+ /// use.
+ pub unsafe fn new(
+ cq: CompletionQueue,
+ env: Arc<Environment>,
+ channel: *mut grpc_channel,
+ ) -> Channel {
Channel {
inner: Arc::new(ChannelInner { _env: env, channel }),
cq,
diff --git a/src/codec.rs b/src/codec.rs
index 4a84489..e0214b7 100644
--- a/src/codec.rs
+++ b/src/codec.rs
@@ -1,10 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+use crate::buf::GrpcSlice;
use crate::call::MessageReader;
use crate::error::Result;
pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>;
-pub type SerializeFn<T> = fn(&T, &mut Vec<u8>);
+pub type SerializeFn<T> = fn(&T, &mut GrpcSlice);
/// Defines how to serialize and deserialize between the specialized type and byte slice.
pub struct Marshaller<T> {
@@ -26,14 +27,21 @@ pub struct Marshaller<T> {
#[cfg(feature = "protobuf-codec")]
pub mod pb_codec {
- use protobuf::{CodedInputStream, Message};
+ use protobuf::{CodedInputStream, CodedOutputStream, Message};
use super::MessageReader;
+ use crate::buf::GrpcSlice;
use crate::error::Result;
#[inline]
- pub fn ser<T: Message>(t: &T, buf: &mut Vec<u8>) {
- t.write_to_vec(buf).unwrap()
+ pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) {
+ let cap = t.compute_size();
+ unsafe {
+ let bytes = buf.realloc(cap as usize);
+ let raw_bytes = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
+ let mut s = CodedOutputStream::bytes(raw_bytes);
+ t.write_to_with_cached_sizes(&mut s).unwrap();
+ }
}
#[inline]
@@ -47,15 +55,22 @@ pub mod pb_codec {
#[cfg(feature = "prost-codec")]
pub mod pr_codec {
- use bytes::buf::BufMut;
use prost::Message;
use super::MessageReader;
+ use crate::buf::GrpcSlice;
use crate::error::Result;
#[inline]
- pub fn ser<M: Message, B: BufMut>(msg: &M, buf: &mut B) {
- msg.encode(buf).expect("Writing message to buffer failed");
+ pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) {
+ let size = msg.encoded_len();
+ unsafe {
+ let bytes = buf.realloc(size);
+ let mut b = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
+ msg.encode(&mut b)
+ .expect("Writing message to buffer failed");
+ debug_assert!(b.is_empty());
+ }
}
#[inline]
diff --git a/src/env.rs b/src/env.rs
index 8bad45e..5c2e199 100644
--- a/src/env.rs
+++ b/src/env.rs
@@ -38,6 +38,8 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
pub struct EnvBuilder {
cq_count: usize,
name_prefix: Option<String>,
+ after_start: Option<Arc<dyn Fn() + Send + Sync>>,
+ before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
}
impl EnvBuilder {
@@ -46,6 +48,8 @@ impl EnvBuilder {
EnvBuilder {
cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
name_prefix: None,
+ after_start: None,
+ before_stop: None,
}
}
@@ -67,6 +71,18 @@ impl EnvBuilder {
self
}
+ /// Execute function `f` after each thread is started but before it starts doing work.
+ pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
+ self.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Execute function `f` before each thread stops.
+ pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
+ self.before_stop = Some(Arc::new(f));
+ self
+ }
+
/// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
pub fn build(self) -> Environment {
unsafe {
@@ -81,7 +97,19 @@ impl EnvBuilder {
if let Some(ref prefix) = self.name_prefix {
builder = builder.name(format!("{}-{}", prefix, i));
}
- let handle = builder.spawn(move || poll_queue(tx_i)).unwrap();
+ let after_start = self.after_start.clone();
+ let before_stop = self.before_stop.clone();
+ let handle = builder
+ .spawn(move || {
+ if let Some(f) = after_start {
+ f();
+ }
+ poll_queue(tx_i);
+ if let Some(f) = before_stop {
+ f();
+ }
+ })
+ .unwrap();
handles.push(handle);
}
for _ in 0..self.cq_count {
diff --git a/src/lib.rs b/src/lib.rs
index 321e1f2..2bac988 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -43,6 +43,7 @@ mod security;
mod server;
mod task;
+pub use crate::buf::GrpcSlice;
pub use crate::call::client::{
CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver,
ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, StreamingCallSink,
diff --git a/src/metadata.rs b/src/metadata.rs
index ef49bcb..746b593 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -1,7 +1,8 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
-use crate::grpc_sys::{self, grpc_metadata_array};
+use crate::grpc_sys::{self, grpc_metadata, grpc_metadata_array};
use std::borrow::Cow;
+use std::mem::ManuallyDrop;
use std::{mem, slice, str};
use crate::error::{Error, Result};
@@ -184,6 +185,35 @@ impl Metadata {
index: 0,
}
}
+
+ /// Decomposes a Metadata array into its raw components.
+ ///
+ /// Returns the raw pointer to the underlying data, the length of the vector (in elements),
+ /// and the allocated capacity of the data (in elements). These are the same arguments in
+ /// the same order as the arguments to from_raw_parts.
+ ///
+ /// After calling this function, the caller is responsible for the memory previously managed
+ /// by the Metadata. The only way to do this is to convert the raw pointer, length, and
+ /// capacity back into a Metadata with the from_raw_parts function, allowing the destructor
+ /// to perform the cleanup.
+ pub fn into_raw_parts(self) -> (*mut grpc_metadata, usize, usize) {
+ let s = ManuallyDrop::new(self);
+ (s.0.metadata, s.0.count, s.0.capacity)
+ }
+
+ /// Creates a Metadata directly from the raw components of another vector.
+ ///
+ /// ## Safety
+ ///
+ /// The operation is safe only if the three arguments are returned from `into_raw_parts`
+ /// and only convert once.
+ pub unsafe fn from_raw_parts(p: *mut grpc_metadata, len: usize, cap: usize) -> Metadata {
+ Metadata(grpc_metadata_array {
+ count: len,
+ capacity: cap,
+ metadata: p,
+ })
+ }
}
impl Clone for Metadata {
diff --git a/src/security/credentials.rs b/src/security/credentials.rs
index 8d835ee..7d73009 100644
--- a/src/security/credentials.rs
+++ b/src/security/credentials.rs
@@ -352,7 +352,7 @@ impl ChannelCredentials {
unsafe {
grpc_sys::grpc_init();
}
- let creds = unsafe { grpc_sys::grpc_google_default_credentials_create() };
+ let creds = unsafe { grpc_sys::grpc_google_default_credentials_create(ptr::null_mut()) };
if creds.is_null() {
Err(Error::GoogleAuthenticationFailed)
} else {
diff --git a/src/server.rs b/src/server.rs
index 3dd8bf3..8cb6a87 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -561,13 +561,27 @@ impl Server {
pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> {
self.core.binders.iter().map(|b| (&b.host, b.port))
}
+
+ /// Add an rpc channel for an established connection represented as a file
+ /// descriptor. Takes ownership of the file descriptor, closing it when
+ /// channel is closed.
+ ///
+ /// # Safety
+ ///
+ /// The file descriptor must correspond to a connected stream socket. After
+ /// this call, the socket must not be accessed (read / written / closed)
+ /// by other code.
+ #[cfg(unix)]
+ pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) {
+ grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd)
+ }
}
impl Drop for Server {
fn drop(&mut self) {
// if the server is not shutdown completely, destroy a server will core.
// TODO: don't wait here
- let f = if self.core.shutdown.load(Ordering::SeqCst) {
+ let f = if !self.core.shutdown.load(Ordering::SeqCst) {
Some(self.shutdown())
} else {
None