diff options
author | Haibo Huang <hhb@google.com> | 2020-11-23 15:24:33 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2020-11-23 15:24:33 +0000 |
commit | dfd0d76aa3a057d800eff424b09709d94339b216 (patch) | |
tree | 71b66a4c98a5ac5b2d407b3273ceae8adaf1744e | |
parent | 94a500da3d781ba92187384edd52487977538c91 (diff) | |
parent | fc7118641dd538ab56486911dfc392958c4ad18a (diff) | |
download | grpcio-dfd0d76aa3a057d800eff424b09709d94339b216.tar.gz |
Upgrade rust/crates/grpcio to 0.7.0 am: ee7a229247 am: fc7118641d
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/grpcio/+/1485656
Change-Id: I9c4916532bba1d203d8048bc18d3945716bae282
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | .github/workflows/ci.yml | 14 | ||||
-rw-r--r-- | .travis.yml | 61 | ||||
-rw-r--r-- | Android.bp | 15 | ||||
-rw-r--r-- | CHANGELOG.md | 10 | ||||
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | Cargo.toml.orig | 9 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rwxr-xr-x | scripts/reset-submodule.cmd | 5 | ||||
-rw-r--r-- | src/buf.rs | 43 | ||||
-rw-r--r-- | src/call/client.rs | 27 | ||||
-rw-r--r-- | src/call/mod.rs | 124 | ||||
-rw-r--r-- | src/call/server.rs | 39 | ||||
-rw-r--r-- | src/channel.rs | 44 | ||||
-rw-r--r-- | src/codec.rs | 29 | ||||
-rw-r--r-- | src/env.rs | 30 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/metadata.rs | 32 | ||||
-rw-r--r-- | src/security/credentials.rs | 2 | ||||
-rw-r--r-- | src/server.rs | 16 |
20 files changed, 370 insertions, 147 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 90abfbc..43ff69f 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "19a53b5cc9612511dabb365dae05286e807668e5" + "sha1": "4112a2249ebaa7a354cf59179979e3b4b08a077f" } } diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e1faa0b..1c42285 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,24 +11,21 @@ on: env: RUST_BACKTRACE: 1 RUSTFLAGS: "--deny=warnings" - GRPC_VERSION: 1.29.1 + TEST_BIND: 1 jobs: - Linux-Format-PKG-Test: - name: Linux-Format-PKG-Test + Linux-Format: + name: Linux-Format runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - run: sudo apt-get install -y clang-tidy-9 - run: sudo update-alternatives --install /usr/bin/clang-tidy clang-tidy /usr/bin/clang-tidy-9 100 - - run: which go && go version && which cargo && cargo version && clang --version && openssl version + - run: which go && go version && which cargo && cargo version && clang --version && openssl version && which cmake && cmake --version - run: scripts/reset-submodule.cmd - run: cargo fmt --all -- --check - run: cargo clippy --all -- -D clippy::all && cargo clippy --all --no-default-features --features prost-codec -- -D clippy::all - run: scripts/lint-grpc-sys.sh && git diff-index --quiet HEAD - - run: git clone -b v$GRPC_VERSION https://github.com/grpc/grpc - - run: cd grpc && git submodule update --init && sudo make install_c - - run: GRPCIO_SYS_USE_PKG_CONFIG=1 RUSTFLAGS="-A unused-attributes" cargo test --all Linux-Stable: name: Linux-Stable @@ -37,7 +34,8 @@ jobs: - uses: actions/checkout@v2 - run: which go && go version && which cargo && cargo version && clang --version && openssl version - run: scripts/reset-submodule.cmd - - run: scripts/generate-bindings.sh && git diff --exit-code HEAD + - run: env TEST_BIND=0 scripts/generate-bindings.sh && git diff --exit-code HEAD + - run: scripts/generate-bindings.sh - run: cargo build --no-default-features - run: cargo build --no-default-features --features protobuf-codec - run: cargo build --no-default-features --features prost-codec diff --git a/.travis.yml b/.travis.yml index 0974f0c..822e0f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ # Travis is only used to test ARM64 Linux -dist: xenial +dist: focal sudo: false language: rust git: @@ -11,15 +11,11 @@ rust: stable env: global: - RUST_BACKTRACE=1 - - RUSTFLAGS="--deny=warnings" - -install: - - if ! which go 2>/dev/null; then - wget https://dl.google.com/go/go1.13.8.linux-arm64.tar.gz; - tar -xf go1.13.8.linux-arm64.tar.gz -C $HOME; - export PATH="$HOME/go/bin:$PATH"; - export GO_ROOT=$HOME/go; - fi + # absl deadlock detection performs poorly on arm, so we build it release + # mode to skip the check. Enabling debug-assertions to get safer test + # results. + - RUSTFLAGS="--deny=warnings -C debug-assertions" + - TEST_BIND=1 addons: apt: @@ -33,51 +29,20 @@ jobs: arch: arm64 before_script: - scripts/reset-submodule.cmd - - export GRPC_VERSION=1.29.1 - - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin" - - GRPC_HEADER="$HOME/.cache/include/grpc/grpc.h" - - if [[ ! -f "$GRPC_HEADER" ]] ; then - ( - git clone -b v$GRPC_VERSION https://github.com/grpc/grpc && - cd grpc && - git submodule update --init && - env prefix=$HOME/.cache make install_c - ); - fi - - eval "$(gimme stable)" - - export CPLUS_INCLUDE_PATH="$HOME/.cache/include" - - export LD_LIBRARY_PATH="$HOME/.cache/lib" - - export DYLD_LIBRARY_PATH="$HOME/.cache/lib" - - export LIBRARY_PATH="$HOME/.cache/lib" - - export PKG_CONFIG_PATH="$HOME/.cache/lib/pkgconfig" - script: - - which go && go version - - GRPCIO_SYS_USE_PKG_CONFIG=1 RUSTFLAGS="-A unused-attributes" cargo test --all - - - os: linux - arch: arm64 - before_script: - - scripts/reset-submodule.cmd - - export GRPC_VERSION=1.29.1 + - export GRPC_VERSION=1.33.1 - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin" - - GRPC_HEADER="$HOME/.cache/include/grpc/grpc.h" - - sudo apt-get update && sudo apt-get -y install libssl-dev; + - sudo apt-get update && sudo apt-get -y install libssl-dev + - which cmake && cmake --version && openssl version - eval "$(gimme stable)" - - export CPLUS_INCLUDE_PATH="$HOME/.cache/include" - - export LD_LIBRARY_PATH="$HOME/.cache/lib" - - export DYLD_LIBRARY_PATH="$HOME/.cache/lib" - - export LIBRARY_PATH="$HOME/.cache/lib" - - export PKG_CONFIG_PATH="$HOME/.cache/lib/pkgconfig" script: - - which go && go version - if [[ $TRAVIS_OS_NAME == "linux" ]] && [[ $TRAVIS_RUST_VERSION == "stable" ]]; then rustup component add rustfmt && cargo fmt --all -- --check; - scripts/generate-bindings.sh && git diff --exit-code HEAD; + env TEST_BIND=0 scripts/generate-bindings.sh && git diff --exit-code HEAD; fi + - ./scripts/generate-bindings.sh - cargo build --no-default-features - cargo build --no-default-features --features protobuf-codec - cargo build --no-default-features --features prost-codec - cargo build - - cargo test --all - - cargo test --features "openssl" --all - - cargo test --features "openssl-vendored" --all
\ No newline at end of file + - cargo test --release --all + - cargo test --release --features "openssl-vendored" --all @@ -21,12 +21,14 @@ rust_library { } // dependent_library ["feature_list"] -// aho-corasick-0.7.14 "default,std" +// aho-corasick-0.7.15 "default,std" // bindgen-0.51.1 // bitflags-1.2.1 "default" -// cc-1.0.61 +// boringssl-src-0.1.0 +// cc-1.0.62 // cexpr-0.3.6 // cfg-if-0.1.10 +// cfg-if-1.0.0 // clang-sys-0.28.1 "clang_6_0,gte_clang_3_6,gte_clang_3_7,gte_clang_3_8,gte_clang_3_9,gte_clang_4_0,gte_clang_5_0,gte_clang_6_0,libloading,runtime" // cmake-0.1.44 // futures-0.3.7 "alloc,async-await,default,executor,futures-executor,std" @@ -39,18 +41,19 @@ rust_library { // futures-task-0.3.7 "alloc,once_cell,std" // futures-util-0.3.7 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" // glob-0.3.0 -// grpcio-sys-0.6.0 "default" +// grpcio-sys-0.7.1 "default" +// instant-0.1.8 // lazy_static-1.4.0 // libc-0.2.80 "default,std" // libloading-0.5.2 // libz-sys-1.1.2 "default,libc,static,stock-zlib" -// lock_api-0.3.4 +// lock_api-0.4.1 // log-0.4.11 // memchr-2.3.4 "default,std,use_std" // nom-4.2.3 "alloc,default,std,verbose-errors" // once_cell-1.4.1 "std" -// parking_lot-0.10.2 "default" -// parking_lot_core-0.7.2 +// parking_lot-0.11.0 "default" +// parking_lot_core-0.8.0 // peeking_take_while-0.1.2 // pin-project-1.0.1 // pin-project-internal-1.0.1 diff --git a/CHANGELOG.md b/CHANGELOG.md index d4c0b52..65a0e8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +# 0.7.0 - 2020-11-02 + +- Add blocking callback to `EnvBuilder` (#474) +- Enhance sinks to make them batchable (#469) +- Remove `rustfmt_skip` attribute since it is unstable (#479) +- Use `grpc_slice` to reduce memory copy (#481) +- Fix the bug that server cannot shutdown itself when drop (#484) +- Add methods for channels from file descriptors (#488) +- Update gRPC C core to 1.33.1 (#492) + # 0.6.0 - 2020-06-12 - Switch to std::future (#447) @@ -13,7 +13,7 @@ [package] edition = "2018" name = "grpcio" -version = "0.6.0" +version = "0.7.0" authors = ["The TiKV Project Developers"] autoexamples = false description = "The rust language implementation of gRPC, base on the gRPC c core library." @@ -36,7 +36,7 @@ optional = true version = "0.3" [dependencies.grpcio-sys] -version = "0.6.0" +version = "0.7" [dependencies.libc] version = "0.2" @@ -45,7 +45,7 @@ version = "0.2" version = "0.4" [dependencies.parking_lot] -version = "0.10" +version = "0.11" [dependencies.prost] version = "0.6" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 7775f30..18457aa 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "grpcio" -version = "0.6.0" +version = "0.7.0" edition = "2018" authors = ["The TiKV Project Developers"] license = "Apache-2.0" @@ -17,14 +17,14 @@ autoexamples = false all-features = true [dependencies] -grpcio-sys = { path = "grpc-sys", version = "0.6.0" } +grpcio-sys = { path = "grpc-sys", version = "0.7" } libc = "0.2" futures = "0.3" protobuf = { version = "2.0", optional = true } prost = { version = "0.6", optional = true } bytes = { version = "0.5", optional = true } log = "0.4" -parking_lot = "0.10" +parking_lot = "0.11" [workspace] members = ["proto", "benchmark", "compiler", "interop", "tests-and-examples"] @@ -45,5 +45,4 @@ debug = true travis-ci = { repository = "tikv/grpc-rs" } [patch.crates-io] -grpcio-compiler = { path = "compiler", version = "0.6.0", default-features = false } -protobuf-build = { git = "https://github.com/tikv/protobuf-build.git" } +grpcio-compiler = { path = "compiler", version = "0.7.0", default-features = false } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/grpcio/grpcio-0.6.0.crate" + value: "https://static.crates.io/crates/grpcio/grpcio-0.7.0.crate" } - version: "0.6.0" + version: "0.7.0" license_type: NOTICE last_upgrade_date { year: 2020 - month: 10 - day: 14 + month: 11 + day: 8 } } diff --git a/scripts/reset-submodule.cmd b/scripts/reset-submodule.cmd index f873a75..bac6a35 100755 --- a/scripts/reset-submodule.cmd +++ b/scripts/reset-submodule.cmd @@ -1,8 +1,9 @@ git submodule update --init grpc-sys/grpc cd grpc-sys/grpc -git submodule update --init third_party/boringssl-with-bazel git submodule update --init third_party/cares/cares git submodule update --init third_party/abseil-cpp +git submodule update --init third_party/re2 +rm -rf third_party/boringssl-with-bazel/* cd third_party/zlib -git clean -f +git clean -df git reset --hard @@ -7,6 +7,11 @@ use std::fmt::{self, Debug, Formatter}; use std::io::{self, BufRead, Read}; use std::mem::{self, ManuallyDrop, MaybeUninit}; +/// Copied from grpc-sys/grpc/include/grpc/impl/codegen/slice.h. Unfortunately bindgen doesn't +/// generate it automatically. +const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut u8>() - 1 + + mem::size_of::<*mut libc::c_void>(); + /// A convenient rust wrapper for the type `grpc_slice`. /// /// It's expected that the slice should be initialized. @@ -58,6 +63,41 @@ impl GrpcSlice { pub fn from_static_str(s: &'static str) -> GrpcSlice { GrpcSlice::from_static_slice(s.as_bytes()) } + + /// Checks whether the slice stores bytes inline. + pub fn is_inline(&self) -> bool { + self.0.refcount.is_null() + } + + /// Reallocates current slice with given capacity. + /// + /// The length of returned slice is the exact same as given cap. + /// + /// ## Safety + /// + /// Caller is expected to initialize all available bytes to guarantee safety of this slice. + pub unsafe fn realloc(&mut self, cap: usize) -> &mut [MaybeUninit<u8>] { + if cap <= INLINED_SIZE { + // Only inlined slice can be reused safely. + if !self.0.refcount.is_null() { + *self = GrpcSlice::default(); + } + self.0.data.inlined.length = cap as u8; + std::slice::from_raw_parts_mut( + self.0.data.inlined.bytes.as_mut_ptr() as *mut MaybeUninit<u8>, + cap, + ) + } else { + *self = GrpcSlice(grpcio_sys::grpc_slice_malloc_large(cap)); + let start = self.0.data.refcounted.bytes; + let len = self.0.data.refcounted.length; + std::slice::from_raw_parts_mut(start as *mut MaybeUninit<u8>, len) + } + } + + pub fn as_mut_ptr(&mut self) -> *mut grpc_slice { + &mut self.0 + } } impl Clone for GrpcSlice { @@ -91,6 +131,9 @@ impl Drop for GrpcSlice { } } +unsafe impl Send for GrpcSlice {} +unsafe impl Sync for GrpcSlice {} + impl PartialEq<[u8]> for GrpcSlice { fn eq(&self, r: &[u8]) -> bool { // Technically, the equal function inside vtable should be used. diff --git a/src/call/client.rs b/src/call/client.rs index eac0db4..279e619 100644 --- a/src/call/client.rs +++ b/src/call/client.rs @@ -14,6 +14,7 @@ use parking_lot::Mutex; use std::future::Future; use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags}; +use crate::buf::GrpcSlice; use crate::call::{check_run, Call, MessageReader, Method}; use crate::channel::Channel; use crate::codec::{DeserializeFn, SerializeFn}; @@ -108,14 +109,13 @@ impl Call { mut opt: CallOption, ) -> Result<ClientUnaryReceiver<Resp>> { let call = channel.create_call(method, &opt)?; - let mut payload = vec![]; + let mut payload = GrpcSlice::default(); (method.req_ser())(req, &mut payload); let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_unary( call.call, ctx, - payload.as_ptr() as *const _, - payload.len(), + payload.as_mut_ptr(), opt.write_flags.flags, opt.headers .as_mut() @@ -162,14 +162,13 @@ impl Call { mut opt: CallOption, ) -> Result<ClientSStreamReceiver<Resp>> { let call = channel.create_call(method, &opt)?; - let mut payload = vec![]; + let mut payload = GrpcSlice::default(); (method.req_ser())(req, &mut payload); let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_server_streaming( call.call, ctx, - payload.as_ptr() as _, - payload.len(), + payload.as_mut_ptr(), opt.write_flags.flags, opt.headers .as_mut() @@ -331,6 +330,19 @@ impl<Req> StreamingCallSink<Req> { } } + /// By default it always sends messages with their configured buffer hint. But when the + /// `enhance_batch` is enabled, messages will be batched together as many as possible. + /// The rules are listed as below: + /// - All messages except the last one will be sent with `buffer_hint` set to true. + /// - The last message will also be sent with `buffer_hint` set to true unless any message is + /// offered with buffer hint set to false. + /// + /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of + /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core. + pub fn enhance_batch(&mut self, flag: bool) { + self.sink_base.enhance_buffer_strategy = flag; + } + pub fn cancel(&mut self) { let call = self.call.lock(); call.call.cancel() @@ -373,7 +385,8 @@ impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> { let mut call = self.call.lock(); call.check_alive()?; } - Pin::new(&mut self.sink_base).poll_ready(cx) + let t = &mut *self; + Pin::new(&mut t.sink_base).poll_flush(cx, &mut t.call) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { diff --git a/src/call/mod.rs b/src/call/mod.rs index 03e520a..a06e003 100644 --- a/src/call/mod.rs +++ b/src/call/mod.rs @@ -16,15 +16,12 @@ use futures::task::{Context, Poll}; use libc::c_void; use parking_lot::Mutex; -use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader}; +use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice}; use crate::codec::{DeserializeFn, Marshaller, SerializeFn}; use crate::error::{Error, Result}; use crate::grpc_sys::grpc_status_code::*; use crate::task::{self, BatchFuture, BatchType, CallTag}; -// By default buffers in `SinkBase` will be shrink to 4K size. -const BUF_SHRINK_SIZE: usize = 4 * 1024; - /// An gRPC status code structure. /// This type contains constants for all gRPC status codes. #[derive(PartialEq, Eq, Clone, Copy)] @@ -296,7 +293,7 @@ impl Call { /// Send a message asynchronously. pub fn start_send_message( &mut self, - msg: &[u8], + msg: &mut GrpcSlice, write_flags: u32, initial_meta: bool, ) -> Result<BatchFuture> { @@ -306,8 +303,7 @@ impl Call { grpc_sys::grpcwrap_call_send_message( self.call, ctx, - msg.as_ptr() as _, - msg.len(), + msg.as_mut_ptr(), write_flags, i, tag, @@ -350,20 +346,21 @@ impl Call { &mut self, status: &RpcStatus, send_empty_metadata: bool, - payload: &Option<Vec<u8>>, + payload: &mut Option<GrpcSlice>, write_flags: u32, ) -> Result<BatchFuture> { let _cq_ref = self.cq.borrow()?; let send_empty_metadata = if send_empty_metadata { 1 } else { 0 }; - let (payload_ptr, payload_len) = payload - .as_ref() - .map_or((ptr::null(), 0), |b| (b.as_ptr(), b.len())); let f = check_run(BatchType::Finish, |ctx, tag| unsafe { let details_ptr = status .details .as_ref() .map_or_else(ptr::null, |s| s.as_ptr() as _); let details_len = status.details.as_ref().map_or(0, String::len); + let payload_p = match payload { + Some(p) => p.as_mut_ptr(), + None => ptr::null_mut(), + }; grpc_sys::grpcwrap_call_send_status_from_server( self.call, ctx, @@ -372,8 +369,7 @@ impl Call { details_len, ptr::null_mut(), send_empty_metadata, - payload_ptr as _, - payload_len, + payload_p, write_flags, tag, ) @@ -407,8 +403,7 @@ impl Call { details_len, ptr::null_mut(), 1, - ptr::null(), - 0, + ptr::null_mut(), 0, tag_ptr as *mut c_void, ) @@ -628,17 +623,30 @@ impl WriteFlags { /// A helper struct for constructing Sink object for batch requests. struct SinkBase { + // Batch job to be executed in `poll_ready`. batch_f: Option<BatchFuture>, - buf: Vec<u8>, send_metadata: bool, + // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch + // messages as much as possible. + enhance_buffer_strategy: bool, + // Buffer used to store the data to be sent, send out the last data in this round of `start_send`. + buffer: GrpcSlice, + // Write flags used to control the data to be sent in `buffer`. + buf_flags: Option<WriteFlags>, + // Used to records whether a message in which `buffer_hint` is false exists. + // Note: only used in enhanced buffer strategy. + last_buf_hint: bool, } impl SinkBase { fn new(send_metadata: bool) -> SinkBase { SinkBase { batch_f: None, - buf: Vec::new(), + buffer: GrpcSlice::default(), + buf_flags: None, + last_buf_hint: true, send_metadata, + enhance_buffer_strategy: false, } } @@ -646,29 +654,35 @@ impl SinkBase { &mut self, call: &mut C, t: &T, - mut flags: WriteFlags, + flags: WriteFlags, ser: SerializeFn<T>, ) -> Result<()> { - // `start_send` is supposed to be called after `poll_ready` returns ready. - assert!(self.batch_f.is_none()); + // temporary fix: buffer hint with send meta will not send out any metadata. + // note: only the first message can enter this code block. + if self.send_metadata { + ser(t, &mut self.buffer); + self.buf_flags = Some(flags); + self.start_send_buffer_message(false, call)?; + self.send_metadata = false; + return Ok(()); + } - self.buf.clear(); - ser(t, &mut self.buf); - if flags.get_buffer_hint() && self.send_metadata { - // temporary fix: buffer hint with send meta will not send out any metadata. - flags = flags.buffer_hint(false); + // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate + // that this is not the last message. + if self.buf_flags.is_some() { + self.start_send_buffer_message(true, call)?; } - let write_f = call.call(|c| { - c.call - .start_send_message(&self.buf, flags.flags, self.send_metadata) - })?; - // NOTE: Content of `self.buf` is copied into grpc internal. - if self.buf.capacity() > BUF_SHRINK_SIZE { - self.buf.truncate(BUF_SHRINK_SIZE); - self.buf.shrink_to_fit(); + + ser(t, &mut self.buffer); + let hint = flags.get_buffer_hint(); + self.last_buf_hint &= hint; + self.buf_flags = Some(flags); + + // If sink disable batch, start sending the message in buffer immediately. + if !self.enhance_buffer_strategy { + self.start_send_buffer_message(hint, call)?; } - self.batch_f = Some(write_f); - self.send_metadata = false; + Ok(()) } @@ -683,4 +697,44 @@ impl SinkBase { self.batch_f.take(); Poll::Ready(Ok(())) } + + #[inline] + fn poll_flush<C: ShareCallHolder>( + &mut self, + cx: &mut Context, + call: &mut C, + ) -> Poll<Result<()>> { + if self.batch_f.is_some() { + ready!(self.poll_ready(cx)?); + } + if self.buf_flags.is_some() { + self.start_send_buffer_message(self.last_buf_hint, call)?; + ready!(self.poll_ready(cx)?); + } + self.last_buf_hint = true; + Poll::Ready(Ok(())) + } + + #[inline] + fn start_send_buffer_message<C: ShareCallHolder>( + &mut self, + buffer_hint: bool, + call: &mut C, + ) -> Result<()> { + // `start_send` is supposed to be called after `poll_ready` returns ready. + assert!(self.batch_f.is_none()); + + let mut flags = self.buf_flags.clone().unwrap(); + flags = flags.buffer_hint(buffer_hint); + let write_f = call.call(|c| { + c.call + .start_send_message(&mut self.buffer, flags.flags, self.send_metadata) + })?; + self.batch_f = Some(write_f); + if !self.buffer.is_inline() { + self.buffer = GrpcSlice::default(); + } + self.buf_flags.take(); + Ok(()) + } } diff --git a/src/call/server.rs b/src/call/server.rs index 8875d6d..add9874 100644 --- a/src/call/server.rs +++ b/src/call/server.rs @@ -17,6 +17,7 @@ use parking_lot::Mutex; use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags}; use crate::auth_context::AuthContext; +use crate::buf::GrpcSlice; use crate::call::{ BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase, }; @@ -322,7 +323,7 @@ macro_rules! impl_unary_sink { $t { call: Some(call), write_flags: 0, - ser: ser, + ser, } } @@ -335,8 +336,8 @@ macro_rules! impl_unary_sink { } fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt { - let data = t.as_ref().map(|t| { - let mut buf = vec![]; + let mut data = t.as_ref().map(|t| { + let mut buf = GrpcSlice::default(); (self.ser)(t, &mut buf); buf }); @@ -344,7 +345,7 @@ macro_rules! impl_unary_sink { let write_flags = self.write_flags; let res = self.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(&status, true, &data, write_flags) + .start_send_status_from_server(&status, true, &mut data, write_flags) }); let (cq_f, err) = match res { @@ -354,8 +355,8 @@ macro_rules! impl_unary_sink { $rt { call: self.call.take().unwrap(), - cq_f: cq_f, - err: err, + cq_f, + err, } } } @@ -420,10 +421,23 @@ macro_rules! impl_stream_sink { status: RpcStatus::ok(), flushed: false, closed: false, - ser: ser, + ser, } } + /// By default it always sends messages with their configured buffer hint. But when the + /// `enhance_batch` is enabled, messages will be batched together as many as possible. + /// The rules are listed as below: + /// - All messages except the last one will be sent with `buffer_hint` set to true. + /// - The last message will also be sent with `buffer_hint` set to true unless any message is + /// offered with buffer hint set to false. + /// + /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of + /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core. + pub fn enhance_batch(&mut self, flag: bool) { + self.base.enhance_buffer_strategy = flag; + } + pub fn set_status(&mut self, status: RpcStatus) { assert!(self.flush_f.is_none()); self.status = status; @@ -434,7 +448,7 @@ macro_rules! impl_stream_sink { let send_metadata = self.base.send_metadata; let res = self.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(&status, send_metadata, &None, 0) + .start_send_status_from_server(&status, send_metadata, &mut None, 0) }); let (fail_f, err) = match res { @@ -444,8 +458,8 @@ macro_rules! impl_stream_sink { $ft { call: self.call.take().unwrap(), - fail_f: fail_f, - err: err, + fail_f, + err, } } } @@ -487,7 +501,8 @@ macro_rules! impl_stream_sink { if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? { return Poll::Ready(Err(Error::RemoteStopped)); } - Pin::new(&mut self.base).poll_ready(cx) + let t = &mut *self; + Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap()) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { @@ -499,7 +514,7 @@ macro_rules! impl_stream_sink { let status = &t.status; let flush_f = t.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(status, send_metadata, &None, 0) + .start_send_status_from_server(status, send_metadata, &mut None, 0) })?; t.flush_f = Some(flush_f); } diff --git a/src/channel.rs b/src/channel.rs index 8fb7fc8..bdf95ce 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -28,7 +28,7 @@ pub use crate::grpc_sys::{ /// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents fn format_user_agent_string(agent: &str) -> CString { - let version = "0.6.0"; + let version = "0.7.0"; let trimed_agent = agent.trim(); let val = if trimed_agent.is_empty() { format!("grpc-rust/{}", version) @@ -111,7 +111,7 @@ impl ChannelBuilder { self } - /// Set maximum message length that the channel can receive. `usize::MAX` means unlimited. + /// Set maximum message length that the channel can receive. `-1` means unlimited. pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder { self.options.insert( Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH), @@ -391,7 +391,7 @@ impl ChannelBuilder { } /// Build `ChannelArgs` from the current configuration. - #[allow(clippy::identity_conversion)] + #[allow(clippy::useless_conversion)] pub fn build_args(&self) -> ChannelArgs { let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) }; for (i, (k, v)) in self.options.iter().enumerate() { @@ -442,6 +442,28 @@ impl ChannelBuilder { let channel = unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) }; + unsafe { Channel::new(self.env.pick_cq(), self.env, channel) } + } + + /// Build an insecure [`Channel`] taking over an established connection from + /// a file descriptor. The target string given is purely informative to + /// describe the endpoint of the connection. Takes ownership of the given + /// file descriptor and will close it when the connection is closed. + /// + /// This function is available on posix systems only. + /// + /// # Safety + /// + /// The file descriptor must correspond to a connected stream socket. After + /// this call, the socket must not be accessed (read / written / closed) + /// by other code. + #[cfg(unix)] + pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel { + let args = self.prepare_connect_args(); + let target = CString::new(target).unwrap(); + let target_ptr = target.as_ptr(); + let channel = grpc_sys::grpc_insecure_channel_create_from_fd(target_ptr, fd, args.args); + Channel::new(self.env.pick_cq(), self.env, channel) } } @@ -489,7 +511,7 @@ mod secure_channel { ) }; - Channel::new(self.env.pick_cq(), self.env, channel) + unsafe { Channel::new(self.env.pick_cq(), self.env, channel) } } } } @@ -548,7 +570,19 @@ unsafe impl Send for Channel {} unsafe impl Sync for Channel {} impl Channel { - fn new(cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel) -> Channel { + /// Create a new channel. Avoid using this directly and use + /// [`ChannelBuilder`] to build a [`Channel`] instead. + /// + /// # Safety + /// + /// The given grpc_channel must correspond to an instantiated grpc core + /// channel. Takes exclusive ownership of the channel and will close it after + /// use. + pub unsafe fn new( + cq: CompletionQueue, + env: Arc<Environment>, + channel: *mut grpc_channel, + ) -> Channel { Channel { inner: Arc::new(ChannelInner { _env: env, channel }), cq, diff --git a/src/codec.rs b/src/codec.rs index 4a84489..e0214b7 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -1,10 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use crate::buf::GrpcSlice; use crate::call::MessageReader; use crate::error::Result; pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>; -pub type SerializeFn<T> = fn(&T, &mut Vec<u8>); +pub type SerializeFn<T> = fn(&T, &mut GrpcSlice); /// Defines how to serialize and deserialize between the specialized type and byte slice. pub struct Marshaller<T> { @@ -26,14 +27,21 @@ pub struct Marshaller<T> { #[cfg(feature = "protobuf-codec")] pub mod pb_codec { - use protobuf::{CodedInputStream, Message}; + use protobuf::{CodedInputStream, CodedOutputStream, Message}; use super::MessageReader; + use crate::buf::GrpcSlice; use crate::error::Result; #[inline] - pub fn ser<T: Message>(t: &T, buf: &mut Vec<u8>) { - t.write_to_vec(buf).unwrap() + pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) { + let cap = t.compute_size(); + unsafe { + let bytes = buf.realloc(cap as usize); + let raw_bytes = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); + let mut s = CodedOutputStream::bytes(raw_bytes); + t.write_to_with_cached_sizes(&mut s).unwrap(); + } } #[inline] @@ -47,15 +55,22 @@ pub mod pb_codec { #[cfg(feature = "prost-codec")] pub mod pr_codec { - use bytes::buf::BufMut; use prost::Message; use super::MessageReader; + use crate::buf::GrpcSlice; use crate::error::Result; #[inline] - pub fn ser<M: Message, B: BufMut>(msg: &M, buf: &mut B) { - msg.encode(buf).expect("Writing message to buffer failed"); + pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) { + let size = msg.encoded_len(); + unsafe { + let bytes = buf.realloc(size); + let mut b = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); + msg.encode(&mut b) + .expect("Writing message to buffer failed"); + debug_assert!(b.is_empty()); + } } #[inline] @@ -38,6 +38,8 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) { pub struct EnvBuilder { cq_count: usize, name_prefix: Option<String>, + after_start: Option<Arc<dyn Fn() + Send + Sync>>, + before_stop: Option<Arc<dyn Fn() + Send + Sync>>, } impl EnvBuilder { @@ -46,6 +48,8 @@ impl EnvBuilder { EnvBuilder { cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize }, name_prefix: None, + after_start: None, + before_stop: None, } } @@ -67,6 +71,18 @@ impl EnvBuilder { self } + /// Execute function `f` after each thread is started but before it starts doing work. + pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder { + self.after_start = Some(Arc::new(f)); + self + } + + /// Execute function `f` before each thread stops. + pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder { + self.before_stop = Some(Arc::new(f)); + self + } + /// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library. pub fn build(self) -> Environment { unsafe { @@ -81,7 +97,19 @@ impl EnvBuilder { if let Some(ref prefix) = self.name_prefix { builder = builder.name(format!("{}-{}", prefix, i)); } - let handle = builder.spawn(move || poll_queue(tx_i)).unwrap(); + let after_start = self.after_start.clone(); + let before_stop = self.before_stop.clone(); + let handle = builder + .spawn(move || { + if let Some(f) = after_start { + f(); + } + poll_queue(tx_i); + if let Some(f) = before_stop { + f(); + } + }) + .unwrap(); handles.push(handle); } for _ in 0..self.cq_count { @@ -43,6 +43,7 @@ mod security; mod server; mod task; +pub use crate::buf::GrpcSlice; pub use crate::call::client::{ CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver, ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, StreamingCallSink, diff --git a/src/metadata.rs b/src/metadata.rs index ef49bcb..746b593 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -1,7 +1,8 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use crate::grpc_sys::{self, grpc_metadata_array}; +use crate::grpc_sys::{self, grpc_metadata, grpc_metadata_array}; use std::borrow::Cow; +use std::mem::ManuallyDrop; use std::{mem, slice, str}; use crate::error::{Error, Result}; @@ -184,6 +185,35 @@ impl Metadata { index: 0, } } + + /// Decomposes a Metadata array into its raw components. + /// + /// Returns the raw pointer to the underlying data, the length of the vector (in elements), + /// and the allocated capacity of the data (in elements). These are the same arguments in + /// the same order as the arguments to from_raw_parts. + /// + /// After calling this function, the caller is responsible for the memory previously managed + /// by the Metadata. The only way to do this is to convert the raw pointer, length, and + /// capacity back into a Metadata with the from_raw_parts function, allowing the destructor + /// to perform the cleanup. + pub fn into_raw_parts(self) -> (*mut grpc_metadata, usize, usize) { + let s = ManuallyDrop::new(self); + (s.0.metadata, s.0.count, s.0.capacity) + } + + /// Creates a Metadata directly from the raw components of another vector. + /// + /// ## Safety + /// + /// The operation is safe only if the three arguments are returned from `into_raw_parts` + /// and only convert once. + pub unsafe fn from_raw_parts(p: *mut grpc_metadata, len: usize, cap: usize) -> Metadata { + Metadata(grpc_metadata_array { + count: len, + capacity: cap, + metadata: p, + }) + } } impl Clone for Metadata { diff --git a/src/security/credentials.rs b/src/security/credentials.rs index 8d835ee..7d73009 100644 --- a/src/security/credentials.rs +++ b/src/security/credentials.rs @@ -352,7 +352,7 @@ impl ChannelCredentials { unsafe { grpc_sys::grpc_init(); } - let creds = unsafe { grpc_sys::grpc_google_default_credentials_create() }; + let creds = unsafe { grpc_sys::grpc_google_default_credentials_create(ptr::null_mut()) }; if creds.is_null() { Err(Error::GoogleAuthenticationFailed) } else { diff --git a/src/server.rs b/src/server.rs index 3dd8bf3..8cb6a87 100644 --- a/src/server.rs +++ b/src/server.rs @@ -561,13 +561,27 @@ impl Server { pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> { self.core.binders.iter().map(|b| (&b.host, b.port)) } + + /// Add an rpc channel for an established connection represented as a file + /// descriptor. Takes ownership of the file descriptor, closing it when + /// channel is closed. + /// + /// # Safety + /// + /// The file descriptor must correspond to a connected stream socket. After + /// this call, the socket must not be accessed (read / written / closed) + /// by other code. + #[cfg(unix)] + pub unsafe fn add_insecure_channel_from_fd(&self, fd: ::std::os::raw::c_int) { + grpc_sys::grpc_server_add_insecure_channel_from_fd(self.core.server, ptr::null_mut(), fd) + } } impl Drop for Server { fn drop(&mut self) { // if the server is not shutdown completely, destroy a server will core. // TODO: don't wait here - let f = if self.core.shutdown.load(Ordering::SeqCst) { + let f = if !self.core.shutdown.load(Ordering::SeqCst) { Some(self.shutdown()) } else { None |