diff options
author | android-build-team Robot <android-build-team-robot@google.com> | 2021-04-03 03:04:47 +0000 |
---|---|---|
committer | android-build-team Robot <android-build-team-robot@google.com> | 2021-04-03 03:04:47 +0000 |
commit | dd3964ccf8bc6c5cfeb51d8c80843379cbe8423d (patch) | |
tree | a54e24d149f00ad487d9da00d4ded4043d6d1787 | |
parent | 884f170defbaf90f7676fdbf7c7bf9382ed3157c (diff) | |
parent | cd0dfe04a46259f7e56110c7d5d11bacd8e221ae (diff) | |
download | grpcio-dd3964ccf8bc6c5cfeb51d8c80843379cbe8423d.tar.gz |
Snap for 7256110 from cd0dfe04a46259f7e56110c7d5d11bacd8e221ae to sc-v2-releaseandroid-vts-12.1_r9android-vts-12.1_r8android-vts-12.1_r7android-vts-12.1_r6android-vts-12.1_r5android-vts-12.1_r4android-vts-12.1_r3android-vts-12.1_r2android-vts-12.1_r10android-vts-12.1_r1android-platform-12.1.0_r9android-platform-12.1.0_r8android-platform-12.1.0_r7android-platform-12.1.0_r6android-platform-12.1.0_r5android-platform-12.1.0_r4android-platform-12.1.0_r3android-platform-12.1.0_r28android-platform-12.1.0_r27android-platform-12.1.0_r26android-platform-12.1.0_r25android-platform-12.1.0_r24android-platform-12.1.0_r23android-platform-12.1.0_r22android-platform-12.1.0_r21android-platform-12.1.0_r20android-platform-12.1.0_r2android-platform-12.1.0_r19android-platform-12.1.0_r18android-platform-12.1.0_r17android-platform-12.1.0_r16android-platform-12.1.0_r15android-platform-12.1.0_r14android-platform-12.1.0_r13android-platform-12.1.0_r12android-platform-12.1.0_r11android-platform-12.1.0_r10android-platform-12.1.0_r1android-cts-12.1_r9android-cts-12.1_r8android-cts-12.1_r7android-cts-12.1_r6android-cts-12.1_r5android-cts-12.1_r4android-cts-12.1_r3android-cts-12.1_r2android-cts-12.1_r10android-cts-12.1_r1android-12.1.0_r6android-12.1.0_r5android-12.1.0_r4android-12.1.0_r3android-12.1.0_r27android-12.1.0_r2android-12.1.0_r1android12L-tests-releaseandroid12L-s1-releaseandroid12L-releaseandroid12L-platform-releaseandroid12L-gsi
Change-Id: I1c52577cae8827f904e834e08fa70a97bc4bd653
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | .clang-tidy | 42 | ||||
-rw-r--r-- | .github/workflows/ci.yml | 9 | ||||
-rw-r--r-- | .travis.yml | 23 | ||||
-rw-r--r-- | Android.bp | 68 | ||||
-rw-r--r-- | CHANGELOG.md | 20 | ||||
-rw-r--r-- | Cargo.toml | 12 | ||||
-rw-r--r-- | Cargo.toml.orig | 13 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | README.md | 2 | ||||
-rwxr-xr-x | scripts/generate-bindings.sh | 2 | ||||
-rw-r--r-- | src/buf.rs | 4 | ||||
-rw-r--r-- | src/call/client.rs | 2 | ||||
-rw-r--r-- | src/call/mod.rs | 8 | ||||
-rw-r--r-- | src/call/server.rs | 26 | ||||
-rw-r--r-- | src/channel.rs | 67 | ||||
-rw-r--r-- | src/cq.rs | 48 | ||||
-rw-r--r-- | src/metadata.rs | 16 | ||||
-rw-r--r-- | src/server.rs | 12 | ||||
-rw-r--r-- | src/task/mod.rs | 25 | ||||
-rw-r--r-- | src/task/promise.rs | 20 |
21 files changed, 287 insertions, 144 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index c4d77c4..05823ce 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "37188956eb8e71631326d708b8afb0940918f5d8" + "sha1": "b35467b1bfe58eedf4e77025432074d361be591f" } } diff --git a/.clang-tidy b/.clang-tidy index d217441..752b25e 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -1,6 +1,44 @@ --- -Checks: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*' -WarningsAsErrors: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*' +# Disable abseil-no-namespace: https://bugs.llvm.org/show_bug.cgi?id=47947 +Checks: '-*, + abseil-*, + -abseil-no-namespace, + bugprone-*, + -bugprone-narrowing-conversions, + -bugprone-too-small-loop-variable, + performance-*, + -performance-unnecessary-copy-initialization, + -performance-unnecessary-value-param, + google-*, + -google-runtime-int, + -google-runtime-references, + misc-definitions-in-headers, + misc-static-assert, + misc-unconventional-assign-operator, + misc-uniqueptr-reset-release, + misc-unused-alias-decls, + misc-unused-using-decls, + modernize-make-unique, + -modernize-redundant-void-arg, + modernize-replace-auto-ptr, + modernize-shrink-to-fit, + modernize-use-bool-literals, + modernize-use-nullptr, + modernize-use-override, + readability-container-size-empty, + readability-deleted-default, + readability-function-size, + readability-inconsistent-declaration-parameter-name, + readability-redundant-control-flow, + readability-redundant-smartptr-get, + readability-string-compare' +WarningsAsErrors: '*' CheckOptions: - key: readability-function-size.StatementThreshold value: '450' + - key: modernize-make-unique.MakeSmartPtrFunction + value: 'absl::make_unique' + - key: modernize-make-unique.MakeSmartPtrFunctionHeader + value: 'absl/memory/memory.h' + - key: google-readability-braces-around-statements.ShortStatementLines + value: 1 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1c42285..3731f9e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,6 +39,7 @@ jobs: - run: cargo build --no-default-features - run: cargo build --no-default-features --features protobuf-codec - run: cargo build --no-default-features --features prost-codec + - run: cd proto && cargo build --no-default-features --features prost-codec - run: cargo build - run: cargo test --all @@ -75,9 +76,9 @@ jobs: - uses: actions/checkout@v2 - run: which go && go version && which cargo && cargo version && clang --version && openssl version - run: scripts/reset-submodule.cmd - - run: cargo build --no-default-features - - run: cargo build --no-default-features --features protobuf-codec - - run: cargo build --no-default-features --features prost-codec + - run: cargo build --no-default-features --features use-bindgen + - run: cargo build --no-default-features --features "protobuf-codec use-bindgen" + - run: cargo build --no-default-features --features "prost-codec use-bindgen" - run: cargo build - run: cargo test --all @@ -95,6 +96,8 @@ jobs: Win: name: Windows runs-on: windows-latest + env: + LIBCLANG_PATH: 'C:\Program Files\LLVM\bin' steps: - uses: actions/checkout@v2 - run: choco install -y llvm diff --git a/.travis.yml b/.travis.yml index 822e0f1..9cc0821 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ # Travis is only used to test ARM64 Linux dist: focal -sudo: false +sudo: true language: rust git: submodules: false @@ -26,12 +26,12 @@ addons: jobs: include: - os: linux - arch: arm64 + arch: arm64-graviton2 + vm: virt before_script: - scripts/reset-submodule.cmd - - export GRPC_VERSION=1.33.1 + - export GRPC_VERSION=1.35.0 - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin" - - sudo apt-get update && sudo apt-get -y install libssl-dev - which cmake && cmake --version && openssl version - eval "$(gimme stable)" script: @@ -44,5 +44,16 @@ jobs: - cargo build --no-default-features --features protobuf-codec - cargo build --no-default-features --features prost-codec - cargo build - - cargo test --release --all - - cargo test --release --features "openssl-vendored" --all + - travis_wait 40 cargo test --release --all + - os: linux + arch: arm64-graviton2 + vm: virt + before_script: + - scripts/reset-submodule.cmd + - export GRPC_VERSION=1.35.0 + - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin" + - sudo apt-get update && sudo apt-get -y install libssl-dev + - which cmake && cmake --version && openssl version + - eval "$(gimme stable)" + script: + - travis_wait 40 cargo test --release --features "openssl-vendored" --all @@ -1,4 +1,5 @@ // This file is generated by cargo2android.py --run --device --dependencies --features=protobuf,protobuf-codec. +// Do not modify this file as changes will be overridden on upgrade. package { default_applicable_licenses: ["external_rust_crates_grpcio_license"], @@ -38,59 +39,40 @@ rust_library { } // dependent_library ["feature_list"] -// aho-corasick-0.7.15 "default,std" -// bindgen-0.51.1 -// bitflags-1.2.1 "default" -// boringssl-src-0.1.0 -// cc-1.0.66 -// cexpr-0.3.6 -// cfg-if-0.1.10 +// boringssl-src-0.2.0 +// cc-1.0.67 // cfg-if-1.0.0 -// clang-sys-0.28.1 "clang_6_0,gte_clang_3_6,gte_clang_3_7,gte_clang_3_8,gte_clang_3_9,gte_clang_4_0,gte_clang_5_0,gte_clang_6_0,libloading,runtime" // cmake-0.1.45 -// futures-0.3.8 "alloc,async-await,default,executor,futures-executor,std" -// futures-channel-0.3.8 "alloc,futures-sink,sink,std" -// futures-core-0.3.8 "alloc,std" -// futures-executor-0.3.8 "std" -// futures-io-0.3.8 "std" -// futures-macro-0.3.8 -// futures-sink-0.3.8 "alloc,std" -// futures-task-0.3.8 "alloc,once_cell,std" -// futures-util-0.3.8 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" -// glob-0.3.0 -// grpcio-sys-0.7.2 "default" +// futures-0.3.13 "alloc,async-await,default,executor,futures-executor,std" +// futures-channel-0.3.13 "alloc,futures-sink,sink,std" +// futures-core-0.3.13 "alloc,std" +// futures-executor-0.3.13 "std" +// futures-io-0.3.13 "std" +// futures-macro-0.3.13 +// futures-sink-0.3.13 "alloc,std" +// futures-task-0.3.13 "alloc,std" +// futures-util-0.3.13 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std" +// grpcio-sys-0.8.1 // instant-0.1.9 -// lazy_static-1.4.0 -// libc-0.2.81 "default,std" -// libloading-0.5.2 +// libc-0.2.92 "default,std" // libz-sys-1.1.2 "default,libc,static,stock-zlib" // lock_api-0.4.2 -// log-0.4.11 -// memchr-2.3.4 "default,std,use_std" -// nom-4.2.3 "alloc,default,std,verbose-errors" -// once_cell-1.5.2 "alloc,std" +// log-0.4.14 +// memchr-2.3.4 "default,std" // parking_lot-0.11.1 "default" -// parking_lot_core-0.8.1 -// peeking_take_while-0.1.2 -// pin-project-1.0.2 -// pin-project-internal-1.0.2 +// parking_lot_core-0.8.3 +// pin-project-lite-0.2.6 // pin-utils-0.1.0 // pkg-config-0.3.19 // proc-macro-hack-0.5.19 -// proc-macro-nested-0.1.6 -// proc-macro2-1.0.24 "default,proc-macro" -// protobuf-2.18.1 -// quote-1.0.7 "default,proc-macro" -// regex-1.4.2 "aho-corasick,default,memchr,perf,perf-cache,perf-dfa,perf-inline,perf-literal,std,thread_local,unicode,unicode-age,unicode-bool,unicode-case,unicode-gencat,unicode-perl,unicode-script,unicode-segment" -// regex-syntax-0.6.21 "default,unicode,unicode-age,unicode-bool,unicode-case,unicode-gencat,unicode-perl,unicode-script,unicode-segment" -// rustc-hash-1.1.0 "default,std" +// proc-macro-nested-0.1.7 +// proc-macro2-1.0.26 "default,proc-macro" +// protobuf-2.22.1 +// quote-1.0.9 "default,proc-macro" // same-file-1.0.6 // scopeguard-1.1.0 -// shlex-0.1.1 // slab-0.4.2 -// smallvec-1.5.1 -// syn-1.0.54 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" -// thread_local-1.0.1 +// smallvec-1.6.1 +// syn-1.0.68 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" // unicode-xid-0.2.1 "default" -// version_check-0.1.5 -// walkdir-2.3.1 +// walkdir-2.3.2 diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a6dfdb..f928b24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,23 @@ +# 0.8.2 - 2012-03-10 + +- Fix send requirement in connectivity APIs (#516) + +# 0.8.1 - 2021-03-05 + +- Support watch connectivity state (#513) +- Fix prost build of grpcio-proto (#515) + +# grpcio-sys 0.8.1 - 2021-03-02 + +- Detect changes ahead to ease pain of upgrading compiler (#511) + +# 0.8.0 - 2021-02-19 + +- Fix clippy warnings (#504) +- Add a way to not use bindgen (#499) +- Update gRPC C core to 1.35.0 (#506) +- Update bindgen to 0.57.0 (#507) + # 0.7.1 - 2020-12-18 - Allow CXX environment variable to override g++ for musl build (#500) @@ -13,7 +13,7 @@ [package] edition = "2018" name = "grpcio" -version = "0.7.1" +version = "0.8.2" authors = ["The TiKV Project Developers"] autoexamples = false description = "The rust language implementation of gRPC, base on the gRPC c core library." @@ -29,14 +29,15 @@ all-features = true [profile.release] debug = true [dependencies.bytes] -version = "0.5" +version = "1.0" optional = true [dependencies.futures] version = "0.3" [dependencies.grpcio-sys] -version = "0.7" +version = "0.8" +default-features = false [dependencies.libc] version = "0.2" @@ -48,7 +49,7 @@ version = "0.4" version = "0.11" [dependencies.prost] -version = "0.6" +version = "0.7" optional = true [dependencies.protobuf] @@ -56,12 +57,13 @@ version = "2.0" optional = true [features] -default = ["protobuf-codec", "secure"] +default = ["protobuf-codec", "secure", "use-bindgen"] no-omit-frame-pointer = ["grpcio-sys/no-omit-frame-pointer"] openssl = ["secure", "grpcio-sys/openssl"] openssl-vendored = ["secure", "grpcio-sys/openssl-vendored"] prost-codec = ["prost", "bytes"] protobuf-codec = ["protobuf"] secure = ["grpcio-sys/secure"] +use-bindgen = ["grpcio-sys/use-bindgen"] [badges.travis-ci] repository = "tikv/grpc-rs" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 7daa61e..8938f97 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "grpcio" -version = "0.7.1" +version = "0.8.2" edition = "2018" authors = ["The TiKV Project Developers"] license = "Apache-2.0" @@ -17,12 +17,12 @@ autoexamples = false all-features = true [dependencies] -grpcio-sys = { path = "grpc-sys", version = "0.7" } +grpcio-sys = { path = "grpc-sys", version = "0.8", default-features = false } libc = "0.2" futures = "0.3" protobuf = { version = "2.0", optional = true } -prost = { version = "0.6", optional = true } -bytes = { version = "0.5", optional = true } +prost = { version = "0.7", optional = true } +bytes = { version = "1.0", optional = true } log = "0.4" parking_lot = "0.11" @@ -30,13 +30,14 @@ parking_lot = "0.11" members = ["proto", "benchmark", "compiler", "interop", "tests-and-examples"] [features] -default = ["protobuf-codec", "secure"] +default = ["protobuf-codec", "secure", "use-bindgen"] protobuf-codec = ["protobuf"] prost-codec = ["prost", "bytes"] secure = ["grpcio-sys/secure"] openssl = ["secure", "grpcio-sys/openssl"] openssl-vendored = ["secure", "grpcio-sys/openssl-vendored"] no-omit-frame-pointer = ["grpcio-sys/no-omit-frame-pointer"] +use-bindgen = ["grpcio-sys/use-bindgen"] [profile.release] debug = true @@ -45,4 +46,4 @@ debug = true travis-ci = { repository = "tikv/grpc-rs" } [patch.crates-io] -grpcio-compiler = { path = "compiler", version = "0.7.0", default-features = false } +grpcio-compiler = { path = "compiler", version = "0.8.0", default-features = false } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/grpcio/grpcio-0.7.1.crate" + value: "https://static.crates.io/crates/grpcio/grpcio-0.8.2.crate" } - version: "0.7.1" + version: "0.8.2" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 12 - day: 18 + year: 2021 + month: 4 + day: 1 } } @@ -31,7 +31,7 @@ This project is still under development. The following features with the check m - LLVM and Clang >= 3.9 if you need to generate bindings at compile time. - By default, the [secure feature](#feature-secure) is provided by boringssl. You can also use openssl instead by enabling [openssl feature](#feature-openssl). -For Linux and MacOS, you also need to install gcc (or clang) too. +For Linux and MacOS, you also need to install gcc 4.9+ (or clang) too. Bindings are pre-generated for x86_64/arm64 Linux. For other platforms, bindings are generated at compile time. diff --git a/scripts/generate-bindings.sh b/scripts/generate-bindings.sh index 462b2fa..81a043e 100755 --- a/scripts/generate-bindings.sh +++ b/scripts/generate-bindings.sh @@ -11,5 +11,5 @@ export UPDATE_BIND=1 cargo build -p grpcio-sys --target ${ARCH}-unknown-linux-gnu rustfmt grpc-sys/bindings/* if [ "$(uname -s)" == "Linux" ]; then - sed -i '/pub type .*= ::std::os::raw::.*/d' grpc-sys/bindings/* + sed -i '/^pub type .*= ::std::os::raw::.*/d' grpc-sys/bindings/* fi @@ -427,7 +427,7 @@ impl bytes::Buf for GrpcByteBufferReader { self.remain } - fn bytes(&self) -> &[u8] { + fn chunk(&self) -> &[u8] { // This is similar but not identical to `BuffRead::fill_buf`, since `self` // is not mutable, we can only return bytes up to the end of the current // slice. @@ -595,7 +595,7 @@ mod tests { let mut count = 100; while reader.remaining() > 0 { assert_eq!(remaining, reader.remaining()); - let bytes = Buf::bytes(&reader); + let bytes = Buf::chunk(&reader); bytes.iter().for_each(|b| assert_eq!(*b, len as u8)); let mut read = bytes.len(); // We don't have to advance by the whole amount we read. diff --git a/src/call/client.rs b/src/call/client.rs index 279e619..d1047bf 100644 --- a/src/call/client.rs +++ b/src/call/client.rs @@ -399,7 +399,7 @@ impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> { t.close_f = Some(close_f); } - if let Poll::Pending = Pin::new(t.close_f.as_mut().unwrap()).poll(cx)? { + if Pin::new(t.close_f.as_mut().unwrap()).poll(cx)?.is_pending() { // if call is finished, can return early here. call.check_alive()?; return Poll::Pending; diff --git a/src/call/mod.rs b/src/call/mod.rs index a06e003..7f1582f 100644 --- a/src/call/mod.rs +++ b/src/call/mod.rs @@ -33,9 +33,9 @@ impl From<i32> for RpcStatusCode { } } -impl Into<i32> for RpcStatusCode { - fn into(self) -> i32 { - self.0 +impl From<RpcStatusCode> for i32 { + fn from(code: RpcStatusCode) -> i32 { + code.0 } } @@ -533,7 +533,7 @@ impl StreamingBase { if !skip_finish_check { let mut finished = false; if let Some(close_f) = &mut self.close_f { - if let Poll::Ready(_) = Pin::new(close_f).poll(cx)? { + if Pin::new(close_f).poll(cx)?.is_ready() { // Don't return immediately, there may be pending data. finished = true; } diff --git a/src/call/server.rs b/src/call/server.rs index 875555e..0fed656 100644 --- a/src/call/server.rs +++ b/src/call/server.rs @@ -3,6 +3,7 @@ use std::ffi::CStr; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use std::{result, slice}; use crate::grpc_sys::{ @@ -30,8 +31,10 @@ use crate::server::{BoxHandler, RequestCallContext}; use crate::task::{BatchFuture, CallTag, Executor, Kicker}; use crate::CheckResult; +/// A time point that an rpc or operation should finished before it. +#[derive(Clone, Copy)] pub struct Deadline { - spec: gpr_timespec, + pub(crate) spec: gpr_timespec, } impl Deadline { @@ -44,12 +47,27 @@ impl Deadline { } } - pub fn exceeded(&self) -> bool { + /// Checks if the deadline is exceeded. + pub fn exceeded(self) -> bool { unsafe { let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME); grpc_sys::gpr_time_cmp(now, self.spec) >= 0 } } + + pub(crate) fn spec(self) -> gpr_timespec { + self.spec + } +} + +impl From<Duration> for Deadline { + /// Build a deadline from given duration. + /// + /// The deadline will be `now + duration`. + #[inline] + fn from(dur: Duration) -> Deadline { + Deadline::new(dur.into()) + } } /// Context for accepting a request. @@ -626,8 +644,8 @@ impl<'a> RpcContext<'a> { self.ctx.host() } - pub fn deadline(&self) -> &Deadline { - &self.deadline + pub fn deadline(&self) -> Deadline { + self.deadline } /// Get the initial metadata sent by client. diff --git a/src/channel.rs b/src/channel.rs index a33a4be..c8c67b1 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -4,12 +4,14 @@ use std::borrow::Cow; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::ffi::{CStr, CString}; +use std::future::Future; use std::sync::Arc; use std::time::Duration; use std::{cmp, i32, ptr}; -use crate::grpc_sys::{ - self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args, +use crate::{ + grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args}, + Deadline, }; use libc::{self, c_char, c_int}; @@ -17,6 +19,7 @@ use crate::call::{Call, Method}; use crate::cq::CompletionQueue; use crate::env::Environment; use crate::error::Result; +use crate::task::CallTag; use crate::task::Kicker; use crate::CallOption; use crate::ResourceQuota; @@ -28,7 +31,7 @@ pub use crate::grpc_sys::{ /// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents fn format_user_agent_string(agent: &str) -> CString { - let version = "0.7.1"; + let version = "0.8.2"; let trimed_agent = agent.trim(); let val = if trimed_agent.is_empty() { format!("grpc-rust/{}", version) @@ -589,12 +592,66 @@ impl Channel { } } - // If try_to_connect is true, the channel will try to establish a connection, potentially - // changing the state. + /// If try_to_connect is true, the channel will try to establish a connection, potentially + /// changing the state. pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState { self.inner.check_connectivity_state(try_to_connect) } + /// Blocking wait for channel state change or deadline expiration. + /// + /// `check_connectivity_state` needs to be called to get the current state. Returns false + /// means deadline excceeds before observing any state changes. + pub fn wait_for_state_change( + &self, + last_observed: ConnectivityState, + deadline: impl Into<Deadline>, + ) -> impl Future<Output = bool> { + let (cq_f, prom) = CallTag::action_pair(); + let prom_box = Box::new(prom); + let tag = Box::into_raw(prom_box); + let should_wait = if let Ok(cq_ref) = self.cq.borrow() { + unsafe { + grpcio_sys::grpc_channel_watch_connectivity_state( + self.inner.channel, + last_observed, + deadline.into().spec(), + cq_ref.as_ptr(), + tag as *mut _, + ) + } + true + } else { + // It's already shutdown. + false + }; + async move { should_wait && cq_f.await.unwrap() } + } + + /// Wait for this channel to be connected. + /// + /// Returns false means deadline excceeds before connection is connected. + pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool { + // Fast path, it's probably connected. + let mut state = self.check_connectivity_state(true); + if ConnectivityState::GRPC_CHANNEL_READY == state { + return true; + } + let deadline = deadline.into(); + loop { + if self.wait_for_state_change(state, deadline).await { + state = self.check_connectivity_state(true); + match state { + ConnectivityState::GRPC_CHANNEL_READY => return true, + ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false, + _ => (), + } + continue; + } + return false; + } + } + /// Create a Kicker. pub(crate) fn create_kicker(&self) -> Result<Kicker> { let cq_ref = self.cq.borrow()?; @@ -36,35 +36,39 @@ impl CompletionQueueHandle { } fn add_ref(&self) -> Result<()> { + let mut cnt = self.ref_cnt.load(Ordering::SeqCst); loop { - let cnt = self.ref_cnt.load(Ordering::SeqCst); if cnt <= 0 { // `shutdown` has been called, reject any requests. return Err(Error::QueueShutdown); } let new_cnt = cnt + 1; - if cnt - == self - .ref_cnt - .compare_and_swap(cnt, new_cnt, Ordering::SeqCst) - { - return Ok(()); + match self.ref_cnt.compare_exchange_weak( + cnt, + new_cnt, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => return Ok(()), + Err(c) => cnt = c, } } } fn unref(&self) { + let mut cnt = self.ref_cnt.load(Ordering::SeqCst); let shutdown = loop { - let cnt = self.ref_cnt.load(Ordering::SeqCst); // If `shutdown` is not called, `cnt` > 0, so minus 1 to unref. // If `shutdown` is called, `cnt` < 0, so plus 1 to unref. let new_cnt = cnt - cnt.signum(); - if cnt - == self - .ref_cnt - .compare_and_swap(cnt, new_cnt, Ordering::SeqCst) - { - break new_cnt == 0; + match self.ref_cnt.compare_exchange_weak( + cnt, + new_cnt, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => break new_cnt == 0, + Err(c) => cnt = c, } }; if shutdown { @@ -75,8 +79,8 @@ impl CompletionQueueHandle { } fn shutdown(&self) { + let mut cnt = self.ref_cnt.load(Ordering::SeqCst); let shutdown = loop { - let cnt = self.ref_cnt.load(Ordering::SeqCst); if cnt <= 0 { // `shutdown` is called, skipped. return; @@ -85,12 +89,14 @@ impl CompletionQueueHandle { // Because `cnt` is initialized to 1, so minus 1 to make it reach // toward 0. That is `new_cnt = -(cnt - 1) = -cnt + 1`. let new_cnt = -cnt + 1; - if cnt - == self - .ref_cnt - .compare_and_swap(cnt, new_cnt, Ordering::SeqCst) - { - break new_cnt == 0; + match self.ref_cnt.compare_exchange_weak( + cnt, + new_cnt, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => break new_cnt == 0, + Err(c) => cnt = c, } }; if shutdown { diff --git a/src/metadata.rs b/src/metadata.rs index 746b593..893f6e2 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -16,11 +16,11 @@ fn normalize_key(key: &str, binary: bool) -> Result<Cow<'_, str>> { let mut is_upper_case = false; for b in key.as_bytes() { let b = *b; - if b >= b'A' && b <= b'Z' { + if (b'A'..=b'Z').contains(&b) { is_upper_case = true; continue; - } else if b >= b'a' && b <= b'z' - || b >= b'0' && b <= b'9' + } else if (b'a'..=b'z').contains(&b) + || (b'0'..=b'9').contains(&b) || b == b'_' || b == b'-' || b == b'.' @@ -83,10 +83,10 @@ impl MetadataBuilder { } } let key = normalize_key(key, false)?; - self.add_metadata(&key, value.as_bytes()) + Ok(self.add_metadata(&key, value.as_bytes())) } - fn add_metadata(&mut self, key: &str, value: &[u8]) -> Result<&mut MetadataBuilder> { + fn add_metadata(&mut self, key: &str, value: &[u8]) -> &mut MetadataBuilder { unsafe { grpc_sys::grpcwrap_metadata_array_add( &mut self.arr.0, @@ -96,7 +96,7 @@ impl MetadataBuilder { value.len(), ) } - Ok(self) + self } /// Add a metadata holding a binary value. @@ -104,7 +104,7 @@ impl MetadataBuilder { /// `key` needs to have suffix (-bin) indicating a binary valued metadata entry. pub fn add_bytes(&mut self, key: &str, value: &[u8]) -> Result<&mut MetadataBuilder> { let key = normalize_key(key, true)?; - self.add_metadata(&key, value) + Ok(self.add_metadata(&key, value)) } /// Create `Metadata` with configured entries. @@ -221,7 +221,7 @@ impl Clone for Metadata { let mut builder = MetadataBuilder::with_capacity(self.len()); for (k, v) in self.iter() { // use `add_metadata` to skip validation. - builder.add_metadata(k, v).unwrap(); + builder.add_metadata(k, v); } builder.build() } diff --git a/src/server.rs b/src/server.rs index 0f01690..a612b13 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use crate::grpc_sys::{self, grpc_call_error, grpc_server}; use futures::future::Future; +use futures::ready; use futures::task::{Context, Poll}; use crate::call::server::*; @@ -523,14 +524,19 @@ pub fn request_call(ctx: RequestCallContext, cq: &CompletionQueue) { /// A `Future` that will resolve when shutdown completes. pub struct ShutdownFuture { - cq_f: CqFuture<()>, + /// `true` means the future finishes successfully. + cq_f: CqFuture<bool>, } impl Future for ShutdownFuture { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - Pin::new(&mut self.cq_f).poll(cx) + match ready!(Pin::new(&mut self.cq_f).poll(cx)) { + Ok(true) => Poll::Ready(Ok(())), + Ok(false) => Poll::Ready(Err(Error::ShutdownFailed)), + Err(e) => unreachable!("action future should never resolve to error: {}", e), + } } } @@ -549,7 +555,7 @@ pub struct Server { impl Server { /// Shutdown the server asynchronously. pub fn shutdown(&mut self) -> ShutdownFuture { - let (cq_f, prom) = CallTag::shutdown_pair(); + let (cq_f, prom) = CallTag::action_pair(); let prom_box = Box::new(prom); let tag = Box::into_raw(prom_box); unsafe { diff --git a/src/task/mod.rs b/src/task/mod.rs index f151d0e..53369f1 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -14,7 +14,7 @@ use parking_lot::Mutex; use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback}; use self::executor::SpawnTask; -use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise}; +use self::promise::{Action as ActionPromise, Batch as BatchPromise}; use crate::call::server::RequestContext; use crate::call::{BatchContext, Call, MessageReader}; use crate::cq::CompletionQueue; @@ -113,7 +113,7 @@ pub enum CallTag { Request(RequestCallback), UnaryRequest(UnaryRequestCallback), Abort(Abort), - Shutdown(ShutdownPromise), + Action(ActionPromise), Spawn(Arc<SpawnTask>), } @@ -131,11 +131,12 @@ impl CallTag { CallTag::Request(RequestCallback::new(ctx)) } - /// Generate a Future/CallTag pair for shutdown call. - pub fn shutdown_pair() -> (CqFuture<()>, CallTag) { + /// Generate a Future/CallTag pair for action call that only cares if the result is + /// successful. + pub fn action_pair() -> (CqFuture<bool>, CallTag) { let inner = new_inner(); - let shutdown = ShutdownPromise::new(inner.clone()); - (CqFuture::new(inner), CallTag::Shutdown(shutdown)) + let action = ActionPromise::new(inner.clone()); + (CqFuture::new(inner), CallTag::Action(action)) } /// Generate a CallTag for abort call before handler is called. @@ -175,7 +176,7 @@ impl CallTag { CallTag::Request(cb) => cb.resolve(cq, success), CallTag::UnaryRequest(cb) => cb.resolve(cq, success), CallTag::Abort(_) => {} - CallTag::Shutdown(prom) => prom.resolve(success), + CallTag::Action(prom) => prom.resolve(success), CallTag::Spawn(notify) => self::executor::resolve(notify, success), } } @@ -188,7 +189,7 @@ impl Debug for CallTag { CallTag::Request(_) => write!(f, "CallTag::Request(..)"), CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"), CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"), - CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"), + CallTag::Action(_) => write!(f, "CallTag::Action"), CallTag::Spawn(_) => write!(f, "CallTag::Spawn"), } } @@ -208,8 +209,8 @@ mod tests { fn test_resolve() { let env = Environment::new(1); - let (cq_f1, tag1) = CallTag::shutdown_pair(); - let (cq_f2, tag2) = CallTag::shutdown_pair(); + let (cq_f1, tag1) = CallTag::action_pair(); + let (cq_f2, tag2) = CallTag::action_pair(); let (tx, rx) = mpsc::channel(); let handler = thread::spawn(move || { @@ -224,8 +225,8 @@ mod tests { assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); tag2.resolve(&env.pick_cq(), false); match rx.recv() { - Ok(Err(Error::ShutdownFailed)) => {} - res => panic!("expect shutdown failed, but got {:?}", res), + Ok(Ok(false)) => {} + res => panic!("expect Ok(false), but got {:?}", res), } handler.join().unwrap(); diff --git a/src/task/promise.rs b/src/task/promise.rs index 02e9419..50add06 100644 --- a/src/task/promise.rs +++ b/src/task/promise.rs @@ -104,24 +104,22 @@ impl Debug for Batch { } } -/// A promise used to resolve async shutdown result. -pub struct Shutdown { - inner: Arc<Inner<()>>, +/// A promise used to resolve async action status. +/// +/// The action can only succeed or fail without extra error hint. +pub struct Action { + inner: Arc<Inner<bool>>, } -impl Shutdown { - pub fn new(inner: Arc<Inner<()>>) -> Shutdown { - Shutdown { inner } +impl Action { + pub fn new(inner: Arc<Inner<bool>>) -> Action { + Action { inner } } pub fn resolve(self, success: bool) { let task = { let mut guard = self.inner.lock(); - if success { - guard.set_result(Ok(())) - } else { - guard.set_result(Err(Error::ShutdownFailed)) - } + guard.set_result(Ok(success)) }; task.map(|t| t.wake()); } |