author     Jeff Vander Stoep <jeffv@google.com>  2020-10-14 21:33:57 +0000
committer  Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>  2020-10-14 21:33:57 +0000
commit     29ef403c0e0673d871648f50226ec698b9aa2d6c (patch)
tree       5647c69210cf8d9a287ac689476f45513371aefc
parent     352363d43b57e1a7c239098b358919d113aeb81a (diff)
parent     da0e42b1ff3b87738a3e2594a368fba1047a918c (diff)
download   grpcio-29ef403c0e0673d871648f50226ec698b9aa2d6c.tar.gz
Import grpcio 0.6.0 am: 761577d44d am: 57e1399659 am: da0e42b1ff
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/grpcio/+/1460423
Change-Id: I962909175e39412b3c8529df879d50a975ec07b9
-rw-r--r--  .cargo_vcs_info.json  5
-rw-r--r--  .clang-format  11
-rw-r--r--  .clang-tidy  6
-rw-r--r--  .github/ISSUE_TEMPLATE/bug_report.md  26
-rw-r--r--  .github/ISSUE_TEMPLATE/feature_request.md  16
-rw-r--r--  .github/ISSUE_TEMPLATE/something-else.md  6
-rw-r--r--  .github/PULL_REQUEST_TEMPLATE/bug_fix.md  21
-rw-r--r--  .github/PULL_REQUEST_TEMPLATE/feature.md  24
-rw-r--r--  .github/PULL_REQUEST_TEMPLATE/something-else.md  7
-rw-r--r--  .github/workflows/ci.yml  119
-rw-r--r--  .gitignore  7
-rw-r--r--  .gitmodules  4
-rw-r--r--  .travis.yml  83
-rw-r--r--  CHANGELOG.md  153
-rw-r--r--  CODE_OF_CONDUCT.md  46
-rw-r--r--  Cargo.toml  67
-rw-r--r--  Cargo.toml.orig  49
-rw-r--r--  LICENSE  201
-rw-r--r--  METADATA  19
-rw-r--r--  MODULE_LICENSE_APACHE2  0
-rw-r--r--  OWNERS  1
-rw-r--r--  README.md  146
-rw-r--r--  cross_compile.md  68
-rwxr-xr-x  scripts/format-grpc-sys  2
-rwxr-xr-x  scripts/generate-bindings.sh  15
-rwxr-xr-x  scripts/lint-grpc-sys.sh  2
-rwxr-xr-x  scripts/reset-submodule.cmd  8
-rw-r--r--  src/auth_context.rs  139
-rw-r--r--  src/buf.rs  573
-rw-r--r--  src/call/client.rs  565
-rw-r--r--  src/call/mod.rs  686
-rw-r--r--  src/call/server.rs  766
-rw-r--r--  src/channel.rs  622
-rw-r--r--  src/client.rs  100
-rw-r--r--  src/codec.rs  67
-rw-r--r--  src/cq.rs  216
-rw-r--r--  src/env.rs  174
-rw-r--r--  src/error.rs  92
-rw-r--r--  src/lib.rs  79
-rw-r--r--  src/log_util.rs  57
-rw-r--r--  src/metadata.rs  322
-rw-r--r--  src/quota.rs  49
-rw-r--r--  src/security/credentials.rs  368
-rw-r--r--  src/security/mod.rs  10
-rw-r--r--  src/server.rs  607
-rw-r--r--  src/task/callback.rs  84
-rw-r--r--  src/task/executor.rs  256
-rw-r--r--  src/task/mod.rs  233
-rw-r--r--  src/task/promise.rs  128
49 files changed, 7305 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..90abfbc
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,5 @@
+{
+ "git": {
+ "sha1": "19a53b5cc9612511dabb365dae05286e807668e5"
+ }
+}
diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..b641a64
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,11 @@
+---
+Language: Cpp
+BasedOnStyle: Google
+DerivePointerAlignment: false
+PointerAlignment: Left
+---
+Language: ObjC
+BasedOnStyle: Google
+ColumnLimit: 100
+ObjCBlockIndentWidth: 2
+...
diff --git a/.clang-tidy b/.clang-tidy
new file mode 100644
index 0000000..d217441
--- /dev/null
+++ b/.clang-tidy
@@ -0,0 +1,6 @@
+---
+Checks: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*'
+WarningsAsErrors: 'modernize-use-nullptr,google-build-namespaces,google-build-explicit-make-pair,readability-function-size,performance-*,bugprone-*'
+CheckOptions:
+ - key: readability-function-size.StatementThreshold
+ value: '450'
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
new file mode 100644
index 0000000..433fb9e
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/bug_report.md
@@ -0,0 +1,26 @@
+---
+name: Bug report
+about: Found a bug? Help us squash it!
+---
+
+**Describe the bug**
+A clear and concise description of what the bug is.
+
+**To Reproduce**
+Steps to reproduce the behavior:
+1. Go to '...'
+2. Click on '....'
+3. Scroll down to '....'
+4. See error
+
+**Expected behavior**
+A clear and concise description of what you expected to happen.
+
+**System information**
+* CPU architecture:
+* Distribution and kernel version:
+* SELinux on?:
+* Any other system details we should know?:
+
+**Additional context**
+Add any other context about the problem here.
diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md
new file mode 100644
index 0000000..8a84863
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/feature_request.md
@@ -0,0 +1,16 @@
+---
+name: Feature request
+about: Help us develop our roadmap and direction.
+---
+
+**Is your feature request related to a problem? Please describe.**
+A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
+
+**Describe the solution you'd like**
+A clear and concise description of what you want to happen.
+
+**Describe alternatives you've considered**
+A clear and concise description of any alternative solutions or features you've considered.
+
+**Additional context**
+Add any other context or screenshots about the feature request here.
diff --git a/.github/ISSUE_TEMPLATE/something-else.md b/.github/ISSUE_TEMPLATE/something-else.md
new file mode 100644
index 0000000..7ce0a34
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/something-else.md
@@ -0,0 +1,6 @@
+---
+name: Something Else
+about: Just give me a text box!
+---
+
+
diff --git a/.github/PULL_REQUEST_TEMPLATE/bug_fix.md b/.github/PULL_REQUEST_TEMPLATE/bug_fix.md
new file mode 100644
index 0000000..a24a4c2
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE/bug_fix.md
@@ -0,0 +1,21 @@
+---
+name: Bug fix
+about: A bug squashed.
+
+---
+
+**Related bugs:**
+This bug fix closes issue #???.
+
+**Description of problem:**
+Describe what was causing the related issue to happen.
+
+**Description of solution:**
+Describe the rationale behind the fix.
+
+**Checklist:**
+The CI will check all of these, but you'll need to have done them:
+
+* [ ] `cargo fmt -- --check` passes.
+* [ ] `cargo +nightly clippy` has no warnings.
+* [ ] `cargo test` passes.
diff --git a/.github/PULL_REQUEST_TEMPLATE/feature.md b/.github/PULL_REQUEST_TEMPLATE/feature.md
new file mode 100644
index 0000000..e43e06a
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE/feature.md
@@ -0,0 +1,24 @@
+---
+name: Feature
+about: The dawn of a new era.
+
+---
+
+**Related features:**
+This feature resolves issue #???.
+
+**Description of feature:**
+A short description of the feature implemented.
+
+**Implementation:**
+Describe any pieces of the implementation that deserve further explanation.
+Detail any gotchas, uncertainties, or open questions about the implementation.
+
+**Checklist:**
+
+The CI will check all of these, but you'll need to have done them:
+
+* [ ] `cargo fmt -- --check` passes.
+* [ ] `cargo +nightly clippy` has no warnings.
+* [ ] `cargo test` passes.
+
diff --git a/.github/PULL_REQUEST_TEMPLATE/something-else.md b/.github/PULL_REQUEST_TEMPLATE/something-else.md
new file mode 100644
index 0000000..afaab25
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE/something-else.md
@@ -0,0 +1,7 @@
+---
+name: Something Else
+about: Just give me a text box!
+
+---
+
+
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..e1faa0b
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,119 @@
+name: CI
+
+on:
+ pull_request:
+ push:
+ branches:
+ - master
+ schedule:
+ - cron: '0 22 * * *'
+
+env:
+ RUST_BACKTRACE: 1
+ RUSTFLAGS: "--deny=warnings"
+ GRPC_VERSION: 1.29.1
+
+jobs:
+ Linux-Format-PKG-Test:
+ name: Linux-Format-PKG-Test
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: sudo apt-get install -y clang-tidy-9
+ - run: sudo update-alternatives --install /usr/bin/clang-tidy clang-tidy /usr/bin/clang-tidy-9 100
+ - run: which go && go version && which cargo && cargo version && clang --version && openssl version
+ - run: scripts/reset-submodule.cmd
+ - run: cargo fmt --all -- --check
+ - run: cargo clippy --all -- -D clippy::all && cargo clippy --all --no-default-features --features prost-codec -- -D clippy::all
+ - run: scripts/lint-grpc-sys.sh && git diff-index --quiet HEAD
+ - run: git clone -b v$GRPC_VERSION https://github.com/grpc/grpc
+ - run: cd grpc && git submodule update --init && sudo make install_c
+ - run: GRPCIO_SYS_USE_PKG_CONFIG=1 RUSTFLAGS="-A unused-attributes" cargo test --all
+
+ Linux-Stable:
+ name: Linux-Stable
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: which go && go version && which cargo && cargo version && clang --version && openssl version
+ - run: scripts/reset-submodule.cmd
+ - run: scripts/generate-bindings.sh && git diff --exit-code HEAD
+ - run: cargo build --no-default-features
+ - run: cargo build --no-default-features --features protobuf-codec
+ - run: cargo build --no-default-features --features prost-codec
+ - run: cargo build
+ - run: cargo test --all
+
+ Linux-Stable-openssl:
+ name: Linux-Stable-openssl
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: which go && go version && which cargo && cargo version && clang --version && openssl version
+ - run: scripts/reset-submodule.cmd
+ - run: cargo test --features "openssl-vendored" --all
+ - run: cargo clean
+ - run: cargo test --features "openssl" --all
+
+ Linux-Nightly:
+ name: Linux-Nightly
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: rustup default nightly
+ - run: which go && go version && which cargo && cargo version && clang --version && openssl version
+ - run: scripts/reset-submodule.cmd
+ - run: cargo build --no-default-features
+ - run: cargo build --no-default-features --features protobuf-codec
+ - run: cargo build --no-default-features --features prost-codec
+ - run: cargo build
+ - run: cargo test --all
+ - run: RUSTFLAGS="-Z sanitizer=address" cargo test --all --target x86_64-unknown-linux-gnu
+
+ Mac:
+ name: Mac
+ runs-on: macos-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: which go && go version && which cargo && cargo version && clang --version && openssl version
+ - run: scripts/reset-submodule.cmd
+ - run: cargo build --no-default-features
+ - run: cargo build --no-default-features --features protobuf-codec
+ - run: cargo build --no-default-features --features prost-codec
+ - run: cargo build
+ - run: cargo test --all
+
+ Mac-openssl:
+ name: Mac-openssl
+ runs-on: macos-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: brew update && brew upgrade openssl@1.1
+ - run: which go && go version && which cargo && cargo version && clang --version && openssl version
+ - run: scripts/reset-submodule.cmd
+ - run: OPENSSL_ROOT_DIR="/usr/local/opt/openssl@1.1/" cargo test --features "openssl" --all
+ - run: cargo test --features "openssl-vendored" --all
+
+ Win:
+ name: Windows
+ runs-on: windows-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: choco install -y llvm
+ - run: refreshenv
+ - run: go version ; cargo version ; cmake --version
+ - run: scripts/reset-submodule.cmd
+ - run: cargo build
+ - run: cargo test --all
+
+ Pre-Release:
+ name: Pre-Release
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - run: scripts/reset-submodule.cmd
+ - run: cd grpc-sys && cargo publish --dry-run
+ - name: Check generated package size
+ run: |
+ ls -alh target/package/grpcio-sys-*.crate
+ test `cat target/package/grpcio-sys-*.crate | wc -c` -le 10485760
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4629210
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+target
+Cargo.lock
+*.rs.bk
+*.rs.fmt
+.vscode
+.idea
+*.o
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..3ed0061
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,4 @@
+[submodule "grpc-sys/grpc"]
+ path = grpc-sys/grpc
+ url = https://github.com/pingcap/grpc.git
+ branch = rs-release
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..0974f0c
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,83 @@
+# Travis is only used to test ARM64 Linux
+
+dist: xenial
+sudo: false
+language: rust
+git:
+ submodules: false
+
+rust: stable
+
+env:
+ global:
+ - RUST_BACKTRACE=1
+ - RUSTFLAGS="--deny=warnings"
+
+install:
+ - if ! which go 2>/dev/null; then
+ wget https://dl.google.com/go/go1.13.8.linux-arm64.tar.gz;
+ tar -xf go1.13.8.linux-arm64.tar.gz -C $HOME;
+ export PATH="$HOME/go/bin:$PATH";
+ export GO_ROOT=$HOME/go;
+ fi
+
+addons:
+ apt:
+ update: true
+ packages:
+ - libunwind-dev
+
+jobs:
+ include:
+ - os: linux
+ arch: arm64
+ before_script:
+ - scripts/reset-submodule.cmd
+ - export GRPC_VERSION=1.29.1
+ - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin"
+ - GRPC_HEADER="$HOME/.cache/include/grpc/grpc.h"
+ - if [[ ! -f "$GRPC_HEADER" ]] ; then
+ (
+ git clone -b v$GRPC_VERSION https://github.com/grpc/grpc &&
+ cd grpc &&
+ git submodule update --init &&
+ env prefix=$HOME/.cache make install_c
+ );
+ fi
+ - eval "$(gimme stable)"
+ - export CPLUS_INCLUDE_PATH="$HOME/.cache/include"
+ - export LD_LIBRARY_PATH="$HOME/.cache/lib"
+ - export DYLD_LIBRARY_PATH="$HOME/.cache/lib"
+ - export LIBRARY_PATH="$HOME/.cache/lib"
+ - export PKG_CONFIG_PATH="$HOME/.cache/lib/pkgconfig"
+ script:
+ - which go && go version
+ - GRPCIO_SYS_USE_PKG_CONFIG=1 RUSTFLAGS="-A unused-attributes" cargo test --all
+
+ - os: linux
+ arch: arm64
+ before_script:
+ - scripts/reset-submodule.cmd
+ - export GRPC_VERSION=1.29.1
+ - export PATH="$PATH:$HOME/.cache/bin:$HOME/.cargo/bin"
+ - GRPC_HEADER="$HOME/.cache/include/grpc/grpc.h"
+ - sudo apt-get update && sudo apt-get -y install libssl-dev;
+ - eval "$(gimme stable)"
+ - export CPLUS_INCLUDE_PATH="$HOME/.cache/include"
+ - export LD_LIBRARY_PATH="$HOME/.cache/lib"
+ - export DYLD_LIBRARY_PATH="$HOME/.cache/lib"
+ - export LIBRARY_PATH="$HOME/.cache/lib"
+ - export PKG_CONFIG_PATH="$HOME/.cache/lib/pkgconfig"
+ script:
+ - which go && go version
+ - if [[ $TRAVIS_OS_NAME == "linux" ]] && [[ $TRAVIS_RUST_VERSION == "stable" ]]; then
+ rustup component add rustfmt && cargo fmt --all -- --check;
+ scripts/generate-bindings.sh && git diff --exit-code HEAD;
+ fi
+ - cargo build --no-default-features
+ - cargo build --no-default-features --features protobuf-codec
+ - cargo build --no-default-features --features prost-codec
+ - cargo build
+ - cargo test --all
+ - cargo test --features "openssl" --all
+ - cargo test --features "openssl-vendored" --all
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..d4c0b52
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,153 @@
+# 0.6.0 - 2020-06-12
+
+- Switch to std::future (#447)
+- Update gRPC C core to 1.29.1 (#466)
+- Change spinlock to parking_lot::Mutex (#468)
+
+# 0.5.3 - 2020-05-07
+
+- Switch to github action and update badge (#459)
+- Enable ALPN by default (#456)
+
+# grpcio-sys 0.5.2 - 2020-03-31
+
+- Downgrade bindgen version to be backward compatible. (#452)
+
+# 0.5.1 - 2020-03-30
+
+- Clarify load balancing status (#445)
+- Support unix domain socket (#446)
+- Build: fix rebuild rules for no prebuilt bindings (#450)
+
+# 0.5.0 - 2020-03-16
+
+- Make `build_args` and `channel_args` public (#405)
+- Reclaim buffer memory after sending message (#407)
+- Support ppc64le (#410)
+- Use libz-sys instead of the bundled one (#420)
+- Update gRPC C core to v1.26.0 (#425)
+- Support Authentication (#322)
+- Update `Error` trait to latest version (#428)
+- Update several outdated dependencies (#426)
+- Better `Display` and `Debug` implementations for status code and errors (#433, #437)
+- Generate bindings for aarch64 target (#430)
+- Support reloading certificates online (#440)
+
+# grpcio-compiler 0.5.0-alpha.6 - 2019-11-13
+
+- Fix clippy warnings (#403)
+
+# 0.5.0-alpha.5 - 2019-11-05
+
+- Fix segmentation fault under race contention (#367)
+- grpcio-compiler: remove protobuf-codegen dependency (#372)
+- Add resource quota support (#377)
+- Make metadata `Send` (#363)
+- Fix openssl link failure on Mac OS (#387)
+- Fix compilation failure for latest gcc (#386)
+- Fix deadlock when spawning multiple linked futures in the same queue (#395)
+
+# 0.5.0-alpha.4 - 2019-08-12
+
+- Make proto compile on Windows
+- Make status code readable
+- Remove clang requirement on x86_64 Linux
+
+# 0.5.0-alpha.3 - 2019-07-24
+
+- Fix circular dependencies to work around several cargo bugs
+- Fix bindgen generation failure
+
+# 0.5.0-alpha.2 - 2019-07-18
+
+- Support using vendored openssl
+- Use bindgen to generate code instead
+
+# 0.5.0-alpha.1 - 2019-04-08
+
+- Fix grpc_sys import when secure feature is disabled
+
+# 0.5.0-alpha - 2019-04-03
+
+- Support Prost
+- Zero copy for receiving
+- Support GrpcConnectivityState
+- Upgrade to Rust 2018 edition
+
+# 0.4.4 - 2019-02-15
+
+- Support cross-compiling for iOS and Android targets
+- Support IPv6 hosts
+
+# 0.4.3 - 2019-01-21
+
+- Remove the tilde requirement `~2.0` on protobuf
+
+# 0.4.2 - 2019-01-07
+
+- Update gRPC from 1.14.2 to 1.17.2
+
+# 0.4.1 - 2018-11-15
+
+- `Client` is now cloneable
+- Allow '.'s when validating metadata keys
+- Fix call validation issue when connection is closed
+- Optionally use openssl instead of boringssl
+
+# 0.4.0 - 2018-09-15
+
+- Update gRPC from 1.7.2 to 1.14.2
+- Services accept mutable references
+- Cancel RPC when senders and receivers are dropped
+- Notify completion queue via call
+
+# 0.3.1 - 2018-08-27
+
+- Support configuring load balancing policy
+- Fix compilation failure when go is missing
+- Fix compilation issue under musl
+- Fix soundness of service handler
+
+# 0.3.0 - 2018-06-01
+
+- keep compatible with protobuf 2.0
+- enable secure feature by default
+- fix potential overflow in channel args
+
+# 0.2.3 - 2018-04-27
+
+- support querying client address
+
+# 0.2.2 - 2018-04-04
+
+- use a different lock for notify to avoid deadlock
+
+# 0.2.1 - 2018-02-23
+
+- support ping configuration
+- make `CallOptions` clonable
+- support google default credentials
+- fix link error on Windows
+- support request header
+
+# 0.2.0 - 2017-12-19
+
+- update gRPC from 1.6.1 to 1.7.2
+- separate secure/unsecure features
+- fix compilation error on OS X and Win32
+- publish gRPC built-in protos
+
+# 0.1.2 - 2017-09-22
+
+- use environment variable to control linking
+- clear buffer hint when sending metadata
+
+# 0.1.1 - 2017-09-21
+
+- upgrade gRPC from 1.4.0 to 1.6.1
+- support more channel args
+- support log
+
+# 0.1.0 - 2017-07-27
+
+initial release
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 0000000..63f400a
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,46 @@
+# Contributor Covenant Code of Conduct
+
+## Our Pledge
+
+In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation.
+
+## Our Standards
+
+Examples of behavior that contributes to creating a positive environment include:
+
+* Using welcoming and inclusive language
+* Being respectful of differing viewpoints and experiences
+* Gracefully accepting constructive criticism
+* Focusing on what is best for the community
+* Showing empathy towards other community members
+
+Examples of unacceptable behavior by participants include:
+
+* The use of sexualized language or imagery and unwelcome sexual attention or advances
+* Trolling, insulting/derogatory comments, and personal or political attacks
+* Public or private harassment
+* Publishing others' private information, such as a physical or electronic address, without explicit permission
+* Other conduct which could reasonably be considered inappropriate in a professional setting
+
+## Our Responsibilities
+
+Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
+
+Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
+
+## Scope
+
+This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.
+
+## Enforcement
+
+Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at coc@pingcap.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.
+
+Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership.
+
+## Attribution
+
+This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version]
+
+[homepage]: http://contributor-covenant.org
+[version]: http://contributor-covenant.org/version/1/4/
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..75bf8c1
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,67 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "grpcio"
+version = "0.6.0"
+authors = ["The TiKV Project Developers"]
+autoexamples = false
+description = "The rust language implementation of gRPC, base on the gRPC c core library."
+homepage = "https://github.com/tikv/grpc-rs"
+documentation = "https://docs.rs/grpcio"
+readme = "README.md"
+keywords = ["grpc", "protobuf", "rpc", "tls", "http2"]
+categories = ["asynchronous", "network-programming"]
+license = "Apache-2.0"
+repository = "https://github.com/tikv/grpc-rs"
+[package.metadata.docs.rs]
+all-features = true
+[profile.release]
+debug = true
+[dependencies.bytes]
+version = "0.5"
+optional = true
+
+[dependencies.futures]
+version = "0.3"
+
+[dependencies.grpcio-sys]
+version = "0.6.0"
+
+[dependencies.libc]
+version = "0.2"
+
+[dependencies.log]
+version = "0.4"
+
+[dependencies.parking_lot]
+version = "0.10"
+
+[dependencies.prost]
+version = "0.6"
+optional = true
+
+[dependencies.protobuf]
+version = "2.0"
+optional = true
+
+[features]
+default = ["protobuf-codec", "secure"]
+no-omit-frame-pointer = ["grpcio-sys/no-omit-frame-pointer"]
+openssl = ["secure", "grpcio-sys/openssl"]
+openssl-vendored = ["secure", "grpcio-sys/openssl-vendored"]
+prost-codec = ["prost", "bytes"]
+protobuf-codec = ["protobuf"]
+secure = ["grpcio-sys/secure"]
+[badges.travis-ci]
+repository = "tikv/grpc-rs"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..7775f30
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,49 @@
+[package]
+name = "grpcio"
+version = "0.6.0"
+edition = "2018"
+authors = ["The TiKV Project Developers"]
+license = "Apache-2.0"
+keywords = ["grpc", "protobuf", "rpc", "tls", "http2"]
+repository = "https://github.com/tikv/grpc-rs"
+readme = "README.md"
+homepage = "https://github.com/tikv/grpc-rs"
+documentation = "https://docs.rs/grpcio"
+description = "The rust language implementation of gRPC, base on the gRPC c core library."
+categories = ["asynchronous", "network-programming"]
+autoexamples = false
+
+[package.metadata.docs.rs]
+all-features = true
+
+[dependencies]
+grpcio-sys = { path = "grpc-sys", version = "0.6.0" }
+libc = "0.2"
+futures = "0.3"
+protobuf = { version = "2.0", optional = true }
+prost = { version = "0.6", optional = true }
+bytes = { version = "0.5", optional = true }
+log = "0.4"
+parking_lot = "0.10"
+
+[workspace]
+members = ["proto", "benchmark", "compiler", "interop", "tests-and-examples"]
+
+[features]
+default = ["protobuf-codec", "secure"]
+protobuf-codec = ["protobuf"]
+prost-codec = ["prost", "bytes"]
+secure = ["grpcio-sys/secure"]
+openssl = ["secure", "grpcio-sys/openssl"]
+openssl-vendored = ["secure", "grpcio-sys/openssl-vendored"]
+no-omit-frame-pointer = ["grpcio-sys/no-omit-frame-pointer"]
+
+[profile.release]
+debug = true
+
+[badges]
+travis-ci = { repository = "tikv/grpc-rs" }
+
+[patch.crates-io]
+grpcio-compiler = { path = "compiler", version = "0.6.0", default-features = false }
+protobuf-build = { git = "https://github.com/tikv/protobuf-build.git" }
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..513172a
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2019 TiKV Project Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..a82c991
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,19 @@
+name: "grpcio"
+description: "The rust language implementation of gRPC, base on the gRPC c core library."
+third_party {
+ url {
+ type: HOMEPAGE
+ value: "https://crates.io/crates/grpcio"
+ }
+ url {
+ type: ARCHIVE
+ value: "https://static.crates.io/crates/grpcio/grpcio-0.6.0.crate"
+ }
+ version: "0.6.0"
+ license_type: NOTICE
+ last_upgrade_date {
+ year: 2020
+ month: 10
+ day: 14
+ }
+}
diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_APACHE2
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..46fc303
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1 @@
+include platform/prebuilts/rust:/OWNERS
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..03f4e19
--- /dev/null
+++ b/README.md
@@ -0,0 +1,146 @@
+# gRPC-rs
+
+`gRPC-rs` is a Rust wrapper of [gRPC Core](https://github.com/grpc/grpc). [gRPC](http://www.grpc.io) is a high performance, open source universal RPC framework that puts mobile and HTTP/2 first.
+
+[![Crates.io](https://img.shields.io/crates/v/grpcio.svg?maxAge=2592000)](https://crates.io/crates/grpcio)
+[![docs.rs](https://docs.rs/grpcio/badge.svg)](https://docs.rs/grpcio)
+[![Build Status](https://github.com/tikv/grpc-rs/workflows/CI/badge.svg)](https://github.com/tikv/grpc-rs/actions)
+[![Build Status](https://travis-ci.org/tikv/grpc-rs.svg)](https://travis-ci.org/tikv/grpc-rs)
+
+## Status
+
+This project is still under development. The following features with the check marks are supported:
+
+- [x] Basic asynchronous unary/streaming calls
+- [x] SSL
+- [x] Generic call
+- [x] Connection level compression
+- [x] Interoperability test
+- [x] QPS benchmark
+- [ ] Custom metadata
+- [x] Health check
+- [ ] Reflection
+- [X] Authentication
+- [ ] Load balancing: client side is fully supported; server-side load reporting is not implemented yet.
+
+## Prerequisites
+
+- CMake >= 3.8.0
+- Rust >= 1.36.0
+- binutils >= 2.22
+- LLVM and Clang >= 3.9 if you need to generate bindings at compile time.
+- By default, the [secure feature](#feature-secure) is provided by boringssl. You can also use openssl instead by enabling the [openssl feature](#feature-openssl).
+
+For Linux and macOS, you also need to install gcc (or clang).
+
+Bindings are pre-generated for x86_64/arm64 Linux. For other platforms, bindings are generated at compile time.
+
+For Windows, you also need to install the following software:
+
+- Active State Perl
+- yasm
+- Visual Studio 2015+
+
+## Build
+
+```
+$ ./scripts/reset-submodule.cmd # if you just cloned the repository
+$ cargo build
+```
+
+### Error linking OpenSSL
+
+If you're getting linker errors when building your project using `gRPC-rs`, head
+down to the `openssl` feature section for a possible fix.
+
+## Usage
+
+To generate the sources from proto files:
+
+### Option 1 - Manual Generation
+
+1. Install the protobuf compiler:
+
+```
+$ cargo install protobuf-codegen
+```
+
+2. Install the gRPC compiler:
+
+```
+$ cargo install grpcio-compiler
+```
+
+3. Generate the sources:
+
+```
+$ protoc --rust_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_rust_plugin` example.proto
+```
+
+
+### Option 2 - Programmatic Generation
+
+Programmatic generation can be used to generate Rust modules from proto files
+via your `build.rs` by using [protoc-grpcio](https://crates.io/crates/protoc-grpcio).
+
+For more information and examples see
+[README](https://github.com/mtp401/protoc-grpcio/blob/master/README.md).
+
+To include this project as a dependency:
+
+```
+[dependencies]
+grpcio = "0.6"
+```
+
+### Feature `secure`
+
+The `secure` feature enables support for TLS encryption and some authentication
+mechanisms. When you do not need it, for example when working on an intranet,
+you can disable it with the following configuration:
+```
+[dependencies]
+grpcio = { version = "0.6", default-features = false, features = ["protobuf-codec"] }
+```
+
+### Feature `prost-codec` and `protobuf-codec`
+
+`gRPC-rs` uses the `protobuf` crate by default. If you want to use `prost` instead, you can enable
+the `prost-codec` feature. You probably want to enable only one of the two features. Although
+grpcio is completely fine with both features enabled at the same time, grpcio-compiler
+will not work as expected.
+
+### Feature `openssl` and `openssl-vendored`
+
+`gRPC-rs` comes vendored with `gRPC Core`, which by default uses BoringSSL
+instead of OpenSSL. This may cause linking issues due to symbol clashes and/or
+missing symbols when another one of your dependencies uses OpenSSL. To resolve
+this, you can tell `gRPC-rs` to use OpenSSL too by specifying `"openssl"` in
+your `Cargo.toml`'s features list for `grpcio`, which requires openssl (>= 1.0.2). E.g.:
+
+```toml
+[dependencies]
+grpcio = { version = "0.6", features = ["openssl"] }
+```
+
+The `openssl-vendored` feature is the same as `openssl`, except that it builds openssl from
+bundled sources.
+
+## Performance
+
+See [benchmark](https://github.com/tikv/grpc-rs/tree/master/benchmark) to find out how to run a benchmark by yourself.
+
+## Cross Compile
+
+See [cross_compile](cross_compile.md)
+
+## Contributing
+
+Make sure to format and test the code before sending a PR.
+
+If the content in grpc-sys/grpc is updated, you may need to regenerate bindings:
+
+```
+$ ./scripts/generate-bindings.sh
+```
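The Usage and feature sections in the README above stop at dependency declarations and codegen commands. As a purely illustrative sketch (not part of the imported crate), a client bootstrap against grpcio 0.6 might look like the following; the server address and the `GreeterClient` stub are hypothetical placeholders, since real stubs come from grpcio-compiler or protoc-grpcio.

```rust
// Illustrative only: minimal client-side setup with grpcio 0.6.
use std::sync::Arc;

use grpcio::{ChannelBuilder, EnvBuilder};

fn main() {
    // A shared environment owns the completion queues used by channels and servers.
    let env = Arc::new(EnvBuilder::new().build());
    // Lazily connect to a (placeholder) server address.
    let channel = ChannelBuilder::new(env).connect("localhost:50051");
    // With generated code, this channel would back a service stub, e.g.:
    // let client = GreeterClient::new(channel);
    let _ = channel;
}
```

Similarly, the codec discussion under "Feature `prost-codec` and `protobuf-codec`" leaves the Prost side implicit; one possible (assumed) `Cargo.toml` arrangement, using only the feature names declared in the crate's manifest above, is:

```toml
[dependencies]
grpcio = { version = "0.6", default-features = false, features = ["prost-codec", "secure"] }
```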
diff --git a/cross_compile.md b/cross_compile.md
new file mode 100644
index 0000000..67b1642
--- /dev/null
+++ b/cross_compile.md
@@ -0,0 +1,68 @@
+# Cross Compile gRPC-rs (0.2.1) to Windows under *nix
+
+## First you need to install mingw
+
+```bash
+# macOS
+brew install mingw-w64
+
+# CentOS
+yum install mingw64-openssl-static mingw64-zlib-static mingw64-winpthreads-static
+```
+
+## Fix CMake
+
+```
+# modify grpc-rs/grpc-sys/build.rs
+# fix SYSTEM_PROCESSOR
+"CMAKE_SYSTEM_PROCESSOR", get_env("CARGO_CFG_TARGET_ARCH").unwrap()
+# fix try_run
+"CMAKE_CROSSCOMPILING", "true"
+```
+
+### All diff in `fn build_grpc`
+
+```rust
+ let dst = {
+ let mut config = Config::new("grpc");
+ if get_env("CARGO_CFG_TARGET_OS").map_or(false, |s| s == "macos") {
+ config.cxxflag("-stdlib=libc++");
+ }
+ config
+ .define("CMAKE_SYSTEM_PROCESSOR", get_env("CARGO_CFG_TARGET_ARCH").unwrap())
+ .define("CMAKE_CROSSCOMPILING", "true")
+ .build_target(library)
+ .uses_cxx11()
+ .build()
+ // config.build_target(library).uses_cxx11().build()
+ };
+```
+
+### Fix find zlib
+
+```rust
+ // try these values
+ let mut zlib = "z";
+ let mut zlib = "zlibstatic";
+ let mut zlib = "zlibstaticd";
+```
+
+## Fix WIN32 API
+
+```
+# grpc-rs/grpc-sys/grpc/CMakeLists.txt
+# add these lines after about line 295
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_WIN32_WINNT=0x600")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_WIN32_WINNT=0x600")
+set(C_CXX_FLAGS "${C_CXX_FLAGS} -D_WIN32_WINNT=0x600")
+```
+
+## Fix boringssl
+
+Just update third_party/boringssl
+
+```bash
+cd third_party/boringssl
+git checkout master
+git pull
+```
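The cross-compile notes above patch the build scripts but never show the final build invocation. Assuming a mingw-w64 toolchain is installed and a Cargo target-to-linker mapping is configured (both assumptions, not stated in the document), the Windows build itself would be along these lines:

```bash
# Illustrative only: add the Windows GNU target and cross-build the crate.
rustup target add x86_64-pc-windows-gnu
# Assumes .cargo/config(.toml) points this target's linker at x86_64-w64-mingw32-gcc.
cargo build --target x86_64-pc-windows-gnu --release
```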
diff --git a/scripts/format-grpc-sys b/scripts/format-grpc-sys
new file mode 100755
index 0000000..d97445a
--- /dev/null
+++ b/scripts/format-grpc-sys
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+clang-format-5.0 -i grpc-sys/grpc_wrap.cc
diff --git a/scripts/generate-bindings.sh b/scripts/generate-bindings.sh
new file mode 100755
index 0000000..462b2fa
--- /dev/null
+++ b/scripts/generate-bindings.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+# NOTE:
+# This script is only used when you want to generate bindings yourself.
+# The generated bindings will overwrite grpc-sys/bindings/*
+
+if [ "$ARCH" == "" ]; then
+ ARCH=`uname -p`
+fi
+export UPDATE_BIND=1
+cargo build -p grpcio-sys --target ${ARCH}-unknown-linux-gnu
+rustfmt grpc-sys/bindings/*
+if [ "$(uname -s)" == "Linux" ]; then
+ sed -i '/pub type .*= ::std::os::raw::.*/d' grpc-sys/bindings/*
+fi
diff --git a/scripts/lint-grpc-sys.sh b/scripts/lint-grpc-sys.sh
new file mode 100755
index 0000000..a3fc7e0
--- /dev/null
+++ b/scripts/lint-grpc-sys.sh
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+clang-tidy grpc-sys/grpc_wrap.cc -- -Igrpc-sys/grpc/include -x c++ -std=c++11
diff --git a/scripts/reset-submodule.cmd b/scripts/reset-submodule.cmd
new file mode 100755
index 0000000..f873a75
--- /dev/null
+++ b/scripts/reset-submodule.cmd
@@ -0,0 +1,8 @@
+git submodule update --init grpc-sys/grpc
+cd grpc-sys/grpc
+git submodule update --init third_party/boringssl-with-bazel
+git submodule update --init third_party/cares/cares
+git submodule update --init third_party/abseil-cpp
+cd third_party/zlib
+git clean -f
+git reset --hard
diff --git a/src/auth_context.rs b/src/auth_context.rs
new file mode 100644
index 0000000..2658336
--- /dev/null
+++ b/src/auth_context.rs
@@ -0,0 +1,139 @@
+//! API for authenticating the peer.
+//! Based on https://grpc.github.io/grpc/core/md_doc_server_side_auth.html
+
+use std::ffi::CStr;
+use std::marker::PhantomData;
+use std::ptr::NonNull;
+
+use crate::grpc_sys::{
+ self, grpc_auth_context, grpc_auth_property, grpc_auth_property_iterator, grpc_call,
+};
+
+/// To perform server-side authentication, gRPC exposes the authentication context
+/// for each call. The context exposes important authentication-related information
+/// about the RPC such as the type of security/authentication type being used and
+/// the peer identity.
+///
+/// The authentication context is structured as a multi-map of key-value pairs -
+/// the auth properties. In addition to that, for authenticated RPCs, the set of
+/// properties corresponding to a selected key will represent the verified identity
+/// of the caller - the peer identity.
+///
+/// The contents of the auth properties are populated by an auth interceptor within
+/// gRPC Core. The interceptor also chooses which property key will act as the peer
+/// identity (e.g. for client certificate authentication this property will be
+/// `x509_common_name` or `x509_subject_alternative_name`).
+pub struct AuthContext {
+ ctx: NonNull<grpc_auth_context>,
+}
+
+/// Binding to gRPC Core AuthContext
+impl AuthContext {
+ pub(crate) unsafe fn from_call_ptr(call: *mut grpc_call) -> Option<Self> {
+ NonNull::new(grpc_sys::grpc_call_auth_context(call)).map(|ctx| AuthContext { ctx })
+ }
+
+ /// The name of the property gRPC Core has chosen as main peer identity property,
+ /// if any.
+ pub fn peer_identity_property_name(&self) -> Option<&str> {
+ unsafe {
+ let p = grpc_sys::grpc_auth_context_peer_identity_property_name(self.ctx.as_ref());
+ if p.is_null() {
+ None
+ } else {
+ Some(CStr::from_ptr(p).to_str().expect("valid UTF-8 data"))
+ }
+ }
+ }
+
+ /// `true` if the client has provided a valid certificate (or other auth method
+ /// considered valid by gRPC).
+ /// `false` in non-secure scenarios.
+ pub fn peer_is_authenticated(&self) -> bool {
+ unsafe { grpc_sys::grpc_auth_context_peer_is_authenticated(self.ctx.as_ref()) != 0 }
+ }
+
+ /// `AuthContext[peer_identity_property_name()]`
+ ///
+ /// There may be several of them (for instance if `x509_subject_alternative_name` is selected)
+ pub fn peer_identity(&self) -> AuthPropertyIter {
+ unsafe {
+ // grpc_auth_context_peer_identity returns empty_iterator when self.ctx is NULL
+ let iter = grpc_sys::grpc_auth_context_peer_identity(self.ctx.as_ref());
+ AuthPropertyIter {
+ iter,
+ _lifetime: PhantomData,
+ }
+ }
+ }
+}
+
+impl<'a> IntoIterator for &'a AuthContext {
+ type Item = AuthProperty<'a>;
+ type IntoIter = AuthPropertyIter<'a>;
+
+ /// Iterate over the AuthContext properties
+ fn into_iter(self) -> Self::IntoIter {
+ unsafe {
+ // grpc_auth_context_property_iterator returns empty_iterator when self.ctx is NULL
+ let iter = grpc_sys::grpc_auth_context_property_iterator(self.ctx.as_ref());
+ AuthPropertyIter {
+ iter,
+ _lifetime: PhantomData,
+ }
+ }
+ }
+}
+
+impl Drop for AuthContext {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpc_auth_context_release(self.ctx.as_ptr()) }
+ }
+}
+
+pub struct AuthPropertyIter<'a> {
+ iter: grpc_auth_property_iterator,
+ _lifetime: PhantomData<&'a grpc_auth_property_iterator>,
+}
+
+impl<'a> Iterator for AuthPropertyIter<'a> {
+ type Item = AuthProperty<'a>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ // grpc_auth_property_iterator_next returns empty_iterator when self.iter is NULL
+ let prop = unsafe { grpc_sys::grpc_auth_property_iterator_next(&mut self.iter) };
+ if prop.is_null() {
+ None
+ } else {
+ Some(AuthProperty {
+ prop,
+ _lifetime: PhantomData,
+ })
+ }
+ }
+}
+
+/// Auth properties are elements of the AuthContext. They have a name
+/// (a key of type string) and a value which can be a string or binary data.
+pub struct AuthProperty<'a> {
+ prop: *const grpc_auth_property,
+ _lifetime: PhantomData<&'a grpc_auth_property>,
+}
+
+impl<'a> AuthProperty<'a> {
+ pub fn name(&self) -> &'a str {
+ unsafe { CStr::from_ptr((*self.prop).name) }
+ .to_str()
+ .expect("Auth property name should be valid UTF-8 data")
+ }
+
+ pub fn value(&self) -> &'a [u8] {
+ unsafe {
+ std::slice::from_raw_parts((*self.prop).value as *const u8, (*self.prop).value_length)
+ }
+ }
+
+ pub fn value_str(&self) -> Result<&'a str, std::str::Utf8Error> {
+ std::str::from_utf8(self.value())
+ }
+}
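As an illustrative sketch only (not part of the imported sources), the authentication API above could be exercised roughly as follows. The helper is hypothetical and is written as if it lived inside this module, so no import path is assumed; how an `AuthContext` is obtained from a live call is also left out.

```rust
// Hypothetical helper: summarize the peer using only the AuthContext /
// AuthProperty API defined in src/auth_context.rs above.
fn describe_peer(ctx: &AuthContext) {
    if !ctx.peer_is_authenticated() {
        println!("peer is not authenticated");
        return;
    }
    if let Some(key) = ctx.peer_identity_property_name() {
        println!("peer identity property: {}", key);
    }
    // Several identity values may exist, e.g. for x509_subject_alternative_name.
    for identity in ctx.peer_identity() {
        println!("identity: {:?}", identity.value_str());
    }
    // All auth properties, identity-related or not.
    for prop in ctx {
        println!("{} = {:?}", prop.name(), prop.value_str());
    }
}
```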
diff --git a/src/buf.rs b/src/buf.rs
new file mode 100644
index 0000000..7731fd5
--- /dev/null
+++ b/src/buf.rs
@@ -0,0 +1,573 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use grpcio_sys::*;
+use std::cell::UnsafeCell;
+use std::ffi::{c_void, CStr, CString};
+use std::fmt::{self, Debug, Formatter};
+use std::io::{self, BufRead, Read};
+use std::mem::{self, ManuallyDrop, MaybeUninit};
+
+/// A convenient rust wrapper for the type `grpc_slice`.
+///
+/// It's expected that the slice should be initialized.
+#[repr(C)]
+pub struct GrpcSlice(grpc_slice);
+
+impl GrpcSlice {
+ /// Get the length of the data.
+ pub fn len(&self) -> usize {
+ unsafe {
+ if !self.0.refcount.is_null() {
+ self.0.data.refcounted.length
+ } else {
+ self.0.data.inlined.length as usize
+ }
+ }
+ }
+
+ /// Returns a slice of inner buffer.
+ pub fn as_slice(&self) -> &[u8] {
+ unsafe {
+ if !self.0.refcount.is_null() {
+ let start = self.0.data.refcounted.bytes;
+ let len = self.0.data.refcounted.length;
+ std::slice::from_raw_parts(start, len)
+ } else {
+ let len = self.0.data.inlined.length;
+ &self.0.data.inlined.bytes[..len as usize]
+ }
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Creates a slice from static rust slice.
+ ///
+ /// Same as `From<&[u8]>` but without copying the buffer.
+ #[inline]
+ pub fn from_static_slice(s: &'static [u8]) -> GrpcSlice {
+ GrpcSlice(unsafe { grpc_slice_from_static_buffer(s.as_ptr() as _, s.len()) })
+ }
+
+ /// Creates a `GrpcSlice` from static rust str.
+ ///
+ /// Same as `from_str` but without allocation.
+ #[inline]
+ pub fn from_static_str(s: &'static str) -> GrpcSlice {
+ GrpcSlice::from_static_slice(s.as_bytes())
+ }
+}
+
+impl Clone for GrpcSlice {
+ /// Clone the slice.
+ ///
+ /// If the slice is not inlined, the reference count will be increased
+ /// instead of copy.
+ fn clone(&self) -> Self {
+ GrpcSlice(unsafe { grpc_slice_ref(self.0) })
+ }
+}
+
+impl Default for GrpcSlice {
+ /// Returns a default slice, which is empty.
+ fn default() -> Self {
+ GrpcSlice(unsafe { grpc_empty_slice() })
+ }
+}
+
+impl Debug for GrpcSlice {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ self.as_slice().fmt(f)
+ }
+}
+
+impl Drop for GrpcSlice {
+ fn drop(&mut self) {
+ unsafe {
+ grpc_slice_unref(self.0);
+ }
+ }
+}
+
+impl PartialEq<[u8]> for GrpcSlice {
+ fn eq(&self, r: &[u8]) -> bool {
+ // Technically, the equal function inside vtable should be used.
+ // But it's not cheap or safe to create a grpc_slice from rust slice.
+ self.as_slice() == r
+ }
+}
+
+impl PartialEq<GrpcSlice> for GrpcSlice {
+ fn eq(&self, r: &GrpcSlice) -> bool {
+ unsafe { grpc_slice_eq(self.0, r.0) != 0 }
+ }
+}
+
+unsafe extern "C" fn drop_vec(ptr: *mut c_void, len: usize) {
+ Vec::from_raw_parts(ptr as *mut u8, len, len);
+}
+
+impl From<Vec<u8>> for GrpcSlice {
+ /// Converts a `Vec<u8>` into `GrpcSlice`.
+ ///
+ /// If v can't fit inline, there will be allocations.
+ #[inline]
+ fn from(mut v: Vec<u8>) -> GrpcSlice {
+ if v.is_empty() {
+ return GrpcSlice::default();
+ }
+
+ if v.len() == v.capacity() {
+ let slice = unsafe {
+ grpcio_sys::grpc_slice_new_with_len(v.as_mut_ptr() as _, v.len(), Some(drop_vec))
+ };
+ mem::forget(v);
+ return GrpcSlice(slice);
+ }
+
+ unsafe {
+ GrpcSlice(grpcio_sys::grpc_slice_from_copied_buffer(
+ v.as_mut_ptr() as _,
+ v.len(),
+ ))
+ }
+ }
+}
+
+/// Creates a `GrpcSlice` from rust string.
+///
+/// If the string can't fit inline, there will be allocations.
+impl From<String> for GrpcSlice {
+ #[inline]
+ fn from(s: String) -> GrpcSlice {
+ GrpcSlice::from(s.into_bytes())
+ }
+}
+
+/// Creates a `GrpcSlice` from rust cstring.
+///
+/// If the cstring can't fit inline, there will be allocations.
+impl From<CString> for GrpcSlice {
+ #[inline]
+ fn from(s: CString) -> GrpcSlice {
+ GrpcSlice::from(s.into_bytes())
+ }
+}
+
+/// Creates a `GrpcSlice` from rust slice.
+///
+/// The data inside slice will be cloned. If the data can't fit inline,
+/// necessary buffer will be allocated.
+impl From<&'_ [u8]> for GrpcSlice {
+ #[inline]
+ fn from(s: &'_ [u8]) -> GrpcSlice {
+ GrpcSlice(unsafe { grpc_slice_from_copied_buffer(s.as_ptr() as _, s.len()) })
+ }
+}
+
+/// Creates a `GrpcSlice` from rust str.
+///
+/// The data inside str will be cloned. If the data can't fit inline,
+/// necessary buffer will be allocated.
+impl From<&'_ str> for GrpcSlice {
+ #[inline]
+ fn from(s: &'_ str) -> GrpcSlice {
+ GrpcSlice::from(s.as_bytes())
+ }
+}
+
+/// Creates a `GrpcSlice` from rust `CStr`.
+///
+/// The data inside `CStr` will be cloned. If the data can't fit inline,
+/// necessary buffer will be allocated.
+impl From<&'_ CStr> for GrpcSlice {
+ #[inline]
+ fn from(s: &'_ CStr) -> GrpcSlice {
+ GrpcSlice::from(s.to_bytes())
+ }
+}
+
+/// A collection of `GrpcSlice`s.
+#[repr(C)]
+pub struct GrpcByteBuffer(*mut grpc_byte_buffer);
+
+impl GrpcByteBuffer {
+ #[inline]
+ pub unsafe fn from_raw(ptr: *mut grpc_byte_buffer) -> GrpcByteBuffer {
+ GrpcByteBuffer(ptr)
+ }
+}
+
+impl<'a> From<&'a [GrpcSlice]> for GrpcByteBuffer {
+ /// Create a buffer from the given slice array.
+ ///
+ /// A buffer is allocated for the whole slice array, and every slice will
+ /// be `Clone::clone` into the buffer.
+ fn from(slice: &'a [GrpcSlice]) -> Self {
+ let len = slice.len();
+ unsafe {
+ let s = slice.as_ptr() as *const grpc_slice as *const UnsafeCell<grpc_slice>;
+ // hack: see From<&GrpcSlice>.
+ GrpcByteBuffer(grpc_raw_byte_buffer_create((*s).get(), len))
+ }
+ }
+}
+
+impl<'a> From<&'a GrpcSlice> for GrpcByteBuffer {
+ /// Create a buffer from the given single slice.
+ ///
+ /// A buffer whose length is 1 is allocated for the slice.
+ #[allow(clippy::cast_ref_to_mut)]
+ fn from(s: &'a GrpcSlice) -> GrpcByteBuffer {
+ unsafe {
+ // hack: buffer_create accepts a mutable pointer to indicate that it mutates the
+ // ref count. The ref count is recorded by an atomic variable, which is considered
+ // `Sync` in Rust. This is an interesting difference in what is *mutable*
+ // between C++ and Rust.
+ // Using `UnsafeCell` to avoid a raw cast, which is UB.
+ let s = &*(s as *const GrpcSlice as *const grpc_slice as *const UnsafeCell<grpc_slice>);
+ GrpcByteBuffer(grpc_raw_byte_buffer_create((*s).get(), 1))
+ }
+ }
+}
+
+impl Clone for GrpcByteBuffer {
+ fn clone(&self) -> Self {
+ unsafe { GrpcByteBuffer(grpc_byte_buffer_copy(self.0)) }
+ }
+}
+
+impl Drop for GrpcByteBuffer {
+ fn drop(&mut self) {
+ unsafe { grpc_byte_buffer_destroy(self.0) }
+ }
+}
+
+/// A zero-copy reader for the message payload.
+///
+/// To achieve zero-copy, use the BufRead API `fill_buf` and `consume`
+/// to operate the reader.
+#[repr(C)]
+pub struct GrpcByteBufferReader {
+ reader: grpc_byte_buffer_reader,
+ /// Current reading buffer.
+ // This is a temporary buffer that may need to be dropped before every
+ // iteration. So use `ManuallyDrop` to control the behavior more cleanly
+ // and precisely.
+ slice: ManuallyDrop<GrpcSlice>,
+ /// The offset of `slice` that has not been read.
+ offset: usize,
+ /// How many bytes pending for reading.
+ remain: usize,
+}
+
+impl GrpcByteBufferReader {
+ /// Creates a reader for the `GrpcByteBuffer`.
+ ///
+ /// `buf` is stored inside the reader, and dropped when the reader is dropped.
+ pub fn new(buf: GrpcByteBuffer) -> GrpcByteBufferReader {
+ let mut reader = MaybeUninit::uninit();
+ let mut s = MaybeUninit::uninit();
+ unsafe {
+ let code = grpc_byte_buffer_reader_init(reader.as_mut_ptr(), buf.0);
+ assert_eq!(code, 1);
+ if 0 == grpc_byte_buffer_reader_next(reader.as_mut_ptr(), s.as_mut_ptr()) {
+ s.as_mut_ptr().write(grpc_empty_slice());
+ }
+ let remain = grpc_byte_buffer_length((*reader.as_mut_ptr()).buffer_out);
+ // buf is stored inside `reader` as `buffer_in`, so do not drop it.
+ mem::forget(buf);
+
+ GrpcByteBufferReader {
+ reader: reader.assume_init(),
+ slice: ManuallyDrop::new(GrpcSlice(s.assume_init())),
+ offset: 0,
+ remain,
+ }
+ }
+ }
+
+ /// Get the next slice from the reader.
+ fn load_next_slice(&mut self) {
+ unsafe {
+ ManuallyDrop::drop(&mut self.slice);
+ if 0 == grpc_byte_buffer_reader_next(&mut self.reader, &mut self.slice.0) {
+ self.slice = ManuallyDrop::new(GrpcSlice::default());
+ }
+ }
+ self.offset = 0;
+ }
+
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.remain
+ }
+
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.remain == 0
+ }
+}
+
+impl Read for GrpcByteBufferReader {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ let read = self.fill_buf()?.read(buf)?;
+ self.consume(read);
+ Ok(read)
+ }
+
+ fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
+ let cap = self.remain;
+ buf.reserve(cap);
+ let old_len = buf.len();
+ while self.remain > 0 {
+ let read = {
+ let s = match self.fill_buf() {
+ Ok(s) => s,
+ Err(e) => {
+ unsafe {
+ buf.set_len(old_len);
+ }
+ return Err(e);
+ }
+ };
+ buf.extend_from_slice(s);
+ s.len()
+ };
+ self.consume(read);
+ }
+ Ok(cap)
+ }
+}
+
+impl BufRead for GrpcByteBufferReader {
+ #[inline]
+ fn fill_buf(&mut self) -> io::Result<&[u8]> {
+ if self.slice.is_empty() {
+ return Ok(&[]);
+ }
+ Ok(unsafe { self.slice.as_slice().get_unchecked(self.offset..) })
+ }
+
+ fn consume(&mut self, mut amt: usize) {
+ if amt > self.remain {
+ amt = self.remain;
+ }
+ self.remain -= amt;
+ let mut offset = self.offset + amt;
+ while offset >= self.slice.len() && offset > 0 {
+ offset -= self.slice.len();
+ self.load_next_slice();
+ }
+ self.offset = offset;
+ }
+}
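+
+// Editor's sketch (illustrative only, not part of the imported crate): consuming a
+// payload through the `BufRead` methods above. `fill_buf` borrows the current slice
+// without copying; the copy into `out` here is only to show the loop shape, and
+// `consume` advances across slice boundaries.
+#[allow(dead_code)]
+fn buf_read_sketch(mut reader: GrpcByteBufferReader) -> io::Result<Vec<u8>> {
+    let mut out = Vec::with_capacity(reader.len());
+    loop {
+        let used = {
+            let chunk = reader.fill_buf()?;
+            if chunk.is_empty() {
+                break;
+            }
+            out.extend_from_slice(chunk);
+            chunk.len()
+        };
+        reader.consume(used);
+    }
+    Ok(out)
+}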
+
+impl Drop for GrpcByteBufferReader {
+ fn drop(&mut self) {
+ unsafe {
+ grpc_byte_buffer_reader_destroy(&mut self.reader);
+ ManuallyDrop::drop(&mut self.slice);
+ grpc_byte_buffer_destroy(self.reader.buffer_in);
+ }
+ }
+}
+
+unsafe impl Sync for GrpcByteBufferReader {}
+unsafe impl Send for GrpcByteBufferReader {}
+
+#[cfg(feature = "prost-codec")]
+impl bytes::Buf for GrpcByteBufferReader {
+ fn remaining(&self) -> usize {
+ self.remain
+ }
+
+ fn bytes(&self) -> &[u8] {
+ // This is similar but not identical to `BufRead::fill_buf`: since `self`
+ // is not mutable, we can only return bytes up to the end of the current
+ // slice.
+
+ // Optimization for empty slice
+ if self.slice.is_empty() {
+ return &[];
+ }
+
+ unsafe { self.slice.as_slice().get_unchecked(self.offset..) }
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ self.consume(cnt);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn new_message_reader(seed: Vec<u8>, copy_count: usize) -> GrpcByteBufferReader {
+ let data = vec![GrpcSlice::from(seed); copy_count];
+ let buf = GrpcByteBuffer::from(data.as_slice());
+ GrpcByteBufferReader::new(buf)
+ }
+
+ #[test]
+ fn test_grpc_slice() {
+ let empty = GrpcSlice::default();
+ assert!(empty.is_empty());
+ assert_eq!(empty.len(), 0);
+ assert!(empty.as_slice().is_empty());
+
+ let a = vec![0, 2, 1, 3, 8];
+ let slice = GrpcSlice::from(a.clone());
+ assert_eq!(a.as_slice(), slice.as_slice());
+ assert_eq!(a.len(), slice.len());
+ assert_eq!(&slice, &*a);
+
+ let a = vec![5; 64];
+ let slice = GrpcSlice::from(a.clone());
+ assert_eq!(a.as_slice(), slice.as_slice());
+ assert_eq!(a.len(), slice.len());
+ assert_eq!(&slice, &*a);
+
+ let a = vec![];
+ let slice = GrpcSlice::from(a);
+ assert_eq!(empty, slice);
+ }
+
+ #[test]
+ // The old code crashed under a very specific circumstance, due to a typo in `MessageReader::consume`.
+ fn test_typo_len_offset() {
+ let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
+ // half of the size of `data`
+ let half_size = data.len() / 2;
+ let slice = GrpcSlice::from(data.clone());
+ let buffer = GrpcByteBuffer::from(&slice);
+ let mut reader = GrpcByteBufferReader::new(buffer);
+ assert_eq!(reader.len(), data.len());
+ // Read the first half of `data`.
+ let mut buf = vec![0; half_size];
+ reader.read(buf.as_mut_slice()).unwrap();
+ assert_eq!(data[..half_size], *buf.as_slice());
+ assert_eq!(reader.len(), data.len() - half_size);
+ assert!(!reader.is_empty());
+ reader.read(&mut buf).unwrap();
+ assert_eq!(data[half_size..], *buf.as_slice());
+ assert_eq!(reader.len(), 0);
+ assert!(reader.is_empty());
+ }
+
+ #[test]
+ fn test_message_reader() {
+ for len in 0..=1024 {
+ for n_slice in 1..=4 {
+ let source = vec![len as u8; len];
+ let expect = vec![len as u8; len * n_slice];
+ // Test read.
+ let mut reader = new_message_reader(source.clone(), n_slice);
+ let mut dest = [0; 7];
+ let amt = reader.read(&mut dest).unwrap();
+
+ assert_eq!(
+ dest[..amt],
+ expect[..amt],
+ "len: {}, nslice: {}",
+ len,
+ n_slice
+ );
+
+ // Read after move.
+ let mut box_reader = Box::new(reader);
+ let amt = box_reader.read(&mut dest).unwrap();
+ assert_eq!(
+ dest[..amt],
+ expect[..amt],
+ "len: {}, nslice: {}",
+ len,
+ n_slice
+ );
+
+ // Test read_to_end.
+ let mut reader = new_message_reader(source.clone(), n_slice);
+ let mut dest = vec![];
+ reader.read_to_end(&mut dest).unwrap();
+ assert_eq!(dest, expect, "len: {}, nslice: {}", len, n_slice);
+
+ assert_eq!(0, reader.len());
+ assert_eq!(0, reader.read(&mut [1]).unwrap());
+
+ // Test arbitrary consuming.
+ let mut reader = new_message_reader(source.clone(), n_slice);
+ reader.consume(source.len() * (n_slice - 1));
+ let mut dest = vec![];
+ reader.read_to_end(&mut dest).unwrap();
+ assert_eq!(
+ dest.len(),
+ source.len(),
+ "len: {}, nslice: {}",
+ len,
+ n_slice
+ );
+ assert_eq!(
+ *dest,
+ expect[expect.len() - source.len()..],
+ "len: {}, nslice: {}",
+ len,
+ n_slice
+ );
+ assert_eq!(0, reader.len());
+ assert_eq!(0, reader.read(&mut [1]).unwrap());
+ }
+ }
+ }
+
+ #[test]
+ fn test_converter() {
+ let a = vec![1, 2, 3, 0];
+ assert_eq!(GrpcSlice::from(a.clone()).as_slice(), a.as_slice());
+ assert_eq!(GrpcSlice::from(a.as_slice()).as_slice(), a.as_slice());
+
+ let s = "abcd".to_owned();
+ assert_eq!(GrpcSlice::from(s.clone()).as_slice(), s.as_bytes());
+ assert_eq!(GrpcSlice::from(s.as_str()).as_slice(), s.as_bytes());
+
+ let cs = CString::new(s.clone()).unwrap();
+ assert_eq!(GrpcSlice::from(cs.clone()).as_slice(), s.as_bytes());
+ assert_eq!(GrpcSlice::from(cs.as_c_str()).as_slice(), s.as_bytes());
+ }
+
+ #[cfg(feature = "prost-codec")]
+ #[test]
+ fn test_buf_impl() {
+ use bytes::Buf;
+
+ for len in 0..1024 + 1 {
+ for n_slice in 1..4 {
+ let source = vec![len as u8; len];
+
+ let mut reader = new_message_reader(source.clone(), n_slice);
+
+ let mut remaining = len * n_slice;
+ let mut count = 100;
+ while reader.remaining() > 0 {
+ assert_eq!(remaining, reader.remaining());
+ let bytes = Buf::bytes(&reader);
+ bytes.iter().for_each(|b| assert_eq!(*b, len as u8));
+ let mut read = bytes.len();
+ // We don't have to advance by the whole amount we read.
+ if read > 5 && len % 2 == 0 {
+ read -= 5;
+ }
+ reader.advance(read);
+ remaining -= read;
+ count -= 1;
+ assert!(count > 0);
+ }
+
+ assert_eq!(0, remaining);
+ assert_eq!(0, reader.remaining());
+ }
+ }
+ }
+}
diff --git a/src/call/client.rs b/src/call/client.rs
new file mode 100644
index 0000000..eac0db4
--- /dev/null
+++ b/src/call/client.rs
@@ -0,0 +1,565 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::pin::Pin;
+use std::ptr;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::grpc_sys;
+use futures::ready;
+use futures::sink::Sink;
+use futures::stream::Stream;
+use futures::task::{Context, Poll};
+use parking_lot::Mutex;
+use std::future::Future;
+
+use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags};
+use crate::call::{check_run, Call, MessageReader, Method};
+use crate::channel::Channel;
+use crate::codec::{DeserializeFn, SerializeFn};
+use crate::error::{Error, Result};
+use crate::metadata::Metadata;
+use crate::task::{BatchFuture, BatchType};
+
+/// Update the given flag bit in `res`.
+#[inline]
+pub fn change_flag(res: &mut u32, flag: u32, set: bool) {
+ if set {
+ *res |= flag;
+ } else {
+ *res &= !flag;
+ }
+}
+
+/// Options for calls made by a client.
+#[derive(Clone, Default)]
+pub struct CallOption {
+ timeout: Option<Duration>,
+ write_flags: WriteFlags,
+ call_flags: u32,
+ headers: Option<Metadata>,
+}
+
+impl CallOption {
+ /// Signal that the call is idempotent.
+ pub fn idempotent(mut self, is_idempotent: bool) -> CallOption {
+ change_flag(
+ &mut self.call_flags,
+ grpc_sys::GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST,
+ is_idempotent,
+ );
+ self
+ }
+
+ /// Signal that the call should not return UNAVAILABLE before it has started.
+ pub fn wait_for_ready(mut self, wait_for_ready: bool) -> CallOption {
+ change_flag(
+ &mut self.call_flags,
+ grpc_sys::GRPC_INITIAL_METADATA_WAIT_FOR_READY,
+ wait_for_ready,
+ );
+ self
+ }
+
+ /// Signal that the call is cacheable. gRPC is free to use the GET verb.
+ pub fn cacheable(mut self, cacheable: bool) -> CallOption {
+ change_flag(
+ &mut self.call_flags,
+ grpc_sys::GRPC_INITIAL_METADATA_CACHEABLE_REQUEST,
+ cacheable,
+ );
+ self
+ }
+
+ /// Set write flags.
+ pub fn write_flags(mut self, write_flags: WriteFlags) -> CallOption {
+ self.write_flags = write_flags;
+ self
+ }
+
+ /// Set a timeout.
+ pub fn timeout(mut self, timeout: Duration) -> CallOption {
+ self.timeout = Some(timeout);
+ self
+ }
+
+ /// Get the timeout.
+ pub fn get_timeout(&self) -> Option<Duration> {
+ self.timeout
+ }
+
+ /// Set the headers to be sent with the call.
+ pub fn headers(mut self, meta: Metadata) -> CallOption {
+ self.headers = Some(meta);
+ self
+ }
+
+ /// Get headers to be sent with the call.
+ pub fn get_headers(&self) -> Option<&Metadata> {
+ self.headers.as_ref()
+ }
+}
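+
+// Editor's sketch (illustrative only, not part of the imported crate): composing
+// per-call options with the builder-style methods above; the concrete values are
+// arbitrary.
+#[allow(dead_code)]
+fn call_option_sketch() -> CallOption {
+    CallOption::default()
+        .timeout(Duration::from_secs(3))
+        .wait_for_ready(true)
+        .idempotent(true)
+        .write_flags(WriteFlags::default().buffer_hint(true))
+}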
+
+impl Call {
+ pub fn unary_async<Req, Resp>(
+ channel: &Channel,
+ method: &Method<Req, Resp>,
+ req: &Req,
+ mut opt: CallOption,
+ ) -> Result<ClientUnaryReceiver<Resp>> {
+ let call = channel.create_call(method, &opt)?;
+ let mut payload = vec![];
+ (method.req_ser())(req, &mut payload);
+ let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_start_unary(
+ call.call,
+ ctx,
+ payload.as_ptr() as *const _,
+ payload.len(),
+ opt.write_flags.flags,
+ opt.headers
+ .as_mut()
+ .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
+ opt.call_flags,
+ tag,
+ )
+ });
+ Ok(ClientUnaryReceiver::new(call, cq_f, method.resp_de()))
+ }
+
+ pub fn client_streaming<Req, Resp>(
+ channel: &Channel,
+ method: &Method<Req, Resp>,
+ mut opt: CallOption,
+ ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> {
+ let call = channel.create_call(method, &opt)?;
+ let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_start_client_streaming(
+ call.call,
+ ctx,
+ opt.headers
+ .as_mut()
+ .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
+ opt.call_flags,
+ tag,
+ )
+ });
+
+ let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f)));
+ let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser());
+ let recv = ClientCStreamReceiver {
+ call: share_call,
+ resp_de: method.resp_de(),
+ finished: false,
+ };
+ Ok((sink, recv))
+ }
+
+ pub fn server_streaming<Req, Resp>(
+ channel: &Channel,
+ method: &Method<Req, Resp>,
+ req: &Req,
+ mut opt: CallOption,
+ ) -> Result<ClientSStreamReceiver<Resp>> {
+ let call = channel.create_call(method, &opt)?;
+ let mut payload = vec![];
+ (method.req_ser())(req, &mut payload);
+ let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_start_server_streaming(
+ call.call,
+ ctx,
+ payload.as_ptr() as _,
+ payload.len(),
+ opt.write_flags.flags,
+ opt.headers
+ .as_mut()
+ .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
+ opt.call_flags,
+ tag,
+ )
+ });
+
+ // TODO: handle header
+ check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
+ });
+
+ Ok(ClientSStreamReceiver::new(call, cq_f, method.resp_de()))
+ }
+
+ pub fn duplex_streaming<Req, Resp>(
+ channel: &Channel,
+ method: &Method<Req, Resp>,
+ mut opt: CallOption,
+ ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> {
+ let call = channel.create_call(method, &opt)?;
+ let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_start_duplex_streaming(
+ call.call,
+ ctx,
+ opt.headers
+ .as_mut()
+ .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
+ opt.call_flags,
+ tag,
+ )
+ });
+
+ // TODO: handle header.
+ check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
+ });
+
+ let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f)));
+ let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser());
+ let recv = ClientDuplexReceiver::new(share_call, method.resp_de());
+ Ok((sink, recv))
+ }
+}
+
+/// A receiver for a unary request.
+///
+/// The future resolves once the response is received.
+#[must_use = "if unused the ClientUnaryReceiver may immediately cancel the RPC"]
+pub struct ClientUnaryReceiver<T> {
+ call: Call,
+ resp_f: BatchFuture,
+ resp_de: DeserializeFn<T>,
+}
+
+impl<T> ClientUnaryReceiver<T> {
+ fn new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn<T>) -> ClientUnaryReceiver<T> {
+ ClientUnaryReceiver {
+ call,
+ resp_f,
+ resp_de,
+ }
+ }
+
+ /// Cancel the call.
+ #[inline]
+ pub fn cancel(&mut self) {
+ self.call.cancel()
+ }
+
+ #[inline]
+ pub fn resp_de(&self, reader: MessageReader) -> Result<T> {
+ (self.resp_de)(reader)
+ }
+}
+
+impl<T> Future for ClientUnaryReceiver<T> {
+ type Output = Result<T>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>> {
+ let data = ready!(Pin::new(&mut self.resp_f).poll(cx)?);
+ let t = self.resp_de(data.unwrap())?;
+ Poll::Ready(Ok(t))
+ }
+}
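+
+// Editor's sketch (illustrative only, not part of the imported crate): issuing a
+// unary call directly through `Call::unary_async` and awaiting the receiver.
+// Generated clients wrap this; the free generic function is only for illustration.
+#[allow(dead_code)]
+async fn unary_call_sketch<Req, Resp>(
+    channel: &Channel,
+    method: &Method<Req, Resp>,
+    req: &Req,
+) -> Result<Resp> {
+    // The receiver resolves once the response message arrives.
+    let receiver = Call::unary_async(channel, method, req, CallOption::default())?;
+    receiver.await
+}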
+
+/// A receiver for a client streaming call.
+///
+/// If the corresponding sink has been dropped or cancelled, this will poll an
+/// [`RpcFailure`] error with the [`Cancelled`] status.
+///
+/// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure
+/// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled
+#[must_use = "if unused the ClientCStreamReceiver may immediately cancel the RPC"]
+pub struct ClientCStreamReceiver<T> {
+ call: Arc<Mutex<ShareCall>>,
+ resp_de: DeserializeFn<T>,
+ finished: bool,
+}
+
+impl<T> ClientCStreamReceiver<T> {
+ /// Cancel the call.
+ pub fn cancel(&mut self) {
+ let lock = self.call.lock();
+ lock.call.cancel()
+ }
+
+ #[inline]
+ pub fn resp_de(&self, reader: MessageReader) -> Result<T> {
+ (self.resp_de)(reader)
+ }
+}
+
+impl<T> Drop for ClientCStreamReceiver<T> {
+ /// The corresponding RPC will be canceled if the receiver did not
+ /// finish before dropping.
+ fn drop(&mut self) {
+ if !self.finished {
+ self.cancel();
+ }
+ }
+}
+
+impl<T> Future for ClientCStreamReceiver<T> {
+ type Output = Result<T>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>> {
+ let data = {
+ let mut call = self.call.lock();
+ ready!(call.poll_finish(cx)?)
+ };
+ let t = (self.resp_de)(data.unwrap())?;
+ self.finished = true;
+ Poll::Ready(Ok(t))
+ }
+}
+
+/// A sink for a client streaming call or a duplex streaming call.
+///
+/// To close the sink properly, you should call [`close`] before dropping.
+///
+/// [`close`]: #method.close
+#[must_use = "if unused the StreamingCallSink may immediately cancel the RPC"]
+pub struct StreamingCallSink<Req> {
+ call: Arc<Mutex<ShareCall>>,
+ sink_base: SinkBase,
+ close_f: Option<BatchFuture>,
+ req_ser: SerializeFn<Req>,
+}
+
+impl<Req> StreamingCallSink<Req> {
+ fn new(call: Arc<Mutex<ShareCall>>, req_ser: SerializeFn<Req>) -> StreamingCallSink<Req> {
+ StreamingCallSink {
+ call,
+ sink_base: SinkBase::new(false),
+ close_f: None,
+ req_ser,
+ }
+ }
+
+ pub fn cancel(&mut self) {
+ let call = self.call.lock();
+ call.call.cancel()
+ }
+}
+
+impl<P> Drop for StreamingCallSink<P> {
+ /// The corresponding RPC will be canceled if the sink did not call
+ /// [`close`] before dropping.
+ ///
+ /// [`close`]: #method.close
+ fn drop(&mut self) {
+ if self.close_f.is_none() {
+ self.cancel();
+ }
+ }
+}
+
+impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> {
+ type Error = Error;
+
+ #[inline]
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ Pin::new(&mut self.sink_base).poll_ready(cx)
+ }
+
+ #[inline]
+ fn start_send(mut self: Pin<&mut Self>, (msg, flags): (Req, WriteFlags)) -> Result<()> {
+ {
+ let mut call = self.call.lock();
+ call.check_alive()?;
+ }
+ let t = &mut *self;
+ Pin::new(&mut t.sink_base).start_send(&mut t.call, &msg, flags, t.req_ser)
+ }
+
+ #[inline]
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ {
+ let mut call = self.call.lock();
+ call.check_alive()?;
+ }
+ Pin::new(&mut self.sink_base).poll_ready(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ let t = &mut *self;
+ let mut call = t.call.lock();
+ if t.close_f.is_none() {
+ ready!(Pin::new(&mut t.sink_base).poll_ready(cx)?);
+
+ let close_f = call.call.start_send_close_client()?;
+ t.close_f = Some(close_f);
+ }
+
+ if let Poll::Pending = Pin::new(t.close_f.as_mut().unwrap()).poll(cx)? {
+ // if call is finished, can return early here.
+ call.check_alive()?;
+ return Poll::Pending;
+ }
+ Poll::Ready(Ok(()))
+ }
+}
+
+/// A sink for a client streaming call.
+///
+/// To close the sink properly, you should call [`close`] before dropping.
+///
+/// [`close`]: #method.close
+pub type ClientCStreamSender<T> = StreamingCallSink<T>;
+/// A sink for a duplex streaming call.
+///
+/// To close the sink properly, you should call [`close`] before dropping.
+///
+/// [`close`]: #method.close
+pub type ClientDuplexSender<T> = StreamingCallSink<T>;
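+
+// Editor's sketch (illustrative only, not part of the imported crate): driving a
+// client streaming call. The sink must be closed explicitly so the server observes
+// end-of-stream; otherwise dropping it cancels the RPC. This uses `futures::SinkExt`
+// from the crate's existing `futures` 0.3 dependency.
+#[allow(dead_code)]
+async fn client_streaming_sketch<Req, Resp>(
+    mut sender: ClientCStreamSender<Req>,
+    receiver: ClientCStreamReceiver<Resp>,
+    reqs: Vec<Req>,
+) -> Result<Resp> {
+    use futures::SinkExt;
+    for req in reqs {
+        // Every item carries its own write flags.
+        sender.send((req, WriteFlags::default())).await?;
+    }
+    // Flush pending writes and send the client half-close.
+    sender.close().await?;
+    // Resolves with the single response once the server finishes.
+    receiver.await
+}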
+
+struct ResponseStreamImpl<H, T> {
+ call: H,
+ msg_f: Option<BatchFuture>,
+ read_done: bool,
+ finished: bool,
+ resp_de: DeserializeFn<T>,
+}
+
+impl<H: ShareCallHolder + Unpin, T> ResponseStreamImpl<H, T> {
+ fn new(call: H, resp_de: DeserializeFn<T>) -> ResponseStreamImpl<H, T> {
+ ResponseStreamImpl {
+ call,
+ msg_f: None,
+ read_done: false,
+ finished: false,
+ resp_de,
+ }
+ }
+
+ fn cancel(&mut self) {
+ self.call.call(|c| c.call.cancel())
+ }
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
+ if !self.finished {
+ let t = &mut *self;
+ let finished = &mut t.finished;
+ let _ = t.call.call(|c| {
+ let res = c.poll_finish(cx);
+ *finished = c.finished;
+ res
+ })?;
+ }
+
+ let mut bytes = None;
+ loop {
+ if !self.read_done {
+ if let Some(msg_f) = &mut self.msg_f {
+ bytes = ready!(Pin::new(msg_f).poll(cx)?);
+ if bytes.is_none() {
+ self.read_done = true;
+ }
+ }
+ }
+
+ if self.read_done {
+ if self.finished {
+ return Poll::Ready(None);
+ }
+ return Poll::Pending;
+ }
+
+ // So `msg_f` must be either stale or not initialized yet.
+ self.msg_f.take();
+ let msg_f = self.call.call(|c| c.call.start_recv_message())?;
+ self.msg_f = Some(msg_f);
+ if let Some(data) = bytes {
+ let msg = (self.resp_de)(data)?;
+ return Poll::Ready(Some(Ok(msg)));
+ }
+ }
+ }
+
+ // Cancel the call if we still have some messages or did not
+ // receive the status code.
+ fn on_drop(&mut self) {
+ if !self.read_done || !self.finished {
+ self.cancel();
+ }
+ }
+}
+
+/// A receiver for a server streaming call.
+#[must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC"]
+pub struct ClientSStreamReceiver<Resp> {
+ imp: ResponseStreamImpl<ShareCall, Resp>,
+}
+
+impl<Resp> ClientSStreamReceiver<Resp> {
+ fn new(
+ call: Call,
+ finish_f: BatchFuture,
+ de: DeserializeFn<Resp>,
+ ) -> ClientSStreamReceiver<Resp> {
+ let share_call = ShareCall::new(call, finish_f);
+ ClientSStreamReceiver {
+ imp: ResponseStreamImpl::new(share_call, de),
+ }
+ }
+
+ pub fn cancel(&mut self) {
+ self.imp.cancel()
+ }
+}
+
+impl<Resp> Stream for ClientSStreamReceiver<Resp> {
+ type Item = Result<Resp>;
+
+ #[inline]
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ Pin::new(&mut self.imp).poll(cx)
+ }
+}
+
+/// A response receiver for a duplex call.
+///
+/// If the corresponding sink has been dropped or cancelled, this will poll an
+/// [`RpcFailure`] error with the [`Cancelled`] status.
+///
+/// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure
+/// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled
+#[must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC"]
+pub struct ClientDuplexReceiver<Resp> {
+ imp: ResponseStreamImpl<Arc<Mutex<ShareCall>>, Resp>,
+}
+
+impl<Resp> ClientDuplexReceiver<Resp> {
+ fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<Resp>) -> ClientDuplexReceiver<Resp> {
+ ClientDuplexReceiver {
+ imp: ResponseStreamImpl::new(call, de),
+ }
+ }
+
+ pub fn cancel(&mut self) {
+ self.imp.cancel()
+ }
+}
+
+impl<Resp> Drop for ClientDuplexReceiver<Resp> {
+ /// The corresponding RPC will be canceled if the receiver did not
+ /// finish before dropping.
+ fn drop(&mut self) {
+ self.imp.on_drop()
+ }
+}
+
+impl<Resp> Stream for ClientDuplexReceiver<Resp> {
+ type Item = Result<Resp>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ Pin::new(&mut self.imp).poll(cx)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn test_change_flag() {
+ let mut flag = 2 | 4;
+ super::change_flag(&mut flag, 8, true);
+ assert_eq!(flag, 2 | 4 | 8);
+ super::change_flag(&mut flag, 4, false);
+ assert_eq!(flag, 2 | 8);
+ }
+}
diff --git a/src/call/mod.rs b/src/call/mod.rs
new file mode 100644
index 0000000..03e520a
--- /dev/null
+++ b/src/call/mod.rs
@@ -0,0 +1,686 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+pub mod client;
+pub mod server;
+
+use std::fmt::{self, Debug, Display};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::{ptr, slice};
+
+use crate::cq::CompletionQueue;
+use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
+use futures::future::Future;
+use futures::ready;
+use futures::task::{Context, Poll};
+use libc::c_void;
+use parking_lot::Mutex;
+
+use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader};
+use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
+use crate::error::{Error, Result};
+use crate::grpc_sys::grpc_status_code::*;
+use crate::task::{self, BatchFuture, BatchType, CallTag};
+
+// By default, buffers in `SinkBase` will be shrunk to 4 KiB.
+const BUF_SHRINK_SIZE: usize = 4 * 1024;
+
+/// A gRPC status code structure.
+///
+/// This type contains constants for all gRPC status codes.
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub struct RpcStatusCode(i32);
+
+impl From<i32> for RpcStatusCode {
+ fn from(code: i32) -> RpcStatusCode {
+ RpcStatusCode(code)
+ }
+}
+
+impl Into<i32> for RpcStatusCode {
+ fn into(self) -> i32 {
+ self.0
+ }
+}
+
+impl Display for RpcStatusCode {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ Debug::fmt(self, f)
+ }
+}
+
+macro_rules! status_codes {
+ (
+ $(
+ ($num:path, $konst:ident);
+ )+
+ ) => {
+ impl RpcStatusCode {
+ $(
+ pub const $konst: RpcStatusCode = RpcStatusCode($num);
+ )+
+ }
+
+ impl Debug for RpcStatusCode {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "{}-{}",
+ self.0,
+ match self {
+ $(RpcStatusCode($num) => stringify!($konst),)+
+ RpcStatusCode(_) => "INVALID_STATUS_CODE",
+ }
+ )
+ }
+ }
+ }
+}
+
+status_codes! {
+ (GRPC_STATUS_OK, OK);
+ (GRPC_STATUS_CANCELLED, CANCELLED);
+ (GRPC_STATUS_UNKNOWN, UNKNOWN);
+ (GRPC_STATUS_INVALID_ARGUMENT, INVALID_ARGUMENT);
+ (GRPC_STATUS_DEADLINE_EXCEEDED, DEADLINE_EXCEEDED);
+ (GRPC_STATUS_NOT_FOUND, NOT_FOUND);
+ (GRPC_STATUS_ALREADY_EXISTS, ALREADY_EXISTS);
+ (GRPC_STATUS_PERMISSION_DENIED, PERMISSION_DENIED);
+ (GRPC_STATUS_RESOURCE_EXHAUSTED, RESOURCE_EXHAUSTED);
+ (GRPC_STATUS_FAILED_PRECONDITION, FAILED_PRECONDITION);
+ (GRPC_STATUS_ABORTED, ABORTED);
+ (GRPC_STATUS_OUT_OF_RANGE, OUT_OF_RANGE);
+ (GRPC_STATUS_UNIMPLEMENTED, UNIMPLEMENTED);
+ (GRPC_STATUS_INTERNAL, INTERNAL);
+ (GRPC_STATUS_UNAVAILABLE, UNAVAILABLE);
+ (GRPC_STATUS_DATA_LOSS, DATA_LOSS);
+ (GRPC_STATUS_UNAUTHENTICATED, UNAUTHENTICATED);
+ (GRPC_STATUS__DO_NOT_USE, DO_NOT_USE);
+}
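+
+// Editor's sketch (illustrative only, not part of the imported crate): raw status
+// codes coming back from the C core can be converted and compared against the
+// constants defined above.
+#[allow(dead_code)]
+fn status_code_sketch(raw: i32) -> bool {
+    let code = RpcStatusCode::from(raw);
+    // `Display`/`Debug` render as "<number>-<NAME>", e.g. "0-OK".
+    let _rendered = format!("{}", code);
+    code == RpcStatusCode::OK
+}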
+
+/// Method types supported by gRPC.
+#[derive(Clone, Copy)]
+pub enum MethodType {
+ /// Single request sent from client, single response received from server.
+ Unary,
+
+ /// Stream of requests sent from client, single response received from server.
+ ClientStreaming,
+
+ /// Single request sent from client, stream of responses received from server.
+ ServerStreaming,
+
+ /// Both server and client can stream an arbitrary number of requests and responses simultaneously.
+ Duplex,
+}
+
+/// A description of a remote method.
+// TODO: add serializer and deserializer.
+pub struct Method<Req, Resp> {
+ /// Type of method.
+ pub ty: MethodType,
+
+ /// Fully qualified name of the method.
+ pub name: &'static str,
+
+ /// The marshaller used for request messages.
+ pub req_mar: Marshaller<Req>,
+
+ /// The marshaller used for response messages.
+ pub resp_mar: Marshaller<Resp>,
+}
+
+impl<Req, Resp> Method<Req, Resp> {
+ /// Get the request serializer.
+ #[inline]
+ pub fn req_ser(&self) -> SerializeFn<Req> {
+ self.req_mar.ser
+ }
+
+ /// Get the request deserializer.
+ #[inline]
+ pub fn req_de(&self) -> DeserializeFn<Req> {
+ self.req_mar.de
+ }
+
+ /// Get the response serializer.
+ #[inline]
+ pub fn resp_ser(&self) -> SerializeFn<Resp> {
+ self.resp_mar.ser
+ }
+
+ /// Get the response deserializer.
+ #[inline]
+ pub fn resp_de(&self) -> DeserializeFn<Resp> {
+ self.resp_mar.de
+ }
+}
+
+/// RPC result returned from the server.
+#[derive(Debug, Clone)]
+pub struct RpcStatus {
+ /// gRPC status code. `Ok` indicates success, all other values indicate an error.
+ pub status: RpcStatusCode,
+
+ /// Optional detail string.
+ pub details: Option<String>,
+}
+
+impl Display for RpcStatus {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ Debug::fmt(self, fmt)
+ }
+}
+
+impl RpcStatus {
+ /// Create a new [`RpcStatus`].
+ pub fn new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus {
+ RpcStatus {
+ status: code.into(),
+ details,
+ }
+ }
+
+ /// Create a new [`RpcStatus`] whose status code is `OK`.
+ pub fn ok() -> RpcStatus {
+ RpcStatus::new(RpcStatusCode::OK, None)
+ }
+}
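+
+// Editor's sketch (illustrative only, not part of the imported crate): the success
+// and error paths for building a status; the detail string is a placeholder.
+#[allow(dead_code)]
+fn rpc_status_sketch(found: bool) -> RpcStatus {
+    if found {
+        RpcStatus::ok()
+    } else {
+        RpcStatus::new(
+            RpcStatusCode::NOT_FOUND,
+            Some("no such resource".to_owned()),
+        )
+    }
+}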
+
+pub type MessageReader = GrpcByteBufferReader;
+
+/// Context for batch request.
+pub struct BatchContext {
+ ctx: *mut grpcwrap_batch_context,
+}
+
+impl BatchContext {
+ pub fn new() -> BatchContext {
+ BatchContext {
+ ctx: unsafe { grpc_sys::grpcwrap_batch_context_create() },
+ }
+ }
+
+ pub fn as_ptr(&self) -> *mut grpcwrap_batch_context {
+ self.ctx
+ }
+
+ pub fn take_recv_message(&self) -> Option<GrpcByteBuffer> {
+ let ptr = unsafe { grpc_sys::grpcwrap_batch_context_take_recv_message(self.ctx) };
+ if ptr.is_null() {
+ None
+ } else {
+ Some(unsafe { GrpcByteBuffer::from_raw(ptr) })
+ }
+ }
+
+ /// Get the status of the rpc call.
+ pub fn rpc_status(&self) -> RpcStatus {
+ let status = RpcStatusCode(unsafe {
+ grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx)
+ });
+
+ let details = if status == RpcStatusCode::OK {
+ None
+ } else {
+ unsafe {
+ let mut details_len = 0;
+ let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details(
+ self.ctx,
+ &mut details_len,
+ );
+ let details_slice = slice::from_raw_parts(details_ptr as *const _, details_len);
+ Some(String::from_utf8_lossy(details_slice).into_owned())
+ }
+ };
+
+ RpcStatus::new(status, details)
+ }
+
+ /// Fetch the response bytes of the rpc call.
+ pub fn recv_message(&mut self) -> Option<MessageReader> {
+ let buf = self.take_recv_message()?;
+ Some(GrpcByteBufferReader::new(buf))
+ }
+}
+
+impl Drop for BatchContext {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpcwrap_batch_context_destroy(self.ctx) }
+ }
+}
+
+#[inline]
+fn box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void) {
+ let tag_box = Box::new(tag);
+ (
+ tag_box.batch_ctx().unwrap().as_ptr(),
+ Box::into_raw(tag_box) as _,
+ )
+}
+
+/// A helper function that runs the batch call and checks the result.
+fn check_run<F>(bt: BatchType, f: F) -> BatchFuture
+where
+ F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,
+{
+ let (cq_f, tag) = CallTag::batch_pair(bt);
+ let (batch_ptr, tag_ptr) = box_batch_tag(tag);
+ let code = f(batch_ptr, tag_ptr);
+ if code != grpc_call_error::GRPC_CALL_OK {
+ unsafe {
+ Box::from_raw(tag_ptr);
+ }
+ panic!("create call fail: {:?}", code);
+ }
+ cq_f
+}
+
+/// A Call represents an RPC.
+///
+/// When created, it is in a configuration state allowing properties to be
+/// set until it is invoked. After invocation, the Call can have messages
+/// written to it and read from it.
+pub struct Call {
+ pub call: *mut grpc_call,
+ pub cq: CompletionQueue,
+}
+
+unsafe impl Send for Call {}
+
+impl Call {
+ pub unsafe fn from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call {
+ assert!(!call.is_null());
+ Call { call, cq }
+ }
+
+ /// Send a message asynchronously.
+ pub fn start_send_message(
+ &mut self,
+ msg: &[u8],
+ write_flags: u32,
+ initial_meta: bool,
+ ) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let i = if initial_meta { 1 } else { 0 };
+ let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_send_message(
+ self.call,
+ ctx,
+ msg.as_ptr() as _,
+ msg.len(),
+ write_flags,
+ i,
+ tag,
+ )
+ });
+ Ok(f)
+ }
+
+ /// Finish the rpc call from the client.
+ pub fn start_send_close_client(&mut self) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let f = check_run(BatchType::Finish, |_, tag| unsafe {
+ grpc_sys::grpcwrap_call_send_close_from_client(self.call, tag)
+ });
+ Ok(f)
+ }
+
+ /// Receive a message asynchronously.
+ pub fn start_recv_message(&mut self) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let f = check_run(BatchType::Read, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_recv_message(self.call, ctx, tag)
+ });
+ Ok(f)
+ }
+
+ /// Start handling from the server side.
+ ///
+ /// The future will finish once a close is received by the server.
+ pub fn start_server_side(&mut self) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_start_serverside(self.call, ctx, tag)
+ });
+ Ok(f)
+ }
+
+ /// Send a status from the server.
+ pub fn start_send_status_from_server(
+ &mut self,
+ status: &RpcStatus,
+ send_empty_metadata: bool,
+ payload: &Option<Vec<u8>>,
+ write_flags: u32,
+ ) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
+ let (payload_ptr, payload_len) = payload
+ .as_ref()
+ .map_or((ptr::null(), 0), |b| (b.as_ptr(), b.len()));
+ let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ let details_ptr = status
+ .details
+ .as_ref()
+ .map_or_else(ptr::null, |s| s.as_ptr() as _);
+ let details_len = status.details.as_ref().map_or(0, String::len);
+ grpc_sys::grpcwrap_call_send_status_from_server(
+ self.call,
+ ctx,
+ status.status.into(),
+ details_ptr,
+ details_len,
+ ptr::null_mut(),
+ send_empty_metadata,
+ payload_ptr as _,
+ payload_len,
+ write_flags,
+ tag,
+ )
+ });
+ Ok(f)
+ }
+
+ /// Abort an rpc call before the handler is called.
+ pub fn abort(self, status: &RpcStatus) {
+ match self.cq.borrow() {
+ // Queue is shutdown, ignore.
+ Err(Error::QueueShutdown) => return,
+ Err(e) => panic!("unexpected error when aborting call: {:?}", e),
+ _ => {}
+ }
+ let call_ptr = self.call;
+ let tag = CallTag::abort(self);
+ let (batch_ptr, tag_ptr) = box_batch_tag(tag);
+
+ let code = unsafe {
+ let details_ptr = status
+ .details
+ .as_ref()
+ .map_or_else(ptr::null, |s| s.as_ptr() as _);
+ let details_len = status.details.as_ref().map_or(0, String::len);
+ grpc_sys::grpcwrap_call_send_status_from_server(
+ call_ptr,
+ batch_ptr,
+ status.status.into(),
+ details_ptr,
+ details_len,
+ ptr::null_mut(),
+ 1,
+ ptr::null(),
+ 0,
+ 0,
+ tag_ptr as *mut c_void,
+ )
+ };
+ if code != grpc_call_error::GRPC_CALL_OK {
+ unsafe {
+ Box::from_raw(tag_ptr);
+ }
+ panic!("create call fail: {:?}", code);
+ }
+ }
+
+ /// Cancel the rpc call by the client.
+ fn cancel(&self) {
+ match self.cq.borrow() {
+ // Queue is shutdown, ignore.
+ Err(Error::QueueShutdown) => return,
+ Err(e) => panic!("unexpected error when canceling call: {:?}", e),
+ _ => {}
+ }
+ unsafe {
+ grpc_sys::grpc_call_cancel(self.call, ptr::null_mut());
+ }
+ }
+}
+
+impl Drop for Call {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpc_call_unref(self.call) }
+ }
+}
+
+/// A shared object for client streaming and duplex streaming calls.
+///
+/// In both cases, the receiver and sender can be polled at the same time,
+/// hence we need to share the call on both sides and abort the sink
+/// once the call is canceled or finished early.
+struct ShareCall {
+ call: Call,
+ close_f: BatchFuture,
+ finished: bool,
+ status: Option<RpcStatus>,
+}
+
+impl ShareCall {
+ fn new(call: Call, close_f: BatchFuture) -> ShareCall {
+ ShareCall {
+ call,
+ close_f,
+ finished: false,
+ status: None,
+ }
+ }
+
+ /// Poll whether the call is still alive.
+ ///
+ /// If the call is still running, this will register a notification for its completion.
+ fn poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>> {
+ let res = match Pin::new(&mut self.close_f).poll(cx) {
+ Poll::Ready(Ok(reader)) => {
+ self.status = Some(RpcStatus::ok());
+ Poll::Ready(Ok(reader))
+ }
+ Poll::Pending => return Poll::Pending,
+ Poll::Ready(Err(Error::RpcFailure(status))) => {
+ self.status = Some(status.clone());
+ Poll::Ready(Err(Error::RpcFailure(status)))
+ }
+ res => res,
+ };
+
+ self.finished = true;
+ res
+ }
+
+ /// Check if the call is finished.
+ fn check_alive(&mut self) -> Result<()> {
+ if self.finished {
+ // Maybe we can just take the status here.
+ return Err(Error::RpcFinished(self.status.clone()));
+ }
+
+ task::check_alive(&self.close_f)
+ }
+}
+
+/// A helper trait that allows executing function on the internal `ShareCall` struct.
+trait ShareCallHolder {
+ fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R;
+}
+
+impl ShareCallHolder for ShareCall {
+ fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
+ f(self)
+ }
+}
+
+impl ShareCallHolder for Arc<Mutex<ShareCall>> {
+ fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
+ let mut call = self.lock();
+ f(&mut call)
+ }
+}
+
+/// A helper struct for constructing a Stream object for batch requests.
+struct StreamingBase {
+ close_f: Option<BatchFuture>,
+ msg_f: Option<BatchFuture>,
+ read_done: bool,
+}
+
+impl StreamingBase {
+ fn new(close_f: Option<BatchFuture>) -> StreamingBase {
+ StreamingBase {
+ close_f,
+ msg_f: None,
+ read_done: false,
+ }
+ }
+
+ fn poll<C: ShareCallHolder>(
+ &mut self,
+ cx: &mut Context,
+ call: &mut C,
+ skip_finish_check: bool,
+ ) -> Poll<Option<Result<MessageReader>>> {
+ if !skip_finish_check {
+ let mut finished = false;
+ if let Some(close_f) = &mut self.close_f {
+ if let Poll::Ready(_) = Pin::new(close_f).poll(cx)? {
+ // Don't return immediately, there may be pending data.
+ finished = true;
+ }
+ }
+ if finished {
+ self.close_f.take();
+ }
+ }
+
+ let mut bytes = None;
+ if !self.read_done {
+ if let Some(msg_f) = &mut self.msg_f {
+ bytes = ready!(Pin::new(msg_f).poll(cx)?);
+ if bytes.is_none() {
+ self.read_done = true;
+ }
+ }
+ }
+
+ if self.read_done {
+ if self.close_f.is_none() {
+ return Poll::Ready(None);
+ }
+ return Poll::Pending;
+ }
+
+ // so msg_f must be either stale or not initialized yet.
+ self.msg_f.take();
+ let msg_f = call.call(|c| c.call.start_recv_message())?;
+ self.msg_f = Some(msg_f);
+ if bytes.is_none() {
+ self.poll(cx, call, true)
+ } else {
+ Poll::Ready(bytes.map(Ok))
+ }
+ }
+
+ // Cancel the call if we still have some messages or did not
+ // receive the status code.
+ fn on_drop<C: ShareCallHolder>(&self, call: &mut C) {
+ if !self.read_done || self.close_f.is_some() {
+ call.call(|c| c.call.cancel());
+ }
+ }
+}
+
+/// Flags for write operations.
+#[derive(Default, Clone, Copy)]
+pub struct WriteFlags {
+ flags: u32,
+}
+
+impl WriteFlags {
+ /// Hint that the write may be buffered and need not go out on the wire immediately.
+ ///
+ /// gRPC is free to buffer the message until the next non-buffered write, or until write stream
+ /// completion, but it need not buffer completely or at all.
+ pub fn buffer_hint(mut self, need_buffered: bool) -> WriteFlags {
+ client::change_flag(
+ &mut self.flags,
+ grpc_sys::GRPC_WRITE_BUFFER_HINT,
+ need_buffered,
+ );
+ self
+ }
+
+ /// Force compression to be disabled.
+ pub fn force_no_compress(mut self, no_compress: bool) -> WriteFlags {
+ client::change_flag(
+ &mut self.flags,
+ grpc_sys::GRPC_WRITE_NO_COMPRESS,
+ no_compress,
+ );
+ self
+ }
+
+ /// Get whether buffer hint is enabled.
+ pub fn get_buffer_hint(self) -> bool {
+ (self.flags & grpc_sys::GRPC_WRITE_BUFFER_HINT) != 0
+ }
+
+ /// Get whether compression is disabled.
+ pub fn get_force_no_compress(self) -> bool {
+ (self.flags & grpc_sys::GRPC_WRITE_NO_COMPRESS) != 0
+ }
+}
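+
+// Editor's sketch (illustrative only, not part of the imported crate): write flags
+// are plain bit flags, so the getters simply reflect whatever the builder methods set.
+#[allow(dead_code)]
+fn write_flags_sketch() {
+    let flags = WriteFlags::default()
+        .buffer_hint(true)
+        .force_no_compress(true);
+    assert!(flags.get_buffer_hint());
+    assert!(flags.get_force_no_compress());
+}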
+
+/// A helper struct for constructing a Sink object for batch requests.
+struct SinkBase {
+ batch_f: Option<BatchFuture>,
+ buf: Vec<u8>,
+ send_metadata: bool,
+}
+
+impl SinkBase {
+ fn new(send_metadata: bool) -> SinkBase {
+ SinkBase {
+ batch_f: None,
+ buf: Vec::new(),
+ send_metadata,
+ }
+ }
+
+ fn start_send<T, C: ShareCallHolder>(
+ &mut self,
+ call: &mut C,
+ t: &T,
+ mut flags: WriteFlags,
+ ser: SerializeFn<T>,
+ ) -> Result<()> {
+ // `start_send` is supposed to be called after `poll_ready` returns ready.
+ assert!(self.batch_f.is_none());
+
+ self.buf.clear();
+ ser(t, &mut self.buf);
+ if flags.get_buffer_hint() && self.send_metadata {
+ // Temporary fix: a buffer hint combined with sending metadata will not send out any metadata.
+ flags = flags.buffer_hint(false);
+ }
+ let write_f = call.call(|c| {
+ c.call
+ .start_send_message(&self.buf, flags.flags, self.send_metadata)
+ })?;
+ // NOTE: The content of `self.buf` is copied into gRPC internals.
+ if self.buf.capacity() > BUF_SHRINK_SIZE {
+ self.buf.truncate(BUF_SHRINK_SIZE);
+ self.buf.shrink_to_fit();
+ }
+ self.batch_f = Some(write_f);
+ self.send_metadata = false;
+ Ok(())
+ }
+
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>> {
+ match &mut self.batch_f {
+ None => return Poll::Ready(Ok(())),
+ Some(f) => {
+ ready!(Pin::new(f).poll(cx)?);
+ }
+ }
+ self.batch_f.take();
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/src/call/server.rs b/src/call/server.rs
new file mode 100644
index 0000000..8875d6d
--- /dev/null
+++ b/src/call/server.rs
@@ -0,0 +1,766 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::ffi::CStr;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::{result, slice};
+
+use crate::grpc_sys::{
+ self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
+};
+use futures::future::Future;
+use futures::ready;
+use futures::sink::Sink;
+use futures::stream::Stream;
+use futures::task::{Context, Poll};
+use parking_lot::Mutex;
+
+use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
+use crate::auth_context::AuthContext;
+use crate::call::{
+ BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
+};
+use crate::codec::{DeserializeFn, SerializeFn};
+use crate::cq::CompletionQueue;
+use crate::error::{Error, Result};
+use crate::metadata::Metadata;
+use crate::server::{BoxHandler, RequestCallContext};
+use crate::task::{BatchFuture, CallTag, Executor, Kicker};
+
+pub struct Deadline {
+ spec: gpr_timespec,
+}
+
+impl Deadline {
+ fn new(spec: gpr_timespec) -> Deadline {
+ let realtime_spec =
+ unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };
+
+ Deadline {
+ spec: realtime_spec,
+ }
+ }
+
+ pub fn exceeded(&self) -> bool {
+ unsafe {
+ let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
+ grpc_sys::gpr_time_cmp(now, self.spec) >= 0
+ }
+ }
+}
+
+/// Context for accepting a request.
+pub struct RequestContext {
+ ctx: *mut grpcwrap_request_call_context,
+ request_call: Option<RequestCallContext>,
+}
+
+impl RequestContext {
+ pub fn new(rc: RequestCallContext) -> RequestContext {
+ let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };
+
+ RequestContext {
+ ctx,
+ request_call: Some(rc),
+ }
+ }
+
+ /// Try to accept a client side streaming request.
+ ///
+ /// Return an error if the request is a client side unary request.
+ pub fn handle_stream_req(
+ self,
+ cq: &CompletionQueue,
+ rc: &mut RequestCallContext,
+ ) -> result::Result<(), Self> {
+ let handler = unsafe { rc.get_handler(self.method()) };
+ match handler {
+ Some(handler) => match handler.method_type() {
+ MethodType::Unary | MethodType::ServerStreaming => Err(self),
+ _ => {
+ execute(self, cq, None, handler);
+ Ok(())
+ }
+ },
+ None => {
+ execute_unimplemented(self, cq.clone());
+ Ok(())
+ }
+ }
+ }
+
+ /// Accept a client side unary request.
+ ///
+ /// This method should be called after `handle_stream_req`. When handling
+ /// a client side unary request, the handler will only be called after the unary
+ /// request is received.
+ pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
+ // fetch message before calling callback.
+ let tag = Box::new(CallTag::unary_request(self, rc));
+ let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
+ let request_ctx = tag.request_ctx().unwrap().as_ptr();
+ let tag_ptr = Box::into_raw(tag);
+ unsafe {
+ let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
+ let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
+ if code != grpc_call_error::GRPC_CALL_OK {
+ Box::from_raw(tag_ptr);
+ // It should not fail.
+ panic!("try to receive message fail: {:?}", code);
+ }
+ }
+ }
+
+ pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
+ self.request_call.take()
+ }
+
+ pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
+ self.ctx
+ }
+
+ fn call(&self, cq: CompletionQueue) -> Call {
+ unsafe {
+ // It is okay to use a mutable pointer on an immutable reference, `self`,
+ // because grpcwrap_request_call_context_ref_call is thread-safe.
+ let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
+ assert!(!call.is_null());
+ Call::from_raw(call, cq)
+ }
+ }
+
+ pub fn method(&self) -> &[u8] {
+ let mut len = 0;
+ let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };
+
+ unsafe { slice::from_raw_parts(method as _, len) }
+ }
+
+ fn host(&self) -> &[u8] {
+ let mut len = 0;
+ let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };
+
+ unsafe { slice::from_raw_parts(host as _, len) }
+ }
+
+ fn deadline(&self) -> Deadline {
+ let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };
+
+ Deadline::new(t)
+ }
+
+ fn metadata(&self) -> &Metadata {
+ unsafe {
+ let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
+ let arr_ptr: *const Metadata = ptr as _;
+ &*arr_ptr
+ }
+ }
+
+ fn peer(&self) -> String {
+ unsafe {
+ // RequestContext always holds a reference of the call.
+ let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
+ let p = grpc_sys::grpc_call_get_peer(call);
+ let peer = CStr::from_ptr(p)
+ .to_str()
+ .expect("valid UTF-8 data")
+ .to_owned();
+ grpc_sys::gpr_free(p as _);
+ peer
+ }
+ }
+
+ /// If the server binds in non-secure mode, this will return `None`.
+ fn auth_context(&self) -> Option<AuthContext> {
+ unsafe {
+ let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
+ AuthContext::from_call_ptr(call)
+ }
+ }
+}
+
+impl Drop for RequestContext {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
+ }
+}
+
+/// A context for handling a client side unary request.
+pub struct UnaryRequestContext {
+ request: RequestContext,
+ request_call: Option<RequestCallContext>,
+ batch: BatchContext,
+}
+
+impl UnaryRequestContext {
+ pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
+ UnaryRequestContext {
+ request: ctx,
+ request_call: Some(rc),
+ batch: BatchContext::new(),
+ }
+ }
+
+ pub fn batch_ctx(&self) -> &BatchContext {
+ &self.batch
+ }
+
+ pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
+ &mut self.batch
+ }
+
+ pub fn request_ctx(&self) -> &RequestContext {
+ &self.request
+ }
+
+ pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
+ self.request_call.take()
+ }
+
+ pub fn handle(
+ self,
+ rc: &mut RequestCallContext,
+ cq: &CompletionQueue,
+ reader: Option<MessageReader>,
+ ) {
+ let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
+ if reader.is_some() {
+ return execute(self.request, cq, reader, handler);
+ }
+
+ let status = RpcStatus::new(RpcStatusCode::INTERNAL, Some("No payload".to_owned()));
+ self.request.call(cq.clone()).abort(&status)
+ }
+}
+
+/// A stream for a client streaming call or a duplex streaming call.
+///
+/// The corresponding RPC will be canceled if the stream did not
+/// finish before dropping.
+#[must_use = "if unused the RequestStream may immediately cancel the RPC"]
+pub struct RequestStream<T> {
+ call: Arc<Mutex<ShareCall>>,
+ base: StreamingBase,
+ de: DeserializeFn<T>,
+}
+
+impl<T> RequestStream<T> {
+ fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
+ RequestStream {
+ call,
+ base: StreamingBase::new(None),
+ de,
+ }
+ }
+}
+
+impl<T> Stream for RequestStream<T> {
+ type Item = Result<T>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
+ {
+ let mut call = self.call.lock();
+ call.check_alive()?;
+ }
+
+ let t = &mut *self;
+ match ready!(t.base.poll(cx, &mut t.call, false)?) {
+ None => Poll::Ready(None),
+ Some(data) => Poll::Ready(Some((t.de)(data))),
+ }
+ }
+}
+
+impl<T> Drop for RequestStream<T> {
+ /// The corresponding RPC will be canceled if the stream did not
+ /// finish before dropping.
+ fn drop(&mut self) {
+ self.base.on_drop(&mut self.call);
+ }
+}
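+
+// Editor's sketch (illustrative only, not part of the imported crate): draining a
+// client streaming request inside an async handler body. `futures::TryStreamExt`
+// comes from the crate's `futures` 0.3 dependency; the counting is a placeholder.
+#[allow(dead_code)]
+async fn drain_request_stream_sketch<Req>(mut stream: RequestStream<Req>) -> Result<usize> {
+    use futures::TryStreamExt;
+    let mut count = 0;
+    // Each item is a `Result<Req>`; on error the loop ends, and dropping the
+    // stream before it is fully read cancels the RPC.
+    while let Some(_req) = stream.try_next().await? {
+        count += 1;
+    }
+    Ok(count)
+}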
+
+/// A helper macro used to implement a server side unary sink.
+/// Generics are not used here because we don't need to expose
+/// `CallHolder` or `Call` to the caller.
+// TODO: Use type alias to be friendly for documentation.
+macro_rules! impl_unary_sink {
+ ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
+ pub struct $rt {
+ call: $holder,
+ cq_f: Option<BatchFuture>,
+ err: Option<Error>,
+ }
+
+ impl Future for $rt {
+ type Output = Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ if let Some(e) = self.err.take() {
+ return Poll::Ready(Err(e));
+ }
+
+ if self.cq_f.is_some() {
+ ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?);
+ self.cq_f.take();
+ }
+
+ ready!(self.call.call(|c| c.poll_finish(cx))?);
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ $(#[$attr])*
+ pub struct $t<T> {
+ call: Option<$holder>,
+ write_flags: u32,
+ ser: SerializeFn<T>,
+ }
+
+ impl<T> $t<T> {
+ fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
+ $t {
+ call: Some(call),
+ write_flags: 0,
+ ser: ser,
+ }
+ }
+
+ pub fn success(self, t: T) -> $rt {
+ self.complete(RpcStatus::ok(), Some(t))
+ }
+
+ pub fn fail(self, status: RpcStatus) -> $rt {
+ self.complete(status, None)
+ }
+
+ fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
+ let data = t.as_ref().map(|t| {
+ let mut buf = vec![];
+ (self.ser)(t, &mut buf);
+ buf
+ });
+
+ let write_flags = self.write_flags;
+ let res = self.call.as_mut().unwrap().call(|c| {
+ c.call
+ .start_send_status_from_server(&status, true, &data, write_flags)
+ });
+
+ let (cq_f, err) = match res {
+ Ok(f) => (Some(f), None),
+ Err(e) => (None, Some(e)),
+ };
+
+ $rt {
+ call: self.call.take().unwrap(),
+ cq_f: cq_f,
+ err: err,
+ }
+ }
+ }
+
+ impl<T> Drop for $t<T> {
+ /// The corresponding RPC will be canceled if the sink did not
+ /// send a response before dropping.
+ fn drop(&mut self) {
+ self.call
+ .as_mut()
+ .map(|call| call.call(|c| c.call.cancel()));
+ }
+ }
+ };
+}
+
+impl_unary_sink!(
+ /// A sink for unary call.
+ ///
+ /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
+ ///
+ /// [`success`]: #method.success
+ /// [`fail`]: #method.fail
+ #[must_use = "if unused the sink may immediately cancel the RPC"]
+ UnarySink,
+ UnarySinkResult,
+ ShareCall
+);
+impl_unary_sink!(
+ /// A sink for client streaming call.
+ ///
+ /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
+ ///
+ /// [`success`]: #method.success
+ /// [`fail`]: #method.fail
+ #[must_use = "if unused the sink may immediately cancel the RPC"]
+ ClientStreamingSink,
+ ClientStreamingSinkResult,
+ Arc<Mutex<ShareCall>>
+);
+
+// A helper macro to implement a server side streaming sink.
+macro_rules! impl_stream_sink {
+ ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
+ $(#[$attr])*
+ pub struct $t<T> {
+ call: Option<$holder>,
+ base: SinkBase,
+ flush_f: Option<BatchFuture>,
+ status: RpcStatus,
+ flushed: bool,
+ closed: bool,
+ ser: SerializeFn<T>,
+ }
+
+ impl<T> $t<T> {
+ fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
+ $t {
+ call: Some(call),
+ base: SinkBase::new(true),
+ flush_f: None,
+ status: RpcStatus::ok(),
+ flushed: false,
+ closed: false,
+ ser: ser,
+ }
+ }
+
+ pub fn set_status(&mut self, status: RpcStatus) {
+ assert!(self.flush_f.is_none());
+ self.status = status;
+ }
+
+ pub fn fail(mut self, status: RpcStatus) -> $ft {
+ assert!(self.flush_f.is_none());
+ let send_metadata = self.base.send_metadata;
+ let res = self.call.as_mut().unwrap().call(|c| {
+ c.call
+ .start_send_status_from_server(&status, send_metadata, &None, 0)
+ });
+
+ let (fail_f, err) = match res {
+ Ok(f) => (Some(f), None),
+ Err(e) => (None, Some(e)),
+ };
+
+ $ft {
+ call: self.call.take().unwrap(),
+ fail_f: fail_f,
+ err: err,
+ }
+ }
+ }
+
+ impl<T> Drop for $t<T> {
+ /// The corresponding RPC will be canceled if the sink did not call
+ /// [`close`] or [`fail`] before dropping.
+ ///
+ /// [`close`]: #method.close
+ /// [`fail`]: #method.fail
+ fn drop(&mut self) {
+ // The sink was not closed explicitly and was not consumed by `fail`.
+ if !self.closed && self.call.is_some() {
+ let mut call = self.call.take().unwrap();
+ call.call(|c| c.call.cancel());
+ }
+ }
+ }
+
+ impl<T> Sink<(T, WriteFlags)> for $t<T> {
+ type Error = Error;
+
+ #[inline]
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
+ return Poll::Ready(Err(Error::RemoteStopped));
+ }
+ Pin::new(&mut self.base).poll_ready(cx)
+ }
+
+ #[inline]
+ fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
+ let t = &mut *self;
+ t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser)
+ }
+
+ #[inline]
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
+ return Poll::Ready(Err(Error::RemoteStopped));
+ }
+ Pin::new(&mut self.base).poll_ready(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ if self.flush_f.is_none() {
+ ready!(Pin::new(&mut self.base).poll_ready(cx)?);
+
+ let send_metadata = self.base.send_metadata;
+ let t = &mut *self;
+ let status = &t.status;
+ let flush_f = t.call.as_mut().unwrap().call(|c| {
+ c.call
+ .start_send_status_from_server(status, send_metadata, &None, 0)
+ })?;
+ t.flush_f = Some(flush_f);
+ }
+
+ if !self.flushed {
+ ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
+ self.flushed = true;
+ }
+
+ ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
+ self.closed = true;
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ #[must_use = "if unused the sink failure may immediately cancel the RPC"]
+ pub struct $ft {
+ call: $holder,
+ fail_f: Option<BatchFuture>,
+ err: Option<Error>,
+ }
+
+ impl Future for $ft {
+ type Output = Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
+ if let Some(e) = self.err.take() {
+ return Poll::Ready(Err(e));
+ }
+
+ let readiness = self.call.call(|c| {
+ if c.finished {
+ return Poll::Ready(Ok(()));
+ }
+
+ c.poll_finish(cx).map(|r| r.map(|_| ()))
+ })?;
+
+ if let Some(ref mut f) = self.fail_f {
+ ready!(Pin::new(f).poll(cx)?);
+ }
+
+ self.fail_f.take();
+ readiness.map(Ok)
+ }
+ }
+ };
+}
+
+impl_stream_sink!(
+ /// A sink for server streaming call.
+ ///
+ /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
+ ///
+ /// [`close`]: #method.close
+ /// [`fail`]: #method.fail
+ #[must_use = "if unused the sink may immediately cancel the RPC"]
+ ServerStreamingSink,
+ ServerStreamingSinkFailure,
+ ShareCall
+);
+impl_stream_sink!(
+ /// A sink for duplex streaming call.
+ ///
+ /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
+ ///
+ /// [`close`]: #method.close
+ /// [`fail`]: #method.fail
+ #[must_use = "if unused the sink may immediately cancel the RPC"]
+ DuplexSink,
+ DuplexSinkFailure,
+ Arc<Mutex<ShareCall>>
+);
+
+/// A context for rpc handling.
+pub struct RpcContext<'a> {
+ ctx: RequestContext,
+ executor: Executor<'a>,
+ deadline: Deadline,
+}
+
+impl<'a> RpcContext<'a> {
+ fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
+ RpcContext {
+ deadline: ctx.deadline(),
+ ctx,
+ executor: Executor::new(cq),
+ }
+ }
+
+ fn kicker(&self) -> Kicker {
+ let call = self.call();
+ Kicker::from_call(call)
+ }
+
+ pub(crate) fn call(&self) -> Call {
+ self.ctx.call(self.executor.cq().clone())
+ }
+
+ pub fn method(&self) -> &[u8] {
+ self.ctx.method()
+ }
+
+ pub fn host(&self) -> &[u8] {
+ self.ctx.host()
+ }
+
+ pub fn deadline(&self) -> &Deadline {
+ &self.deadline
+ }
+
+ /// Get the initial metadata sent by the client.
+ pub fn request_headers(&self) -> &Metadata {
+ self.ctx.metadata()
+ }
+
+ pub fn peer(&self) -> String {
+ self.ctx.peer()
+ }
+
+ /// Wrapper around the gRPC Core AuthContext.
+ ///
+ /// If the server binds in non-secure mode, this will return `None`.
+ pub fn auth_context(&self) -> Option<AuthContext> {
+ self.ctx.auth_context()
+ }
+
+ /// Spawn the future onto the current gRPC poll thread.
+ ///
+ /// This can reduce a lot of context switching, but please make
+ /// sure there is no heavy work in the future.
+ pub fn spawn<F>(&self, f: F)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ self.executor.spawn(f, self.kicker())
+ }
+}
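+
+// Editor's sketch (illustrative only, not part of the imported crate): the shape of
+// a typical unary handler body using the context above. `Req`/`Resp` stand in for
+// user-defined message types; real handlers are generated from the service definition.
+#[allow(dead_code)]
+fn unary_handler_sketch<Req, Resp: Default>(
+    ctx: RpcContext<'_>,
+    _req: Req,
+    sink: UnarySink<Resp>,
+) {
+    // Skip work whose deadline has already passed; dropping the sink cancels the RPC.
+    if ctx.deadline().exceeded() {
+        return;
+    }
+    let f = sink.success(Resp::default());
+    // Drive the completion future on the gRPC poll thread; keep it lightweight.
+    ctx.spawn(async move {
+        let _ = f.await;
+    });
+}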
+
+// The following helper functions are used to create callback closures.
+
+macro_rules! accept_call {
+ ($call:expr) => {
+ match $call.start_server_side() {
+ Err(Error::QueueShutdown) => return,
+ Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
+ Ok(f) => f,
+ }
+ };
+}
+
+// Helper function to call a unary handler.
+pub fn execute_unary<P, Q, F>(
+ ctx: RpcContext<'_>,
+ ser: SerializeFn<Q>,
+ de: DeserializeFn<P>,
+ payload: MessageReader,
+ f: &mut F,
+) where
+ F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
+{
+ let mut call = ctx.call();
+ let close_f = accept_call!(call);
+ let request = match de(payload) {
+ Ok(f) => f,
+ Err(e) => {
+ let status = RpcStatus::new(
+ RpcStatusCode::INTERNAL,
+ Some(format!("Failed to deserialize response message: {:?}", e)),
+ );
+ call.abort(&status);
+ return;
+ }
+ };
+ let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
+ f(ctx, request, sink)
+}
+
+// Helper function to call client streaming handler.
+pub fn execute_client_streaming<P, Q, F>(
+ ctx: RpcContext<'_>,
+ ser: SerializeFn<Q>,
+ de: DeserializeFn<P>,
+ f: &mut F,
+) where
+ F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
+{
+ let mut call = ctx.call();
+ let close_f = accept_call!(call);
+ let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
+
+ let req_s = RequestStream::new(call.clone(), de);
+ let sink = ClientStreamingSink::new(call, ser);
+ f(ctx, req_s, sink)
+}
+
+// Helper function to call server streaming handler.
+pub fn execute_server_streaming<P, Q, F>(
+ ctx: RpcContext<'_>,
+ ser: SerializeFn<Q>,
+ de: DeserializeFn<P>,
+ payload: MessageReader,
+ f: &mut F,
+) where
+ F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
+{
+ let mut call = ctx.call();
+ let close_f = accept_call!(call);
+
+ let request = match de(payload) {
+ Ok(t) => t,
+ Err(e) => {
+ let status = RpcStatus::new(
+ RpcStatusCode::INTERNAL,
+ Some(format!("Failed to deserialize response message: {:?}", e)),
+ );
+ call.abort(&status);
+ return;
+ }
+ };
+
+ let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
+ f(ctx, request, sink)
+}
+
+// Helper function to call duplex streaming handler.
+pub fn execute_duplex_streaming<P, Q, F>(
+ ctx: RpcContext<'_>,
+ ser: SerializeFn<Q>,
+ de: DeserializeFn<P>,
+ f: &mut F,
+) where
+ F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
+{
+ let mut call = ctx.call();
+ let close_f = accept_call!(call);
+ let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
+
+ let req_s = RequestStream::new(call.clone(), de);
+ let sink = DuplexSink::new(call, ser);
+ f(ctx, req_s, sink)
+}
+
+// A helper function used to handle all undefined rpc calls.
+pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
+ // Suppress needless-pass-by-value.
+ let ctx = ctx;
+ let mut call = ctx.call(cq);
+ accept_call!(call);
+ call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None))
+}
+
+// Helper function to call handler.
+//
+// Invoked after a request is ready to be handled.
+fn execute(
+ ctx: RequestContext,
+ cq: &CompletionQueue,
+ payload: Option<MessageReader>,
+ f: &mut BoxHandler,
+) {
+ let rpc_ctx = RpcContext::new(ctx, cq);
+ f.handle(rpc_ctx, payload)
+}
diff --git a/src/channel.rs b/src/channel.rs
new file mode 100644
index 0000000..7301fcb
--- /dev/null
+++ b/src/channel.rs
@@ -0,0 +1,622 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::borrow::Cow;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::ffi::{CStr, CString};
+use std::sync::Arc;
+use std::time::Duration;
+use std::{cmp, i32, ptr};
+
+use crate::grpc_sys::{
+ self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args,
+};
+use libc::{self, c_char, c_int};
+
+use crate::call::{Call, Method};
+use crate::cq::CompletionQueue;
+use crate::env::Environment;
+use crate::error::Result;
+use crate::task::Kicker;
+use crate::CallOption;
+use crate::ResourceQuota;
+
+pub use crate::grpc_sys::{
+ grpc_compression_algorithm as CompressionAlgorithms,
+ grpc_compression_level as CompressionLevel, grpc_connectivity_state as ConnectivityState,
+};
+
+/// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
+fn format_user_agent_string(agent: &str) -> CString {
+ let version = env!("CARGO_PKG_VERSION");
+ let trimmed_agent = agent.trim();
+ let val = if trimmed_agent.is_empty() {
+ format!("grpc-rust/{}", version)
+ } else {
+ format!("{} grpc-rust/{}", trimmed_agent, version)
+ };
+ CString::new(val).unwrap()
+}
+
+fn dur_to_ms(dur: Duration) -> i32 {
+ let millis = dur.as_secs() * 1000 + dur.subsec_nanos() as u64 / 1_000_000;
+ cmp::min(i32::MAX as u64, millis) as i32
+}
+
+enum Options {
+ Integer(i32),
+ String(CString),
+ Pointer(ResourceQuota, *const grpc_arg_pointer_vtable),
+}
+
+/// The optimization target for a [`Channel`].
+#[derive(Clone, Copy)]
+pub enum OptTarget {
+ /// Minimize latency at the cost of throughput.
+ Latency,
+ /// Balance latency and throughput.
+ Blend,
+ /// Maximize throughput at the expense of latency.
+ Throughput,
+}
+
+#[derive(Clone, Copy)]
+pub enum LbPolicy {
+ PickFirst,
+ RoundRobin,
+}
+
+/// A [`Channel`] factory used to configure channel properties.
+pub struct ChannelBuilder {
+ env: Arc<Environment>,
+ options: HashMap<Cow<'static, [u8]>, Options>,
+}
+
+impl ChannelBuilder {
+ /// Initialize a new [`ChannelBuilder`].
+ pub fn new(env: Arc<Environment>) -> ChannelBuilder {
+ ChannelBuilder {
+ env,
+ options: HashMap::new(),
+ }
+ }
+
+ /// Set default authority to pass if none specified on call construction.
+ pub fn default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder {
+ let authority = CString::new(authority).unwrap();
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_DEFAULT_AUTHORITY),
+ Options::String(authority),
+ );
+ self
+ }
+
+ /// Set a resource quota by consuming a [`ResourceQuota`].
+ pub fn set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder {
+ unsafe {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_RESOURCE_QUOTA),
+ Options::Pointer(quota, grpc_sys::grpc_resource_quota_arg_vtable()),
+ );
+ }
+ self
+ }
+
+ /// Set the maximum number of concurrent incoming streams to allow on an HTTP/2 connection.
+ pub fn max_concurrent_stream(mut self, num: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_CONCURRENT_STREAMS),
+ Options::Integer(num),
+ );
+ self
+ }
+
+ /// Set maximum message length that the channel can receive. `-1` means unlimited.
+ pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
+ Options::Integer(len),
+ );
+ self
+ }
+
+ /// Set maximum message length that the channel can send. `-1` means unlimited.
+ pub fn max_send_message_len(mut self, len: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_SEND_MESSAGE_LENGTH),
+ Options::Integer(len),
+ );
+ self
+ }
+
+ /// Set maximum time between subsequent connection attempts.
+ pub fn max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECONNECT_BACKOFF_MS),
+ Options::Integer(dur_to_ms(backoff)),
+ );
+ self
+ }
+
+ /// Set time between the first and second connection attempts.
+ pub fn initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS),
+ Options::Integer(dur_to_ms(backoff)),
+ );
+ self
+ }
+
+ /// Set initial sequence number for HTTP/2 transports.
+ pub fn https_initial_seq_number(mut self, number: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER),
+ Options::Integer(number),
+ );
+ self
+ }
+
+ /// Set amount to read ahead on individual streams. Defaults to 64KB. Larger
+ /// values help throughput on high-latency connections.
+ pub fn stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES),
+ Options::Integer(window_size),
+ );
+ self
+ }
+
+ /// Set primary user agent, which goes at the start of the user-agent metadata sent on
+ /// each request.
+ pub fn primary_user_agent(mut self, agent: &str) -> ChannelBuilder {
+ let agent_string = format_user_agent_string(agent);
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING),
+ Options::String(agent_string),
+ );
+ self
+ }
+
+ /// Set whether to allow the use of `SO_REUSEPORT` if available. Defaults to `true`.
+ pub fn reuse_port(mut self, reuse: bool) -> ChannelBuilder {
+ let opt = if reuse { 1 } else { 0 };
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_ALLOW_REUSEPORT),
+ Options::Integer(opt),
+ );
+ self
+ }
+
+ /// Set the size of slice to try and read from the wire each time.
+ pub fn tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_READ_CHUNK_SIZE),
+ Options::Integer(bytes),
+ );
+ self
+ }
+
+ /// Set the minimum size of slice to try and read from the wire each time.
+ pub fn tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE),
+ Options::Integer(bytes),
+ );
+ self
+ }
+
+ /// Set the maximum size of slice to try and read from the wire each time.
+ pub fn tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE),
+ Options::Integer(bytes),
+ );
+ self
+ }
+
+ /// The maximum amount of data to queue up per stream if write_buffer_hint is set.
+ /// This is an upper bound.
+ pub fn http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE),
+ Options::Integer(size),
+ );
+ self
+ }
+
+ /// The maximum HTTP/2 frame size we are willing to receive.
+ /// Min 16384, max 16777215.
+ /// Larger values give lower CPU usage for large messages, but more head-of-line
+ /// blocking for small messages.
+ pub fn http2_max_frame_size(mut self, size: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_FRAME_SIZE),
+ Options::Integer(size),
+ );
+ self
+ }
+
+ /// Set whether to enable BDP probing.
+ pub fn http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_BDP_PROBE),
+ Options::Integer(enable as i32),
+ );
+ self
+ }
+
+ /// Minimum time between sending successive ping frames without receiving any
+ /// data frame.
+ pub fn http2_min_sent_ping_interval_without_data(
+ mut self,
+ interval: Duration,
+ ) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
+ Options::Integer(dur_to_ms(interval)),
+ );
+ self
+ }
+
+ /// Minimum allowed time between receiving successive ping frames without
+ /// sending any data frame.
+ pub fn http2_min_recv_ping_interval_without_data(
+ mut self,
+ interval: Duration,
+ ) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
+ Options::Integer(dur_to_ms(interval)),
+ );
+ self
+ }
+
+ /// How many pings can we send before needing to send a data frame or header
+ /// frame? (0 indicates that an infinite number of pings can be sent without
+ /// sending a data frame or header frame)
+ pub fn http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA),
+ Options::Integer(num),
+ );
+ self
+ }
+
+ /// How many misbehaving pings the server can tolerate before sending a GOAWAY frame and
+ /// closing the transport. (0 indicates that the server can tolerate an infinite
+ /// number of misbehaving pings.)
+ pub fn http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PING_STRIKES),
+ Options::Integer(num),
+ );
+ self
+ }
+
+ /// Set default compression algorithm for the channel.
+ pub fn default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM),
+ Options::Integer(algo as i32),
+ );
+ self
+ }
+
+ /// Set default compression level for the channel.
+ pub fn default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL),
+ Options::Integer(level as i32),
+ );
+ self
+ }
+
+ /// After a duration of this time the client/server pings its peer to see
+ /// if the transport is still alive.
+ pub fn keepalive_time(mut self, timeout: Duration) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIME_MS),
+ Options::Integer(dur_to_ms(timeout)),
+ );
+ self
+ }
+
+ /// After waiting for a duration of this time, if the keepalive ping sender does
+ /// not receive the ping ack, it will close the transport.
+ pub fn keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIMEOUT_MS),
+ Options::Integer(dur_to_ms(timeout)),
+ );
+ self
+ }
+
+ /// Set whether it is permissible to send keepalive pings without any outstanding streams.
+ pub fn keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder {
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS),
+ Options::Integer(allow as i32),
+ );
+ self
+ }
+
+ /// Set optimization target for the channel. See [`OptTarget`] for all available
+ /// optimization targets. Defaults to `OptTarget::Blend`.
+ pub fn optimize_for(mut self, target: OptTarget) -> ChannelBuilder {
+ let val = match target {
+ OptTarget::Latency => CString::new("latency"),
+ OptTarget::Blend => CString::new("blend"),
+ OptTarget::Throughput => CString::new("throughput"),
+ };
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_OPTIMIZATION_TARGET),
+ Options::String(val.unwrap()),
+ );
+ self
+ }
+
+ /// Set the [`LbPolicy`] for the channel.
+ ///
+ /// This method allows one to set the load-balancing policy for a given channel.
+ pub fn load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder {
+ let val = match lb_policy {
+ LbPolicy::PickFirst => CString::new("pick_first"),
+ LbPolicy::RoundRobin => CString::new("round_robin"),
+ };
+ self.options.insert(
+ Cow::Borrowed(grpcio_sys::GRPC_ARG_LB_POLICY_NAME),
+ Options::String(val.unwrap()),
+ );
+ self
+ }
+
+ /// Set a raw integer configuration.
+ ///
+ /// This method is only for benchmarking; users should use the encapsulated API instead.
+ #[doc(hidden)]
+ pub fn raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder {
+ self.options
+ .insert(Cow::Owned(key.into_bytes_with_nul()), Options::Integer(val));
+ self
+ }
+
+ /// Set a raw string configuration.
+ ///
+ /// This method is only for benchmarking; users should use the encapsulated API instead.
+ #[doc(hidden)]
+ pub fn raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder {
+ self.options
+ .insert(Cow::Owned(key.into_bytes_with_nul()), Options::String(val));
+ self
+ }
+
+ /// Build `ChannelArgs` from the current configuration.
+ #[allow(clippy::identity_conversion)]
+ pub fn build_args(&self) -> ChannelArgs {
+ let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
+ for (i, (k, v)) in self.options.iter().enumerate() {
+ let key = k.as_ptr() as *const c_char;
+ match *v {
+ Options::Integer(val) => unsafe {
+ // On most modern compilers and architectures, c_int is the same as i32;
+ // panic directly to keep the signature simple.
+ assert!(
+ val <= i32::from(libc::INT_MAX) && val >= i32::from(libc::INT_MIN),
+ "{} is out of range for {:?}",
+ val,
+ CStr::from_bytes_with_nul(k).unwrap()
+ );
+ grpc_sys::grpcwrap_channel_args_set_integer(args, i, key, val as c_int)
+ },
+ Options::String(ref val) => unsafe {
+ grpc_sys::grpcwrap_channel_args_set_string(args, i, key, val.as_ptr())
+ },
+ Options::Pointer(ref quota, vtable) => unsafe {
+ grpc_sys::grpcwrap_channel_args_set_pointer_vtable(
+ args,
+ i,
+ key,
+ quota.get_ptr() as _,
+ vtable,
+ )
+ },
+ }
+ }
+ ChannelArgs { args }
+ }
+
+ fn prepare_connect_args(&mut self) -> ChannelArgs {
+ if let Entry::Vacant(e) = self.options.entry(Cow::Borrowed(
+ grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING,
+ )) {
+ e.insert(Options::String(format_user_agent_string("")));
+ }
+ self.build_args()
+ }
+
+ /// Build an insecure [`Channel`] that connects to a specific address.
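+ ///
+ /// A minimal sketch, assuming a server is listening on the given address:
+ ///
+ /// ```ignore
+ /// use std::sync::Arc;
+ /// use grpcio::{ChannelBuilder, EnvBuilder};
+ ///
+ /// let env = Arc::new(EnvBuilder::new().build());
+ /// let ch = ChannelBuilder::new(env)
+ ///     .max_receive_message_len(64 * 1024 * 1024)
+ ///     .connect("127.0.0.1:50051");
+ /// ```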
+ pub fn connect(mut self, addr: &str) -> Channel {
+ let args = self.prepare_connect_args();
+ let addr = CString::new(addr).unwrap();
+ let addr_ptr = addr.as_ptr();
+ let channel =
+ unsafe { grpc_sys::grpc_insecure_channel_create(addr_ptr, args.args, ptr::null_mut()) };
+
+ Channel::new(self.env.pick_cq(), self.env, channel)
+ }
+}
+
+#[cfg(feature = "secure")]
+mod secure_channel {
+ use std::borrow::Cow;
+ use std::ffi::CString;
+ use std::ptr;
+
+ use crate::grpc_sys;
+
+ use crate::ChannelCredentials;
+
+ use super::{Channel, ChannelBuilder, Options};
+
+ const OPT_SSL_TARGET_NAME_OVERRIDE: &[u8] = b"grpc.ssl_target_name_override\0";
+
+ impl ChannelBuilder {
+ /// The caller of the secure_channel_create functions may override the target name used
+ /// for SSL host name checking using this channel argument.
+ ///
+ /// This *should* be used for testing only.
+ #[doc(hidden)]
+ pub fn override_ssl_target<S: Into<Vec<u8>>>(mut self, target: S) -> ChannelBuilder {
+ let target = CString::new(target).unwrap();
+ self.options.insert(
+ Cow::Borrowed(OPT_SSL_TARGET_NAME_OVERRIDE),
+ Options::String(target),
+ );
+ self
+ }
+
+ /// Build a secure [`Channel`] that connects to a specific address.
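+ ///
+ /// A TLS connection sketch; `ca_pem` and the use of `root_cert`/`build` on
+ /// `ChannelCredentialsBuilder` are assumptions for illustration:
+ ///
+ /// ```ignore
+ /// let creds = ChannelCredentialsBuilder::new().root_cert(ca_pem).build();
+ /// let ch = ChannelBuilder::new(env).secure_connect("example.com:443", creds);
+ /// ```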
+ pub fn secure_connect(mut self, addr: &str, mut creds: ChannelCredentials) -> Channel {
+ let args = self.prepare_connect_args();
+ let addr = CString::new(addr).unwrap();
+ let addr_ptr = addr.as_ptr();
+ let channel = unsafe {
+ grpc_sys::grpc_secure_channel_create(
+ creds.as_mut_ptr(),
+ addr_ptr,
+ args.args,
+ ptr::null_mut(),
+ )
+ };
+
+ Channel::new(self.env.pick_cq(), self.env, channel)
+ }
+ }
+}
+
+pub struct ChannelArgs {
+ args: *mut grpc_channel_args,
+}
+
+impl ChannelArgs {
+ pub fn as_ptr(&self) -> *const grpc_channel_args {
+ self.args
+ }
+}
+
+impl Drop for ChannelArgs {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpcwrap_channel_args_destroy(self.args) }
+ }
+}
+
+struct ChannelInner {
+ _env: Arc<Environment>,
+ channel: *mut grpc_channel,
+}
+
+impl ChannelInner {
+ // If try_to_connect is true, the channel will try to establish a connection, potentially
+ // changing the state.
+ fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
+ let should_try = if try_to_connect { 1 } else { 0 };
+ unsafe { grpc_sys::grpc_channel_check_connectivity_state(self.channel, should_try) }
+ }
+}
+
+impl Drop for ChannelInner {
+ fn drop(&mut self) {
+ unsafe {
+ grpc_sys::grpc_channel_destroy(self.channel);
+ }
+ }
+}
+
+/// A gRPC channel.
+///
+/// Channels are an abstraction of long-lived connections to remote servers. Multiple client
+/// objects can reuse the same channel.
+///
+/// Use [`ChannelBuilder`] to build a [`Channel`].
+#[derive(Clone)]
+pub struct Channel {
+ inner: Arc<ChannelInner>,
+ cq: CompletionQueue,
+}
+
+unsafe impl Send for Channel {}
+unsafe impl Sync for Channel {}
+
+impl Channel {
+ fn new(cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel) -> Channel {
+ Channel {
+ inner: Arc::new(ChannelInner { _env: env, channel }),
+ cq,
+ }
+ }
+
+ // If try_to_connect is true, the channel will try to establish a connection, potentially
+ // changing the state.
+ pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
+ self.inner.check_connectivity_state(try_to_connect)
+ }
+
+ /// Create a Kicker.
+ pub(crate) fn create_kicker(&self) -> Result<Kicker> {
+ let cq_ref = self.cq.borrow()?;
+ let raw_call = unsafe {
+ let ch = self.inner.channel;
+ let cq = cq_ref.as_ptr();
+ // Do not timeout.
+ let timeout = gpr_timespec::inf_future();
+ grpc_sys::grpcwrap_channel_create_call(
+ ch,
+ ptr::null_mut(),
+ 0,
+ cq,
+ ptr::null(),
+ 0,
+ ptr::null(),
+ 0,
+ timeout,
+ )
+ };
+ let call = unsafe { Call::from_raw(raw_call, self.cq.clone()) };
+ Ok(Kicker::from_call(call))
+ }
+
+ /// Create a call using the method and option.
+ pub(crate) fn create_call<Req, Resp>(
+ &self,
+ method: &Method<Req, Resp>,
+ opt: &CallOption,
+ ) -> Result<Call> {
+ let cq_ref = self.cq.borrow()?;
+ let raw_call = unsafe {
+ let ch = self.inner.channel;
+ let cq = cq_ref.as_ptr();
+ let method_ptr = method.name.as_ptr();
+ let method_len = method.name.len();
+ let timeout = opt
+ .get_timeout()
+ .map_or_else(gpr_timespec::inf_future, gpr_timespec::from);
+ grpc_sys::grpcwrap_channel_create_call(
+ ch,
+ ptr::null_mut(),
+ 0,
+ cq,
+ method_ptr as *const _,
+ method_len,
+ ptr::null(),
+ 0,
+ timeout,
+ )
+ };
+
+ unsafe { Ok(Call::from_raw(raw_call, self.cq.clone())) }
+ }
+
+ pub(crate) fn cq(&self) -> &CompletionQueue {
+ &self.cq
+ }
+}
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..4cce793
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,100 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::call::client::{
+ CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver,
+ ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver,
+};
+use crate::call::{Call, Method};
+use crate::channel::Channel;
+use crate::error::Result;
+use crate::task::Executor;
+use crate::task::Kicker;
+use futures::executor::block_on;
+use futures::Future;
+
+/// A generic client for making RPC calls.
+#[derive(Clone)]
+pub struct Client {
+ channel: Channel,
+ // Used to kick its completion queue.
+ kicker: Kicker,
+}
+
+impl Client {
+ /// Initialize a new [`Client`].
+ pub fn new(channel: Channel) -> Client {
+ let kicker = channel.create_kicker().unwrap();
+ Client { channel, kicker }
+ }
+
+ /// Create a blocking (synchronous) unary RPC call.
+ ///
+ /// It uses `futures::executor::block_on` to wait for the response, so the
+ /// asynchronous version is recommended.
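+ ///
+ /// Sketch (the `Method` value normally comes from generated code; the constant name
+ /// below is illustrative):
+ ///
+ /// ```ignore
+ /// let resp = client.unary_call(&GREETER_SAY_HELLO_METHOD, &req, CallOption::default())?;
+ /// ```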
+ pub fn unary_call<Req, Resp>(
+ &self,
+ method: &Method<Req, Resp>,
+ req: &Req,
+ opt: CallOption,
+ ) -> Result<Resp> {
+ block_on(self.unary_call_async(method, req, opt)?)
+ }
+
+ /// Create an asynchronous unary RPC call.
+ pub fn unary_call_async<Req, Resp>(
+ &self,
+ method: &Method<Req, Resp>,
+ req: &Req,
+ opt: CallOption,
+ ) -> Result<ClientUnaryReceiver<Resp>> {
+ Call::unary_async(&self.channel, method, req, opt)
+ }
+
+ /// Create an asynchronous client streaming call.
+ ///
+ /// The client sends a stream of requests and the server responds with a single response.
+ pub fn client_streaming<Req, Resp>(
+ &self,
+ method: &Method<Req, Resp>,
+ opt: CallOption,
+ ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> {
+ Call::client_streaming(&self.channel, method, opt)
+ }
+
+ /// Create an asynchronous server streaming call.
+ ///
+ /// The client sends one request and the server responds with a stream of responses.
+ pub fn server_streaming<Req, Resp>(
+ &self,
+ method: &Method<Req, Resp>,
+ req: &Req,
+ opt: CallOption,
+ ) -> Result<ClientSStreamReceiver<Resp>> {
+ Call::server_streaming(&self.channel, method, req, opt)
+ }
+
+ /// Create an asynchronous duplex streaming call.
+ ///
+ /// The client sends a stream of requests and the server responds with a stream of
+ /// responses. The two streams are completely independent, so both sides can send
+ /// messages at the same time.
+ pub fn duplex_streaming<Req, Resp>(
+ &self,
+ method: &Method<Req, Resp>,
+ opt: CallOption,
+ ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> {
+ Call::duplex_streaming(&self.channel, method, opt)
+ }
+
+ /// Spawn the future into the current gRPC poll thread.
+ ///
+ /// This can reduce a lot of context switching, but please make
+ /// sure there is no heavy work in the spawned future.
+ pub fn spawn<F>(&self, f: F)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let kicker = self.kicker.clone();
+ Executor::new(self.channel.cq()).spawn(f, kicker)
+ }
+}
diff --git a/src/codec.rs b/src/codec.rs
new file mode 100644
index 0000000..4a84489
--- /dev/null
+++ b/src/codec.rs
@@ -0,0 +1,67 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::call::MessageReader;
+use crate::error::Result;
+
+pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>;
+pub type SerializeFn<T> = fn(&T, &mut Vec<u8>);
+
+/// Defines how to serialize and deserialize between the specialized type and a byte slice.
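+///
+/// A hand-rolled marshaller sketch for raw byte payloads (generated code normally provides
+/// these functions; everything below is illustrative):
+///
+/// ```ignore
+/// fn bytes_ser(t: &Vec<u8>, buf: &mut Vec<u8>) {
+///     buf.extend_from_slice(t);
+/// }
+/// fn bytes_de(mut r: MessageReader) -> Result<Vec<u8>> {
+///     use std::io::Read;
+///     let mut v = Vec::new();
+///     r.read_to_end(&mut v).expect("read message");
+///     Ok(v)
+/// }
+/// let marshaller = Marshaller { ser: bytes_ser, de: bytes_de };
+/// ```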
+pub struct Marshaller<T> {
+ // Use function pointers here to simplify the signature. The compiler
+ // will probably inline them, so the performance impact should be
+ // negligible.
+ //
+ // Using a trait would require a trait object or a generic parameter,
+ // which would either hurt performance or complicate the signature.
+ //
+ // Const functions are not stable yet (rust-lang/rust#24111), hence
+ // all fields are public.
+ /// The serialize function.
+ pub ser: SerializeFn<T>,
+
+ /// The deserialize function.
+ pub de: DeserializeFn<T>,
+}
+
+#[cfg(feature = "protobuf-codec")]
+pub mod pb_codec {
+ use protobuf::{CodedInputStream, Message};
+
+ use super::MessageReader;
+ use crate::error::Result;
+
+ #[inline]
+ pub fn ser<T: Message>(t: &T, buf: &mut Vec<u8>) {
+ t.write_to_vec(buf).unwrap()
+ }
+
+ #[inline]
+ pub fn de<T: Message>(mut reader: MessageReader) -> Result<T> {
+ let mut s = CodedInputStream::from_buffered_reader(&mut reader);
+ let mut m = T::new();
+ m.merge_from(&mut s)?;
+ Ok(m)
+ }
+}
+
+#[cfg(feature = "prost-codec")]
+pub mod pr_codec {
+ use bytes::buf::BufMut;
+ use prost::Message;
+
+ use super::MessageReader;
+ use crate::error::Result;
+
+ #[inline]
+ pub fn ser<M: Message, B: BufMut>(msg: &M, buf: &mut B) {
+ msg.encode(buf).expect("Writing message to buffer failed");
+ }
+
+ #[inline]
+ pub fn de<M: Message + Default>(mut reader: MessageReader) -> Result<M> {
+ use bytes::buf::Buf;
+ reader.advance(0);
+ M::decode(reader).map_err(Into::into)
+ }
+}
diff --git a/src/cq.rs b/src/cq.rs
new file mode 100644
index 0000000..60b6bb3
--- /dev/null
+++ b/src/cq.rs
@@ -0,0 +1,216 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::cell::UnsafeCell;
+use std::collections::VecDeque;
+use std::ptr;
+use std::sync::atomic::{AtomicIsize, Ordering};
+use std::sync::Arc;
+use std::thread::{self, ThreadId};
+
+use crate::error::{Error, Result};
+use crate::grpc_sys::{self, gpr_clock_type, grpc_completion_queue};
+use crate::task::UnfinishedWork;
+
+pub use crate::grpc_sys::grpc_completion_type as EventType;
+pub use crate::grpc_sys::grpc_event as Event;
+
+/// `CompletionQueueHandle` enables notification of the completion of asynchronous actions.
+pub struct CompletionQueueHandle {
+ cq: *mut grpc_completion_queue,
+ // When `ref_cnt` < 0, a shutdown is pending and the completion queue
+ // should not accept requests anymore; when `ref_cnt` == 0, the completion
+ // queue should be shut down; when `ref_cnt` > 0, the completion queue can
+ // accept requests and should not be shut down.
+ ref_cnt: AtomicIsize,
+}
+
+unsafe impl Sync for CompletionQueueHandle {}
+unsafe impl Send for CompletionQueueHandle {}
+
+impl CompletionQueueHandle {
+ pub fn new() -> CompletionQueueHandle {
+ CompletionQueueHandle {
+ cq: unsafe { grpc_sys::grpc_completion_queue_create_for_next(ptr::null_mut()) },
+ ref_cnt: AtomicIsize::new(1),
+ }
+ }
+
+ fn add_ref(&self) -> Result<()> {
+ loop {
+ let cnt = self.ref_cnt.load(Ordering::SeqCst);
+ if cnt <= 0 {
+ // `shutdown` has been called, reject any requests.
+ return Err(Error::QueueShutdown);
+ }
+ let new_cnt = cnt + 1;
+ if cnt
+ == self
+ .ref_cnt
+ .compare_and_swap(cnt, new_cnt, Ordering::SeqCst)
+ {
+ return Ok(());
+ }
+ }
+ }
+
+ fn unref(&self) {
+ let shutdown = loop {
+ let cnt = self.ref_cnt.load(Ordering::SeqCst);
+ // If `shutdown` has not been called, `cnt` > 0, so subtract 1 to unref.
+ // If `shutdown` has been called, `cnt` < 0, so add 1 to unref.
+ let new_cnt = cnt - cnt.signum();
+ if cnt
+ == self
+ .ref_cnt
+ .compare_and_swap(cnt, new_cnt, Ordering::SeqCst)
+ {
+ break new_cnt == 0;
+ }
+ };
+ if shutdown {
+ unsafe {
+ grpc_sys::grpc_completion_queue_shutdown(self.cq);
+ }
+ }
+ }
+
+ fn shutdown(&self) {
+ let shutdown = loop {
+ let cnt = self.ref_cnt.load(Ordering::SeqCst);
+ if cnt <= 0 {
+ // `shutdown` has already been called, skip.
+ return;
+ }
+ // Make cnt negative to indicate that `shutdown` has been called.
+ // Because `cnt` is initialized to 1, subtract 1 so the count still reaches 0
+ // once all references are dropped. That is, `new_cnt = -(cnt - 1) = -cnt + 1`.
+ let new_cnt = -cnt + 1;
+ if cnt
+ == self
+ .ref_cnt
+ .compare_and_swap(cnt, new_cnt, Ordering::SeqCst)
+ {
+ break new_cnt == 0;
+ }
+ };
+ if shutdown {
+ unsafe {
+ grpc_sys::grpc_completion_queue_shutdown(self.cq);
+ }
+ }
+ }
+}
+
+impl Drop for CompletionQueueHandle {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpc_completion_queue_destroy(self.cq) }
+ }
+}
+
+pub struct CompletionQueueRef<'a> {
+ queue: &'a CompletionQueue,
+}
+
+impl<'a> CompletionQueueRef<'a> {
+ pub fn as_ptr(&self) -> *mut grpc_completion_queue {
+ self.queue.handle.cq
+ }
+}
+
+impl<'a> Drop for CompletionQueueRef<'a> {
+ fn drop(&mut self) {
+ self.queue.handle.unref();
+ }
+}
+
+/// `WorkQueue` stores the unfinished work of a completion queue.
+///
+/// Every completion queue has a work queue, and every work queue belongs
+/// to exactly one completion queue. `WorkQueue` is a short path for future
+/// notifications. When a future is ready to be polled, there are two ways
+/// to notify it.
+/// 1. If it's in the same thread where the future is spawned, the future
+/// will be pushed into `WorkQueue` and be polled when the current call tag
+/// is handled;
+/// 2. If not, the future will be wrapped as a call tag, pushed into the
+/// completion queue and finally popped at the call to `grpc_completion_queue_next`.
+pub struct WorkQueue {
+ id: ThreadId,
+ pending_work: UnsafeCell<VecDeque<UnfinishedWork>>,
+}
+
+unsafe impl Sync for WorkQueue {}
+unsafe impl Send for WorkQueue {}
+
+const QUEUE_CAPACITY: usize = 4096;
+
+impl WorkQueue {
+ pub fn new() -> WorkQueue {
+ WorkQueue {
+ id: std::thread::current().id(),
+ pending_work: UnsafeCell::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
+ }
+ }
+
+ /// Pushes an unfinished work into the inner queue.
+ ///
+ /// If the method is not called from the thread where the queue was created,
+ /// the work is returned and nothing is pushed.
+ pub fn push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork> {
+ if self.id == thread::current().id() {
+ unsafe { &mut *self.pending_work.get() }.push_back(work);
+ None
+ } else {
+ Some(work)
+ }
+ }
+
+ /// Pops one unfinished work.
+ ///
+ /// It should only be called from the same thread where the queue is created.
+ /// Otherwise it leads to undefined behavior.
+ pub unsafe fn pop_work(&self) -> Option<UnfinishedWork> {
+ let queue = &mut *self.pending_work.get();
+ if queue.capacity() > QUEUE_CAPACITY && queue.len() < queue.capacity() / 2 {
+ queue.shrink_to_fit();
+ }
+ { &mut *self.pending_work.get() }.pop_back()
+ }
+}
+
+#[derive(Clone)]
+pub struct CompletionQueue {
+ handle: Arc<CompletionQueueHandle>,
+ pub(crate) worker: Arc<WorkQueue>,
+}
+
+impl CompletionQueue {
+ pub fn new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue {
+ CompletionQueue { handle, worker }
+ }
+
+ /// Blocks until an event is available or the completion queue is shut down.
+ pub fn next(&self) -> Event {
+ unsafe {
+ let inf = grpc_sys::gpr_inf_future(gpr_clock_type::GPR_CLOCK_REALTIME);
+ grpc_sys::grpc_completion_queue_next(self.handle.cq, inf, ptr::null_mut())
+ }
+ }
+
+ pub fn borrow(&self) -> Result<CompletionQueueRef<'_>> {
+ self.handle.add_ref()?;
+ Ok(CompletionQueueRef { queue: self })
+ }
+
+ /// Begin destruction of a completion queue.
+ ///
+ /// Once all possible events are drained, `next()` will start to produce
+ /// `GRPC_QUEUE_SHUTDOWN` events only.
+ pub fn shutdown(&self) {
+ self.handle.shutdown()
+ }
+
+ pub fn worker_id(&self) -> ThreadId {
+ self.worker.id
+ }
+}
diff --git a/src/env.rs b/src/env.rs
new file mode 100644
index 0000000..8bad45e
--- /dev/null
+++ b/src/env.rs
@@ -0,0 +1,174 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc;
+use std::sync::Arc;
+use std::thread::{Builder as ThreadBuilder, JoinHandle};
+
+use crate::grpc_sys;
+
+use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
+use crate::task::CallTag;
+
+// event loop
+fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
+ let cq = Arc::new(CompletionQueueHandle::new());
+ let worker_info = Arc::new(WorkQueue::new());
+ let cq = CompletionQueue::new(cq, worker_info);
+ tx.send(cq.clone()).expect("send back completion queue");
+ loop {
+ let e = cq.next();
+ match e.type_ {
+ EventType::GRPC_QUEUE_SHUTDOWN => break,
+ // timeout should not happen in theory.
+ EventType::GRPC_QUEUE_TIMEOUT => continue,
+ EventType::GRPC_OP_COMPLETE => {}
+ }
+
+ let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
+
+ tag.resolve(&cq, e.success != 0);
+ while let Some(work) = unsafe { cq.worker.pop_work() } {
+ work.finish();
+ }
+ }
+}
+
+/// An [`Environment`] factory used to configure the environment's properties.
+pub struct EnvBuilder {
+ cq_count: usize,
+ name_prefix: Option<String>,
+}
+
+impl EnvBuilder {
+ /// Initialize a new [`EnvBuilder`].
+ pub fn new() -> EnvBuilder {
+ EnvBuilder {
+ cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
+ name_prefix: None,
+ }
+ }
+
+ /// Set the number of completion queues and polling threads. Each thread polls
+ /// one completion queue.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `count` is 0.
+ pub fn cq_count(mut self, count: usize) -> EnvBuilder {
+ assert!(count > 0);
+ self.cq_count = count;
+ self
+ }
+
+ /// Set the thread name prefix of each polling thread.
+ pub fn name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder {
+ self.name_prefix = Some(prefix.into());
+ self
+ }
+
+ /// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
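+ ///
+ /// A small sketch of building a shared environment (the values are illustrative):
+ ///
+ /// ```ignore
+ /// use std::sync::Arc;
+ /// use grpcio::EnvBuilder;
+ ///
+ /// let env = Arc::new(EnvBuilder::new().cq_count(2).name_prefix("grpc").build());
+ /// ```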
+ pub fn build(self) -> Environment {
+ unsafe {
+ grpc_sys::grpc_init();
+ }
+ let mut cqs = Vec::with_capacity(self.cq_count);
+ let mut handles = Vec::with_capacity(self.cq_count);
+ let (tx, rx) = mpsc::channel();
+ for i in 0..self.cq_count {
+ let tx_i = tx.clone();
+ let mut builder = ThreadBuilder::new();
+ if let Some(ref prefix) = self.name_prefix {
+ builder = builder.name(format!("{}-{}", prefix, i));
+ }
+ let handle = builder.spawn(move || poll_queue(tx_i)).unwrap();
+ handles.push(handle);
+ }
+ for _ in 0..self.cq_count {
+ cqs.push(rx.recv().unwrap());
+ }
+
+ Environment {
+ cqs,
+ idx: AtomicUsize::new(0),
+ _handles: handles,
+ }
+ }
+}
+
+/// An object used to control concurrency and drive the gRPC event loops.
+pub struct Environment {
+ cqs: Vec<CompletionQueue>,
+ idx: AtomicUsize,
+ _handles: Vec<JoinHandle<()>>,
+}
+
+impl Environment {
+ /// Initialize gRPC and create a thread pool to poll completion queues. The thread pool
+ /// size and the number of completion queues are specified by `cq_count`. Each thread
+ /// polls one completion queue.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `cq_count` is 0.
+ pub fn new(cq_count: usize) -> Environment {
+ assert!(cq_count > 0);
+ EnvBuilder::new()
+ .name_prefix("grpc-poll")
+ .cq_count(cq_count)
+ .build()
+ }
+
+ /// Get all the created completion queues.
+ pub fn completion_queues(&self) -> &[CompletionQueue] {
+ self.cqs.as_slice()
+ }
+
+ /// Pick an arbitrary completion queue.
+ pub fn pick_cq(&self) -> CompletionQueue {
+ let idx = self.idx.fetch_add(1, Ordering::Relaxed);
+ self.cqs[idx % self.cqs.len()].clone()
+ }
+}
+
+impl Drop for Environment {
+ fn drop(&mut self) {
+ for cq in self.completion_queues() {
+ // it's safe to shutdown more than once.
+ cq.shutdown()
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_basic_loop() {
+ let mut env = Environment::new(2);
+
+ let q1 = env.pick_cq();
+ let q2 = env.pick_cq();
+ let q3 = env.pick_cq();
+ let cases = vec![(&q1, &q3, true), (&q1, &q2, false)];
+ for (lq, rq, is_eq) in cases {
+ let lq_ref = lq.borrow().unwrap();
+ let rq_ref = rq.borrow().unwrap();
+ if is_eq {
+ assert_eq!(lq_ref.as_ptr(), rq_ref.as_ptr());
+ } else {
+ assert_ne!(lq_ref.as_ptr(), rq_ref.as_ptr());
+ }
+ }
+
+ assert_eq!(env.completion_queues().len(), 2);
+ for cq in env.completion_queues() {
+ cq.shutdown();
+ }
+
+ for handle in env._handles.drain(..) {
+ handle.join().unwrap();
+ }
+ }
+}
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..f12ffa4
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,92 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::{error, fmt, result};
+
+use crate::call::RpcStatus;
+use crate::grpc_sys::grpc_call_error;
+
+#[cfg(feature = "prost-codec")]
+use prost::DecodeError;
+#[cfg(feature = "protobuf-codec")]
+use protobuf::ProtobufError;
+
+/// Errors generated from this library.
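+///
+/// A sketch of handling an RPC failure at a call site (names are illustrative):
+///
+/// ```ignore
+/// match client.unary_call(&method, &req, CallOption::default()) {
+///     Ok(resp) => println!("ok: {:?}", resp),
+///     Err(Error::RpcFailure(status)) => eprintln!("rpc failed: {}", status.status),
+///     Err(e) => eprintln!("error: {}", e),
+/// }
+/// ```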
+#[derive(Debug)]
+pub enum Error {
+ /// Codec error.
+ Codec(Box<dyn error::Error + Send + Sync>),
+ /// Failed to start an internal async call.
+ CallFailure(grpc_call_error),
+ /// The RPC request failed.
+ RpcFailure(RpcStatus),
+ /// Tried to write to a finished RPC call.
+ RpcFinished(Option<RpcStatus>),
+ /// Remote is stopped.
+ RemoteStopped,
+ /// Failed to shutdown.
+ ShutdownFailed,
+ /// Failed to bind.
+ BindFail(String, u16),
+ /// gRPC completion queue is shutdown.
+ QueueShutdown,
+ /// Failed to create Google default credentials.
+ GoogleAuthenticationFailed,
+ /// Invalid format of metadata.
+ InvalidMetadata(String),
+}
+
+impl fmt::Display for Error {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Error::RpcFailure(RpcStatus { status, details }) => match details {
+ Some(details) => write!(fmt, "RpcFailure: {} {}", status, details),
+ None => write!(fmt, "RpcFailure: {}", status),
+ },
+ other_error => write!(fmt, "{:?}", other_error),
+ }
+ }
+}
+
+impl error::Error for Error {
+ fn source(&self) -> Option<&(dyn error::Error + 'static)> {
+ match *self {
+ Error::Codec(ref e) => Some(e.as_ref()),
+ _ => None,
+ }
+ }
+}
+
+#[cfg(feature = "protobuf-codec")]
+impl From<ProtobufError> for Error {
+ fn from(e: ProtobufError) -> Error {
+ Error::Codec(Box::new(e))
+ }
+}
+
+#[cfg(feature = "prost-codec")]
+impl From<DecodeError> for Error {
+ fn from(e: DecodeError) -> Error {
+ Error::Codec(Box::new(e))
+ }
+}
+
+/// Type alias to use this library's [`Error`] type in a `Result`.
+pub type Result<T> = result::Result<T, Error>;
+
+#[cfg(all(test, feature = "protobuf-codec"))]
+mod tests {
+ use std::error::Error as StdError;
+
+ use protobuf::error::WireError;
+ use protobuf::ProtobufError;
+
+ use super::Error;
+
+ #[test]
+ fn test_convert() {
+ let error = ProtobufError::WireError(WireError::UnexpectedEof);
+ let e: Error = error.into();
+ assert_eq!(e.to_string(), "Codec(WireError(UnexpectedEof))");
+ assert!(e.source().is_some());
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..321e1f2
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,79 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+/*!
+
+[grpcio] is a Rust implementation of [gRPC], which is a high performance, open source universal RPC
+framework that puts mobile and HTTP/2 first. grpcio is built on [gRPC Core] and [futures-rs].
+
+[grpcio]: https://github.com/tikv/grpc-rs/
+[gRPC]: https://grpc.io/
+[gRPC Core]: https://github.com/grpc/grpc
+[futures-rs]: https://github.com/rust-lang/futures-rs
+
+## Optional features
+
+- **`secure`** *(enabled by default)* - Enables support for TLS encryption and some authentication
+ mechanisms.
+
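+## Quick example
+
+A minimal client-side sketch (the generated `GreeterClient` comes from grpcio-compiler /
+protoc and is illustrative, not part of this crate):
+
+```ignore
+use std::sync::Arc;
+use grpcio::{ChannelBuilder, EnvBuilder};
+
+let env = Arc::new(EnvBuilder::new().build());
+let channel = ChannelBuilder::new(env).connect("127.0.0.1:50051");
+// let client = GreeterClient::new(channel);
+```
+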
+*/
+
+#![allow(clippy::new_without_default)]
+#![allow(clippy::cast_lossless)]
+#![allow(clippy::option_map_unit_fn)]
+
+use grpcio_sys as grpc_sys;
+#[macro_use]
+extern crate log;
+
+mod auth_context;
+mod buf;
+mod call;
+mod channel;
+mod client;
+mod codec;
+mod cq;
+mod env;
+mod error;
+mod log_util;
+mod metadata;
+mod quota;
+#[cfg(feature = "secure")]
+mod security;
+mod server;
+mod task;
+
+pub use crate::call::client::{
+ CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver,
+ ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, StreamingCallSink,
+};
+pub use crate::call::server::{
+ ClientStreamingSink, ClientStreamingSinkResult, Deadline, DuplexSink, DuplexSinkFailure,
+ RequestStream, RpcContext, ServerStreamingSink, ServerStreamingSinkFailure, UnarySink,
+ UnarySinkResult,
+};
+pub use crate::call::{MessageReader, Method, MethodType, RpcStatus, RpcStatusCode, WriteFlags};
+pub use crate::channel::{
+ Channel, ChannelBuilder, CompressionAlgorithms, CompressionLevel, ConnectivityState, LbPolicy,
+ OptTarget,
+};
+pub use crate::client::Client;
+
+#[cfg(feature = "protobuf-codec")]
+pub use crate::codec::pb_codec::{de as pb_de, ser as pb_ser};
+#[cfg(feature = "prost-codec")]
+pub use crate::codec::pr_codec::{de as pr_de, ser as pr_ser};
+
+pub use crate::auth_context::{AuthContext, AuthProperty, AuthPropertyIter};
+pub use crate::codec::Marshaller;
+pub use crate::env::{EnvBuilder, Environment};
+pub use crate::error::{Error, Result};
+pub use crate::log_util::redirect_log;
+pub use crate::metadata::{Metadata, MetadataBuilder, MetadataIter};
+pub use crate::quota::ResourceQuota;
+#[cfg(feature = "secure")]
+pub use crate::security::{
+ CertificateRequestType, ChannelCredentials, ChannelCredentialsBuilder, ServerCredentials,
+ ServerCredentialsBuilder, ServerCredentialsFetcher,
+};
+pub use crate::server::{Server, ServerBuilder, Service, ServiceBuilder, ShutdownFuture};
diff --git a/src/log_util.rs b/src/log_util.rs
new file mode 100644
index 0000000..3a0cfd6
--- /dev/null
+++ b/src/log_util.rs
@@ -0,0 +1,57 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::ffi::CStr;
+
+use crate::grpc_sys::{self, gpr_log_func_args, gpr_log_severity};
+use log::{self, Level, LevelFilter, Record};
+
+#[inline]
+fn severity_to_log_level(severity: gpr_log_severity) -> Level {
+ match severity {
+ gpr_log_severity::GPR_LOG_SEVERITY_DEBUG => Level::Debug,
+ gpr_log_severity::GPR_LOG_SEVERITY_INFO => Level::Info,
+ gpr_log_severity::GPR_LOG_SEVERITY_ERROR => Level::Error,
+ }
+}
+
+extern "C" fn delegate(c_args: *mut gpr_log_func_args) {
+ let args = unsafe { &*c_args };
+ let level = severity_to_log_level(args.severity);
+ if !log_enabled!(level) {
+ return;
+ }
+
+ // can't panic.
+ let file_str = unsafe { CStr::from_ptr(args.file).to_str().unwrap() };
+ let line = args.line as u32;
+
+ let msg = unsafe { CStr::from_ptr(args.message).to_string_lossy() };
+ log::logger().log(
+ &Record::builder()
+ .args(format_args!("{}", msg))
+ .level(level)
+ .file(file_str.into())
+ .line(line.into())
+ .module_path(module_path!().into())
+ .build(),
+ );
+}
+
+/// Redirect gRPC logs to Rust's `log` implementation.
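+///
+/// A sketch; call this once after initializing a `log` backend (`env_logger` here is an
+/// illustrative external crate, not a dependency of this crate):
+///
+/// ```ignore
+/// env_logger::init();
+/// grpcio::redirect_log();
+/// ```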
+pub fn redirect_log() {
+ let level = match log::max_level() {
+ LevelFilter::Off => unsafe {
+ // disable log.
+ grpc_sys::gpr_set_log_function(None);
+ return;
+ },
+ LevelFilter::Error | LevelFilter::Warn => gpr_log_severity::GPR_LOG_SEVERITY_ERROR,
+ LevelFilter::Info => gpr_log_severity::GPR_LOG_SEVERITY_INFO,
+ LevelFilter::Debug | LevelFilter::Trace => gpr_log_severity::GPR_LOG_SEVERITY_DEBUG,
+ };
+
+ unsafe {
+ grpc_sys::gpr_set_log_verbosity(level);
+ grpc_sys::gpr_set_log_function(Some(delegate));
+ }
+}
diff --git a/src/metadata.rs b/src/metadata.rs
new file mode 100644
index 0000000..ef49bcb
--- /dev/null
+++ b/src/metadata.rs
@@ -0,0 +1,322 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::grpc_sys::{self, grpc_metadata_array};
+use std::borrow::Cow;
+use std::{mem, slice, str};
+
+use crate::error::{Error, Result};
+
+fn normalize_key(key: &str, binary: bool) -> Result<Cow<'_, str>> {
+ if key.is_empty() {
+ return Err(Error::InvalidMetadata(
+ "metadata key should not be empty".to_owned(),
+ ));
+ }
+ let mut is_upper_case = false;
+ for b in key.as_bytes() {
+ let b = *b;
+ if b >= b'A' && b <= b'Z' {
+ is_upper_case = true;
+ continue;
+ } else if b >= b'a' && b <= b'z'
+ || b >= b'0' && b <= b'9'
+ || b == b'_'
+ || b == b'-'
+ || b == b'.'
+ {
+ continue;
+ }
+ return Err(Error::InvalidMetadata(format!("key {:?} is invalid", key)));
+ }
+ let key = if is_upper_case {
+ Cow::Owned(key.to_ascii_lowercase())
+ } else {
+ Cow::Borrowed(key)
+ };
+ if binary {
+ if !key.as_bytes().ends_with(b"-bin") {
+ return Err(Error::InvalidMetadata(
+ "binary key should end with '-bin'".to_owned(),
+ ));
+ }
+ } else if key.as_bytes().ends_with(b"-bin") {
+ return Err(Error::InvalidMetadata(
+ "non-binary key should not end with '-bin'".to_owned(),
+ ));
+ }
+ Ok(key)
+}
+
+/// Builder for immutable Metadata.
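+///
+/// A small sketch of building call metadata (keys and values are illustrative):
+///
+/// ```ignore
+/// let mut b = MetadataBuilder::with_capacity(2);
+/// b.add_str("request-id", "42").unwrap();
+/// b.add_bytes("trace-bin", &[0x01, 0x02]).unwrap();
+/// let metadata = b.build();
+/// ```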
+pub struct MetadataBuilder {
+ arr: Metadata,
+}
+
+impl MetadataBuilder {
+ /// Create a builder with empty initial capacity.
+ pub fn new() -> MetadataBuilder {
+ MetadataBuilder::with_capacity(0)
+ }
+
+ /// Create a builder with the given initial capacity.
+ pub fn with_capacity(cap: usize) -> MetadataBuilder {
+ MetadataBuilder {
+ arr: Metadata::with_capacity(cap),
+ }
+ }
+
+ /// Add a metadata holding an ASCII value.
+ ///
+ /// `key` must not use suffix (-bin) indicating a binary valued metadata entry.
+ pub fn add_str(&mut self, key: &str, value: &str) -> Result<&mut MetadataBuilder> {
+ if !value.is_ascii() {
+ return Err(Error::InvalidMetadata(
+ "only ascii value is accepted.".to_owned(),
+ ));
+ }
+ for b in value.bytes() {
+ if 0 == unsafe { libc::isprint(b as i32) } {
+ return Err(Error::InvalidMetadata(
+ "Only printable chars are accepted.".to_owned(),
+ ));
+ }
+ }
+ let key = normalize_key(key, false)?;
+ self.add_metadata(&key, value.as_bytes())
+ }
+
+ fn add_metadata(&mut self, key: &str, value: &[u8]) -> Result<&mut MetadataBuilder> {
+ unsafe {
+ grpc_sys::grpcwrap_metadata_array_add(
+ &mut self.arr.0,
+ key.as_ptr() as _,
+ key.len(),
+ value.as_ptr() as _,
+ value.len(),
+ )
+ }
+ Ok(self)
+ }
+
+ /// Add a metadata holding a binary value.
+ ///
+ /// `key` needs to have suffix (-bin) indicating a binary valued metadata entry.
+ pub fn add_bytes(&mut self, key: &str, value: &[u8]) -> Result<&mut MetadataBuilder> {
+ let key = normalize_key(key, true)?;
+ self.add_metadata(&key, value)
+ }
+
+ /// Create `Metadata` with configured entries.
+ pub fn build(mut self) -> Metadata {
+ unsafe {
+ grpc_sys::grpcwrap_metadata_array_shrink_to_fit(&mut self.arr.0);
+ }
+ self.arr
+ }
+}
+
+/// A collection of metadata entries that can be exchanged during a call.
+///
+/// gRPC supports these types of metadata:
+///
+/// - Request headers
+///
+/// They are sent by the client at the beginning of a remote call before
+/// any request messages are sent.
+///
+/// - Response headers
+///
+/// They are sent by the server at the beginning of a remote call handler
+/// before any response messages are sent.
+///
+/// - Response trailers
+///
+/// They are sent by the server at the end of a remote call along with
+/// resulting call status.
+///
+/// Metadata values can be ASCII strings or bytes. They are distinguished by the
+/// key suffix: keys of byte values must end with '-bin'.
+#[repr(C)]
+pub struct Metadata(grpc_metadata_array);
+
+impl Metadata {
+ fn with_capacity(cap: usize) -> Metadata {
+ unsafe {
+ let mut arr = mem::MaybeUninit::uninit();
+ grpc_sys::grpcwrap_metadata_array_init(arr.as_mut_ptr(), cap);
+ Metadata(arr.assume_init())
+ }
+ }
+
+ /// Returns the count of metadata entries.
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.0.count
+ }
+
+ /// Returns true if there are no metadata entries.
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.0.count == 0
+ }
+
+ /// Returns the metadata entry at the `index`.
+ ///
+ /// `None` is returned if the index is out of bounds.
+ pub fn get(&self, index: usize) -> Option<(&str, &[u8])> {
+ if self.0.count <= index {
+ return None;
+ }
+ let (mut key_len, mut val_len) = (0, 0);
+ unsafe {
+ let key = grpc_sys::grpcwrap_metadata_array_get_key(&self.0, index, &mut key_len);
+ let val = grpc_sys::grpcwrap_metadata_array_get_value(&self.0, index, &mut val_len);
+ let key_str = str::from_utf8_unchecked(slice::from_raw_parts(key as _, key_len));
+ let val_bytes = slice::from_raw_parts(val as *const u8, val_len);
+ Some((key_str, val_bytes))
+ }
+ }
+
+ /// Returns an iterator over the metadata entries.
+ pub fn iter(&self) -> MetadataIter<'_> {
+ MetadataIter {
+ data: self,
+ index: 0,
+ }
+ }
+}
+
+impl Clone for Metadata {
+ fn clone(&self) -> Metadata {
+ let mut builder = MetadataBuilder::with_capacity(self.len());
+ for (k, v) in self.iter() {
+ // use `add_metadata` to skip validation.
+ builder.add_metadata(k, v).unwrap();
+ }
+ builder.build()
+ }
+}
+
+impl Drop for Metadata {
+ fn drop(&mut self) {
+ unsafe {
+ grpc_sys::grpcwrap_metadata_array_cleanup(&mut self.0);
+ }
+ }
+}
+
+unsafe impl Send for Metadata {}
+
+/// Immutable metadata iterator
+///
+/// This struct is created by the iter method on `Metadata`.
+pub struct MetadataIter<'a> {
+ data: &'a Metadata,
+ index: usize,
+}
+
+impl<'a> Iterator for MetadataIter<'a> {
+ type Item = (&'a str, &'a [u8]);
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let res = self.data.get(self.index);
+ if res.is_some() {
+ self.index += 1;
+ }
+ res
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let remain = self.data.0.count - self.index;
+ (remain, Some(remain))
+ }
+}
+
+impl<'a> IntoIterator for &'a Metadata {
+ type IntoIter = MetadataIter<'a>;
+ type Item = (&'a str, &'a [u8]);
+
+ fn into_iter(self) -> MetadataIter<'a> {
+ MetadataIter {
+ data: self,
+ index: 0,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_key_check() {
+ let mut builder = MetadataBuilder::new();
+ // Non-byte key should not end with '-bin'.
+ assert!(builder.add_str("key-bin", "value").is_err());
+ // Byte key should end with '-bin'.
+ assert!(builder.add_bytes("key", b"value").is_err());
+ // Key should not be empty.
+ assert!(builder.add_str("", "value").is_err());
+ // Key should follow the rule ^[a-z0-9_-.]+$
+ assert!(builder.add_str(":key", "value").is_err());
+ assert!(builder.add_str("key~", "value").is_err());
+ assert!(builder.add_str("ke+y", "value").is_err());
+ // Only printable ascii value is accepted when `add_str`.
+ assert!(builder.add_str("key", "❤").is_err());
+ assert!(builder.add_str("key", "\0").is_err());
+ assert!(builder.add_str("key", "\n").is_err());
+
+ builder.add_str("key", "value").unwrap();
+ builder.add_str("_", "value").unwrap();
+ builder.add_str("-", "value").unwrap();
+ builder.add_str(".", "value").unwrap();
+ builder.add_bytes("key-bin", b"value").unwrap();
+ }
+
+ #[test]
+ fn test_metadata() {
+ let mut builder = MetadataBuilder::new();
+ let mut meta_kvs = vec![];
+ for i in 0..5 {
+ let key = format!("K{}", i);
+ let val = format!("v{}", i);
+ builder.add_str(&key, &val).unwrap();
+ meta_kvs.push((key.to_ascii_lowercase(), val.into_bytes()));
+ }
+ for i in 5..10 {
+ let key = format!("k{}-Bin", i);
+ let val = format!("v{}", i);
+ builder.add_bytes(&key, val.as_bytes()).unwrap();
+ meta_kvs.push((key.to_ascii_lowercase(), val.into_bytes()));
+ }
+ let metadata = builder.build();
+ for (i, (exp, res)) in meta_kvs.iter().zip(&metadata).enumerate() {
+ let kv = metadata.get(i).unwrap();
+ assert_eq!(kv, res);
+ assert_eq!(res, (exp.0.as_str(), exp.1.as_slice()));
+ }
+ assert!(metadata.get(10).is_none());
+ assert_eq!(metadata.len(), 10);
+ assert!(!metadata.is_empty());
+ {
+ let mut iter = metadata.iter();
+ for i in 0..10 {
+ assert_eq!(iter.size_hint(), (10 - i, Some(10 - i)));
+ iter.next();
+ }
+ assert_eq!(iter.size_hint(), (0, Some(0)));
+ }
+
+ let metadata1 = metadata.clone();
+ for (x, y) in metadata.iter().zip(&metadata1) {
+ assert_eq!(x, y);
+ }
+ drop(metadata);
+ // Ensure deep copy.
+ assert!(metadata1.get(0).is_some());
+
+ let empty_metadata = MetadataBuilder::new().build();
+ assert!(empty_metadata.is_empty());
+ assert_eq!(empty_metadata.len(), 0);
+ }
+}
diff --git a/src/quota.rs b/src/quota.rs
new file mode 100644
index 0000000..7891312
--- /dev/null
+++ b/src/quota.rs
@@ -0,0 +1,49 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::grpc_sys::{self, grpc_resource_quota};
+use std::ffi::CString;
+use std::ptr;
+
+/// ResourceQuota represents a bound on memory and thread usage by gRPC.
+/// NOTE: The management of threads created in grpc-core doesn't use ResourceQuota.
+/// TODO: Manage the poller threads created in grpc-rs with this ResourceQuota later.
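+///
+/// Sketch: cap gRPC buffer memory at roughly 64 MiB (the value and name are illustrative):
+///
+/// ```ignore
+/// let quota = ResourceQuota::new(Some("my-quota")).resize_memory(64 * 1024 * 1024);
+/// let builder = ChannelBuilder::new(env).set_resource_quota(quota);
+/// ```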
+pub struct ResourceQuota {
+ raw: *mut grpc_resource_quota,
+}
+
+impl ResourceQuota {
+ /// Create a control block for a resource quota. If a name is
+ /// not declared for this control block, a name is automatically
+ /// generated by gRPC core.
+ pub fn new(name: Option<&str>) -> ResourceQuota {
+ match name {
+ Some(name_str) => {
+ let name_cstr = CString::new(name_str).unwrap();
+ ResourceQuota {
+ raw: unsafe { grpc_sys::grpc_resource_quota_create(name_cstr.as_ptr() as _) },
+ }
+ }
+ None => ResourceQuota {
+ raw: unsafe { grpc_sys::grpc_resource_quota_create(ptr::null()) },
+ },
+ }
+ }
+
+ /// Resize this ResourceQuota to a new memory size.
+ pub fn resize_memory(self, new_size: usize) -> ResourceQuota {
+ unsafe { grpc_sys::grpc_resource_quota_resize(self.raw, new_size) };
+ self
+ }
+
+ pub(crate) fn get_ptr(&self) -> *mut grpc_resource_quota {
+ self.raw
+ }
+}
+
+impl Drop for ResourceQuota {
+ fn drop(&mut self) {
+ unsafe {
+ grpc_sys::grpc_resource_quota_unref(self.raw);
+ }
+ }
+}
diff --git a/src/security/credentials.rs b/src/security/credentials.rs
new file mode 100644
index 0000000..8d835ee
--- /dev/null
+++ b/src/security/credentials.rs
@@ -0,0 +1,368 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::error::Error as StdError;
+use std::ffi::CString;
+use std::{mem, ptr};
+
+use crate::error::{Error, Result};
+use crate::grpc_sys::grpc_ssl_certificate_config_reload_status::{self, *};
+use crate::grpc_sys::grpc_ssl_client_certificate_request_type::*;
+use crate::grpc_sys::{
+ self, grpc_channel_credentials, grpc_server_credentials,
+ grpc_ssl_client_certificate_request_type, grpc_ssl_server_certificate_config,
+};
+
+#[repr(u32)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
+pub enum CertificateRequestType {
+ /// Server does not request client certificate.
+ ///
+ /// The certificate presented by the client is not checked by the server at
+ /// all. (A client may present a self-signed or signed certificate, or no certificate
+ /// at all, and any of those options will be accepted.)
+ DontRequestClientCertificate = GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE as u32,
+ /// Server requests client certificate but does not enforce that the client
+ /// presents a certificate.
+ ///
+ /// If the client presents a certificate, the client authentication is left to
+ /// the application (the necessary metadata will be available to the
+ /// application via authentication context properties, see grpc_auth_context).
+ ///
+ /// The client's key certificate pair must be valid for the SSL connection to
+ /// be established.
+ RequestClientCertificateButDontVerify =
+ GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY as u32,
+ /// Server requests client certificate but does not enforce that the client
+ /// presents a certificate.
+ ///
+ /// If the client presents a certificate, the client authentication is done by
+ /// the gRPC framework. (For a successful connection the client needs to either
+ /// present a certificate that can be verified against the root certificate
+ /// configured by the server or not present a certificate at all)
+ ///
+ /// The client's key certificate pair must be valid for the SSL connection to
+ /// be established.
+ RequestClientCertificateAndVerify = GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY as u32,
+ /// Server requests client certificate and enforces that the client presents a
+ /// certificate.
+ ///
+ /// If the client presents a certificate, the client authentication is left to
+ /// the application (the necessary metadata will be available to the
+ /// application via authentication context properties, see grpc_auth_context).
+ ///
+ /// The client's key certificate pair must be valid for the SSL connection to
+ /// be established.
+ RequestAndRequireClientCertificateButDontVerify =
+ GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY as u32,
+ /// Server requests client certificate and enforces that the client presents a
+ /// certificate.
+ ///
+ /// The certificate presented by the client is verified by the gRPC framework.
+ /// (For a successful connection the client needs to present a certificate that
+ /// can be verified against the root certificate configured by the server)
+ ///
+ /// The client's key certificate pair must be valid for the SSL connection to
+ /// be established.
+ RequestAndRequireClientCertificateAndVerify =
+ GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY as u32,
+}
+
+/// A trait for retrieving updated SSL server certificates, private keys, and trusted CAs
+/// (for client authentication).
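+///
+/// A no-op implementation sketch that always keeps the current certificates (the type name
+/// is illustrative):
+///
+/// ```ignore
+/// struct StaticCerts;
+///
+/// impl ServerCredentialsFetcher for StaticCerts {
+///     fn fetch(
+///         &self,
+///     ) -> std::result::Result<Option<ServerCredentialsBuilder>, Box<dyn std::error::Error>> {
+///         // `None` tells gRPC to keep using the previously returned certificates.
+///         Ok(None)
+///     }
+/// }
+/// ```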
+pub trait ServerCredentialsFetcher {
+ /// Retrieves updated credentials.
+ ///
+ /// The method will be called during server initialization and every time a new
+ /// connection is about to be accepted. When it returns `None` or an error, gRPC
+ /// will continue to use the previous certificates returned by the method. If no
+ /// valid credentials are returned during initialization, the server will fail to start.
+ fn fetch(&self) -> std::result::Result<Option<ServerCredentialsBuilder>, Box<dyn StdError>>;
+}
+
+impl CertificateRequestType {
+ #[inline]
+ pub(crate) fn to_native(self) -> grpc_ssl_client_certificate_request_type {
+ unsafe { mem::transmute(self) }
+ }
+}
+
+fn clear_key_securely(key: &mut [u8]) {
+ unsafe {
+ for b in key {
+ ptr::write_volatile(b, 0)
+ }
+ }
+}
+
+pub(crate) unsafe extern "C" fn server_cert_fetcher_wrapper(
+ user_data: *mut std::os::raw::c_void,
+ config: *mut *mut grpc_ssl_server_certificate_config,
+) -> grpc_ssl_certificate_config_reload_status {
+ if user_data.is_null() {
+ panic!("fetcher user_data must be set up!");
+ }
+ let f: &mut dyn ServerCredentialsFetcher =
+ (&mut *(user_data as *mut Box<dyn ServerCredentialsFetcher>)).as_mut();
+ let result = f.fetch();
+ match result {
+ Ok(Some(builder)) => {
+ let new_config = builder.build_config();
+ *config = new_config;
+ }
+ Ok(None) => {
+ return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED;
+ }
+ Err(e) => {
+ warn!("cert_fetcher met error: {}", e);
+ return GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL;
+ }
+ }
+ GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW
+}
+
+/// [`ServerCredentials`] factory used to configure its properties.
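+///
+/// # Examples
+///
+/// A minimal sketch (illustrative only; the byte vectors stand in for real PEM data):
+///
+/// ```no_run
+/// use grpcio::{CertificateRequestType, ServerCredentialsBuilder};
+///
+/// let root_ca: Vec<u8> = Vec::new(); // PEM encoded client root certificate
+/// let cert: Vec<u8> = Vec::new();    // PEM encoded server certificate chain
+/// let key: Vec<u8> = Vec::new();     // PEM encoded server private key
+///
+/// let server_creds = ServerCredentialsBuilder::new()
+///     .root_cert(root_ca, CertificateRequestType::DontRequestClientCertificate)
+///     .add_cert(cert, key)
+///     .build();
+/// ```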
+pub struct ServerCredentialsBuilder {
+ root: Option<CString>,
+ key_cert_pairs: Vec<grpcio_sys::grpc_ssl_pem_key_cert_pair>,
+ cer_request_type: CertificateRequestType,
+}
+
+impl ServerCredentialsBuilder {
+ /// Initialize a new [`ServerCredentialsBuilder`].
+ pub fn new() -> ServerCredentialsBuilder {
+ ServerCredentialsBuilder {
+ root: None,
+ key_cert_pairs: vec![],
+ cer_request_type: CertificateRequestType::DontRequestClientCertificate,
+ }
+ }
+
+ /// Set the PEM encoded client root certificate used to verify the client's identity,
+ /// along with the `cer_request_type` that controls whether a client certificate is
+ /// requested and how it is verified.
+ pub fn root_cert<S: Into<Vec<u8>>>(
+ mut self,
+ cert: S,
+ cer_request_type: CertificateRequestType,
+ ) -> ServerCredentialsBuilder {
+ self.root = Some(CString::new(cert).unwrap());
+ self.cer_request_type = cer_request_type;
+ self
+ }
+
+ /// Add a PEM encoded server side certificate and key.
+ pub fn add_cert(mut self, cert: Vec<u8>, mut private_key: Vec<u8>) -> ServerCredentialsBuilder {
+ if private_key.capacity() == private_key.len() {
+ let mut nil_key = Vec::with_capacity(private_key.len() + 1);
+ nil_key.extend_from_slice(&private_key);
+ clear_key_securely(&mut private_key);
+ private_key = nil_key;
+ }
+ self.key_cert_pairs
+ .push(grpcio_sys::grpc_ssl_pem_key_cert_pair {
+ private_key: CString::new(private_key).unwrap().into_raw(),
+ cert_chain: CString::new(cert).unwrap().into_raw(),
+ });
+ self
+ }
+
+ /// Finalize the [`ServerCredentialsBuilder`] and build a raw
+ /// `grpcio_sys::grpc_ssl_server_certificate_config` pointer.
+ unsafe fn build_config(mut self) -> *mut grpcio_sys::grpc_ssl_server_certificate_config {
+ let root_cert = self
+ .root
+ .take()
+ .map_or_else(ptr::null_mut, CString::into_raw);
+ let cfg = grpcio_sys::grpc_ssl_server_certificate_config_create(
+ root_cert,
+ self.key_cert_pairs.as_ptr(),
+ self.key_cert_pairs.len(),
+ );
+ if !root_cert.is_null() {
+ CString::from_raw(root_cert);
+ }
+ cfg
+ }
+
+ /// Finalize the [`ServerCredentialsBuilder`] and build the [`ServerCredentials`].
+ pub fn build(self) -> ServerCredentials {
+ let credentials = unsafe {
+ let opt = grpcio_sys::grpc_ssl_server_credentials_create_options_using_config(
+ self.cer_request_type.to_native(),
+ self.build_config(),
+ );
+ grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt)
+ };
+
+ ServerCredentials { creds: credentials }
+ }
+}
+
+impl Drop for ServerCredentialsBuilder {
+ fn drop(&mut self) {
+ for pair in self.key_cert_pairs.drain(..) {
+ unsafe {
+ CString::from_raw(pair.cert_chain as *mut _);
+ let s = CString::from_raw(pair.private_key as *mut _);
+ clear_key_securely(&mut s.into_bytes_with_nul());
+ }
+ }
+ }
+}
+
+/// Server-side SSL credentials.
+///
+/// Use [`ServerCredentialsBuilder`] to build a [`ServerCredentials`].
+pub struct ServerCredentials {
+ creds: *mut grpc_server_credentials,
+}
+
+unsafe impl Send for ServerCredentials {}
+
+impl ServerCredentials {
+ pub(crate) unsafe fn from_raw(creds: *mut grpc_server_credentials) -> ServerCredentials {
+ ServerCredentials { creds }
+ }
+
+ pub fn as_mut_ptr(&mut self) -> *mut grpc_server_credentials {
+ self.creds
+ }
+}
+
+impl Drop for ServerCredentials {
+ fn drop(&mut self) {
+ unsafe {
+ grpc_sys::grpc_server_credentials_release(self.creds);
+ }
+ }
+}
+
+/// [`ChannelCredentials`] factory used to configure its properties.
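+///
+/// # Examples
+///
+/// A minimal sketch (illustrative only; the byte vector stands in for a real PEM
+/// encoded root certificate):
+///
+/// ```no_run
+/// use grpcio::ChannelCredentialsBuilder;
+///
+/// let root_ca: Vec<u8> = Vec::new();
+/// let channel_creds = ChannelCredentialsBuilder::new().root_cert(root_ca).build();
+/// // The credentials can then be used to establish a secure channel.
+/// ```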
+pub struct ChannelCredentialsBuilder {
+ root: Option<CString>,
+ cert_key_pair: Option<(CString, CString)>,
+}
+
+impl ChannelCredentialsBuilder {
+ /// Initialize a new [`ChannelCredentialsBuilder`].
+ pub fn new() -> ChannelCredentialsBuilder {
+ ChannelCredentialsBuilder {
+ root: None,
+ cert_key_pair: None,
+ }
+ }
+
+ /// Set the PEM encoded server root certificate to verify server's identity.
+ pub fn root_cert(mut self, cert: Vec<u8>) -> ChannelCredentialsBuilder {
+ self.root = Some(CString::new(cert).unwrap());
+ self
+ }
+
+ /// Set the PEM encoded client side certificate and key.
+ pub fn cert(mut self, cert: Vec<u8>, mut private_key: Vec<u8>) -> ChannelCredentialsBuilder {
+ if private_key.capacity() == private_key.len() {
+ let mut nil_key = Vec::with_capacity(private_key.len() + 1);
+ nil_key.extend_from_slice(&private_key);
+ clear_key_securely(&mut private_key);
+ private_key = nil_key;
+ }
+ self.cert_key_pair = Some((
+ CString::new(cert).unwrap(),
+ CString::new(private_key).unwrap(),
+ ));
+ self
+ }
+
+ /// Finalize the [`ChannelCredentialsBuilder`] and build the [`ChannelCredentials`].
+ pub fn build(mut self) -> ChannelCredentials {
+ let root_ptr = self
+ .root
+ .take()
+ .map_or_else(ptr::null_mut, CString::into_raw);
+ let (cert_ptr, key_ptr) = self.cert_key_pair.take().map_or_else(
+ || (ptr::null_mut(), ptr::null_mut()),
+ |(cert, key)| (cert.into_raw(), key.into_raw()),
+ );
+
+ let mut pair = grpcio_sys::grpc_ssl_pem_key_cert_pair {
+ private_key: key_ptr,
+ cert_chain: cert_ptr,
+ };
+ let creds = unsafe {
+ if cert_ptr.is_null() {
+ grpcio_sys::grpc_ssl_credentials_create_ex(
+ root_ptr,
+ ptr::null_mut(),
+ ptr::null_mut(),
+ ptr::null_mut(),
+ )
+ } else {
+ grpcio_sys::grpc_ssl_credentials_create_ex(
+ root_ptr,
+ &mut pair,
+ ptr::null_mut(),
+ ptr::null_mut(),
+ )
+ }
+ };
+
+ if !root_ptr.is_null() {
+ unsafe {
+ self.root = Some(CString::from_raw(root_ptr));
+ }
+ }
+
+ if !cert_ptr.is_null() {
+ unsafe {
+ let cert = CString::from_raw(cert_ptr);
+ let key = CString::from_raw(key_ptr);
+ self.cert_key_pair = Some((cert, key));
+ }
+ }
+
+ ChannelCredentials { creds }
+ }
+}
+
+impl Drop for ChannelCredentialsBuilder {
+ fn drop(&mut self) {
+ if let Some((_, key)) = self.cert_key_pair.take() {
+ clear_key_securely(&mut key.into_bytes_with_nul());
+ }
+ }
+}
+
+/// Client-side SSL credentials.
+///
+/// Use [`ChannelCredentialsBuilder`] or [`ChannelCredentials::google_default_credentials`] to
+/// build a [`ChannelCredentials`].
+pub struct ChannelCredentials {
+ creds: *mut grpc_channel_credentials,
+}
+
+impl ChannelCredentials {
+ pub fn as_mut_ptr(&mut self) -> *mut grpc_channel_credentials {
+ self.creds
+ }
+
+ /// Try to build a [`ChannelCredentials`] to authenticate with Google OAuth credentials.
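+ ///
+ /// A minimal sketch (illustrative only; it typically requires Google application
+ /// default credentials to be available in the environment):
+ ///
+ /// ```no_run
+ /// use grpcio::ChannelCredentials;
+ ///
+ /// let creds = ChannelCredentials::google_default_credentials()
+ ///     .expect("failed to load Google default credentials");
+ /// ```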
+ pub fn google_default_credentials() -> Result<ChannelCredentials> {
+ // Initialize the runtime here. Because this is an associated method
+ // that can be called before construction of an `Environment`, we
+ // need to call this here too.
+ unsafe {
+ grpc_sys::grpc_init();
+ }
+ let creds = unsafe { grpc_sys::grpc_google_default_credentials_create() };
+ if creds.is_null() {
+ Err(Error::GoogleAuthenticationFailed)
+ } else {
+ Ok(ChannelCredentials { creds })
+ }
+ }
+}
+
+impl Drop for ChannelCredentials {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpc_channel_credentials_release(self.creds) }
+ }
+}
diff --git a/src/security/mod.rs b/src/security/mod.rs
new file mode 100644
index 0000000..f2c2bad
--- /dev/null
+++ b/src/security/mod.rs
@@ -0,0 +1,10 @@
+// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
+
+mod credentials;
+
+pub use self::credentials::{
+ CertificateRequestType, ChannelCredentials, ChannelCredentialsBuilder, ServerCredentials,
+ ServerCredentialsBuilder, ServerCredentialsFetcher,
+};
+
+pub(crate) use self::credentials::server_cert_fetcher_wrapper;
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..3dd8bf3
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,607 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::cell::UnsafeCell;
+use std::collections::HashMap;
+use std::fmt::{self, Debug, Formatter};
+use std::net::{IpAddr, SocketAddr};
+use std::pin::Pin;
+use std::ptr;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use crate::grpc_sys::{self, grpc_call_error, grpc_server};
+use futures::future::Future;
+use futures::task::{Context, Poll};
+
+use crate::call::server::*;
+use crate::call::{MessageReader, Method, MethodType};
+use crate::channel::ChannelArgs;
+use crate::cq::CompletionQueue;
+use crate::env::Environment;
+use crate::error::{Error, Result};
+use crate::task::{CallTag, CqFuture};
+use crate::RpcContext;
+
+const DEFAULT_REQUEST_SLOTS_PER_CQ: usize = 1024;
+
+/// An RPC call holder.
+#[derive(Clone)]
+pub struct Handler<F> {
+ method_type: MethodType,
+ cb: F,
+}
+
+impl<F> Handler<F> {
+ pub fn new(method_type: MethodType, cb: F) -> Handler<F> {
+ Handler { method_type, cb }
+ }
+}
+
+pub trait CloneableHandler: Send {
+ fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>);
+ fn box_clone(&self) -> Box<dyn CloneableHandler>;
+ fn method_type(&self) -> MethodType;
+}
+
+impl<F: 'static> CloneableHandler for Handler<F>
+where
+ F: FnMut(RpcContext<'_>, Option<MessageReader>) + Send + Clone,
+{
+ #[inline]
+ fn handle(&mut self, ctx: RpcContext<'_>, reqs: Option<MessageReader>) {
+ (self.cb)(ctx, reqs)
+ }
+
+ #[inline]
+ fn box_clone(&self) -> Box<dyn CloneableHandler> {
+ Box::new(self.clone())
+ }
+
+ #[inline]
+ fn method_type(&self) -> MethodType {
+ self.method_type
+ }
+}
+
+/// Given a host and port, creates a nul-terminated string of the form "host:port" or
+/// "[host]:port", depending on whether the host is an IPv6 literal. `unix:` addresses
+/// are passed through unchanged (apart from the trailing nul expected by the C API).
+fn join_host_port(host: &str, port: u16) -> String {
+ if host.starts_with("unix:") {
+ format!("{}\0", host)
+ } else if let Ok(ip) = host.parse::<IpAddr>() {
+ format!("{}\0", SocketAddr::new(ip, port))
+ } else {
+ format!("{}:{}\0", host, port)
+ }
+}
+
+#[cfg(feature = "secure")]
+mod imp {
+ use super::join_host_port;
+ use crate::grpc_sys::{self, grpc_server};
+ use crate::security::ServerCredentialsFetcher;
+ use crate::ServerCredentials;
+
+ pub struct Binder {
+ pub host: String,
+ pub port: u16,
+ cred: Option<ServerCredentials>,
+ _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
+ }
+
+ impl Binder {
+ pub fn new(host: String, port: u16) -> Binder {
+ let cred = None;
+ Binder {
+ host,
+ port,
+ cred,
+ _fetcher: None,
+ }
+ }
+
+ pub fn with_cred(
+ host: String,
+ port: u16,
+ cred: ServerCredentials,
+ _fetcher: Option<Box<Box<dyn ServerCredentialsFetcher + Send + Sync>>>,
+ ) -> Binder {
+ let cred = Some(cred);
+ Binder {
+ host,
+ port,
+ cred,
+ _fetcher,
+ }
+ }
+
+ pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
+ let addr = join_host_port(&self.host, self.port);
+ let port = match self.cred.take() {
+ None => grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _),
+ Some(mut cert) => grpc_sys::grpc_server_add_secure_http2_port(
+ server,
+ addr.as_ptr() as _,
+ cert.as_mut_ptr(),
+ ),
+ };
+ port as u16
+ }
+ }
+}
+
+#[cfg(not(feature = "secure"))]
+mod imp {
+ use super::join_host_port;
+ use crate::grpc_sys::{self, grpc_server};
+
+ pub struct Binder {
+ pub host: String,
+ pub port: u16,
+ }
+
+ impl Binder {
+ pub fn new(host: String, port: u16) -> Binder {
+ Binder { host, port }
+ }
+
+ pub unsafe fn bind(&mut self, server: *mut grpc_server) -> u16 {
+ let addr = join_host_port(&self.host, self.port);
+ grpc_sys::grpc_server_add_insecure_http2_port(server, addr.as_ptr() as _) as u16
+ }
+ }
+}
+
+use self::imp::Binder;
+
+impl Debug for Binder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ write!(f, "Binder {{ host: {}, port: {} }}", self.host, self.port)
+ }
+}
+
+/// [`Service`] factory used to configure its properties.
+///
+/// Use it to build a service that can be registered to a server.
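+///
+/// # Examples
+///
+/// A rough sketch of registering a unary handler (not compiled here; `echo_method()`
+/// is a hypothetical `Method` that would normally come from generated code):
+///
+/// ```ignore
+/// let service = ServiceBuilder::new()
+///     .add_unary_handler(&echo_method(), |ctx, req, sink| {
+///         // Echo the request back and spawn the reply future on the gRPC executor.
+///         ctx.spawn(async move {
+///             let _ = sink.success(req).await;
+///         });
+///     })
+///     .build();
+/// ```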
+pub struct ServiceBuilder {
+ handlers: HashMap<&'static [u8], BoxHandler>,
+}
+
+impl ServiceBuilder {
+ /// Initialize a new [`ServiceBuilder`].
+ pub fn new() -> ServiceBuilder {
+ ServiceBuilder {
+ handlers: HashMap::new(),
+ }
+ }
+
+ /// Add a unary RPC call handler.
+ pub fn add_unary_handler<Req, Resp, F>(
+ mut self,
+ method: &Method<Req, Resp>,
+ mut handler: F,
+ ) -> ServiceBuilder
+ where
+ Req: 'static,
+ Resp: 'static,
+ F: FnMut(RpcContext<'_>, Req, UnarySink<Resp>) + Send + Clone + 'static,
+ {
+ let (ser, de) = (method.resp_ser(), method.req_de());
+ let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
+ execute_unary(ctx, ser, de, payload.unwrap(), &mut handler)
+ };
+ let ch = Box::new(Handler::new(MethodType::Unary, h));
+ self.handlers.insert(method.name.as_bytes(), ch);
+ self
+ }
+
+ /// Add a client streaming RPC call handler.
+ pub fn add_client_streaming_handler<Req, Resp, F>(
+ mut self,
+ method: &Method<Req, Resp>,
+ mut handler: F,
+ ) -> ServiceBuilder
+ where
+ Req: 'static,
+ Resp: 'static,
+ F: FnMut(RpcContext<'_>, RequestStream<Req>, ClientStreamingSink<Resp>)
+ + Send
+ + Clone
+ + 'static,
+ {
+ let (ser, de) = (method.resp_ser(), method.req_de());
+ let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
+ execute_client_streaming(ctx, ser, de, &mut handler)
+ };
+ let ch = Box::new(Handler::new(MethodType::ClientStreaming, h));
+ self.handlers.insert(method.name.as_bytes(), ch);
+ self
+ }
+
+ /// Add a server streaming RPC call handler.
+ pub fn add_server_streaming_handler<Req, Resp, F>(
+ mut self,
+ method: &Method<Req, Resp>,
+ mut handler: F,
+ ) -> ServiceBuilder
+ where
+ Req: 'static,
+ Resp: 'static,
+ F: FnMut(RpcContext<'_>, Req, ServerStreamingSink<Resp>) + Send + Clone + 'static,
+ {
+ let (ser, de) = (method.resp_ser(), method.req_de());
+ let h = move |ctx: RpcContext<'_>, payload: Option<MessageReader>| {
+ execute_server_streaming(ctx, ser, de, payload.unwrap(), &mut handler)
+ };
+ let ch = Box::new(Handler::new(MethodType::ServerStreaming, h));
+ self.handlers.insert(method.name.as_bytes(), ch);
+ self
+ }
+
+ /// Add a duplex streaming RPC call handler.
+ pub fn add_duplex_streaming_handler<Req, Resp, F>(
+ mut self,
+ method: &Method<Req, Resp>,
+ mut handler: F,
+ ) -> ServiceBuilder
+ where
+ Req: 'static,
+ Resp: 'static,
+ F: FnMut(RpcContext<'_>, RequestStream<Req>, DuplexSink<Resp>) + Send + Clone + 'static,
+ {
+ let (ser, de) = (method.resp_ser(), method.req_de());
+ let h = move |ctx: RpcContext<'_>, _: Option<MessageReader>| {
+ execute_duplex_streaming(ctx, ser, de, &mut handler)
+ };
+ let ch = Box::new(Handler::new(MethodType::Duplex, h));
+ self.handlers.insert(method.name.as_bytes(), ch);
+ self
+ }
+
+ /// Finalize the [`ServiceBuilder`] and build the [`Service`].
+ pub fn build(self) -> Service {
+ Service {
+ handlers: self.handlers,
+ }
+ }
+}
+
+/// A gRPC service.
+///
+/// Use [`ServiceBuilder`] to build a [`Service`].
+pub struct Service {
+ handlers: HashMap<&'static [u8], BoxHandler>,
+}
+
+/// [`Server`] factory used to configure its properties.
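+///
+/// # Examples
+///
+/// A rough sketch (not compiled here; `service` is assumed to come from generated
+/// code, e.g. a hypothetical `create_echo(EchoService)`):
+///
+/// ```ignore
+/// use std::sync::Arc;
+/// use grpcio::{Environment, ServerBuilder};
+///
+/// let env = Arc::new(Environment::new(2));
+/// let mut server = ServerBuilder::new(env)
+///     .register_service(service)
+///     .bind("127.0.0.1", 50_051)
+///     .build()
+///     .unwrap();
+/// server.start();
+/// ```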
+pub struct ServerBuilder {
+ env: Arc<Environment>,
+ binders: Vec<Binder>,
+ args: Option<ChannelArgs>,
+ slots_per_cq: usize,
+ handlers: HashMap<&'static [u8], BoxHandler>,
+}
+
+impl ServerBuilder {
+ /// Initialize a new [`ServerBuilder`].
+ pub fn new(env: Arc<Environment>) -> ServerBuilder {
+ ServerBuilder {
+ env,
+ binders: Vec::new(),
+ args: None,
+ slots_per_cq: DEFAULT_REQUEST_SLOTS_PER_CQ,
+ handlers: HashMap::new(),
+ }
+ }
+
+ /// Bind to an address.
+ ///
+ /// This function can be called multiple times to bind to multiple ports.
+ pub fn bind<S: Into<String>>(mut self, host: S, port: u16) -> ServerBuilder {
+ self.binders.push(Binder::new(host.into(), port));
+ self
+ }
+
+ /// Add additional configuration for each incoming channel.
+ pub fn channel_args(mut self, args: ChannelArgs) -> ServerBuilder {
+ self.args = Some(args);
+ self
+ }
+
+ /// Set how many requests a completion queue can handle.
+ pub fn requests_slot_per_cq(mut self, slots: usize) -> ServerBuilder {
+ self.slots_per_cq = slots;
+ self
+ }
+
+ /// Register a service.
+ pub fn register_service(mut self, service: Service) -> ServerBuilder {
+ self.handlers.extend(service.handlers);
+ self
+ }
+
+ /// Finalize the [`ServerBuilder`] and build the [`Server`].
+ pub fn build(mut self) -> Result<Server> {
+ let args = self
+ .args
+ .as_ref()
+ .map_or_else(ptr::null, ChannelArgs::as_ptr);
+ unsafe {
+ let server = grpc_sys::grpc_server_create(args, ptr::null_mut());
+ for binder in self.binders.iter_mut() {
+ let bind_port = binder.bind(server);
+ if bind_port == 0 {
+ grpc_sys::grpc_server_destroy(server);
+ return Err(Error::BindFail(binder.host.clone(), binder.port));
+ }
+ binder.port = bind_port;
+ }
+
+ for cq in self.env.completion_queues() {
+ let cq_ref = cq.borrow()?;
+ grpc_sys::grpc_server_register_completion_queue(
+ server,
+ cq_ref.as_ptr(),
+ ptr::null_mut(),
+ );
+ }
+
+ Ok(Server {
+ env: self.env,
+ core: Arc::new(ServerCore {
+ server,
+ shutdown: AtomicBool::new(false),
+ binders: self.binders,
+ slots_per_cq: self.slots_per_cq,
+ }),
+ handlers: self.handlers,
+ })
+ }
+ }
+}
+
+#[cfg(feature = "secure")]
+mod secure_server {
+ use super::{Binder, ServerBuilder};
+ use crate::grpc_sys;
+ use crate::security::{
+ server_cert_fetcher_wrapper, CertificateRequestType, ServerCredentials,
+ ServerCredentialsFetcher,
+ };
+
+ impl ServerBuilder {
+ /// Bind to an address with credentials for secure connection.
+ ///
+ /// This function can be called multiple times to bind to multiple ports.
+ pub fn bind_with_cred<S: Into<String>>(
+ mut self,
+ host: S,
+ port: u16,
+ c: ServerCredentials,
+ ) -> ServerBuilder {
+ self.binders
+ .push(Binder::with_cred(host.into(), port, c, None));
+ self
+ }
+
+ /// Bind to an address for secure connection.
+ ///
+ /// The required credentials will be fetched using the provided `fetcher`. This
+ /// function can be called multiple times to bind to multiple ports.
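+ ///
+ /// A rough sketch (not compiled here; `DiskFetcher` is a hypothetical
+ /// `ServerCredentialsFetcher` implementation and `env` an existing `Environment`):
+ ///
+ /// ```ignore
+ /// let builder = ServerBuilder::new(env).bind_with_fetcher(
+ ///     "0.0.0.0",
+ ///     50_052,
+ ///     Box::new(DiskFetcher),
+ ///     CertificateRequestType::DontRequestClientCertificate,
+ /// );
+ /// ```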
+ pub fn bind_with_fetcher<S: Into<String>>(
+ mut self,
+ host: S,
+ port: u16,
+ fetcher: Box<dyn ServerCredentialsFetcher + Send + Sync>,
+ cer_request_type: CertificateRequestType,
+ ) -> ServerBuilder {
+ let fetcher_wrap = Box::new(fetcher);
+ let fetcher_wrap_ptr = Box::into_raw(fetcher_wrap);
+ let (sc, fb) = unsafe {
+ let opt = grpc_sys::grpc_ssl_server_credentials_create_options_using_config_fetcher(
+ cer_request_type.to_native(),
+ Some(server_cert_fetcher_wrapper),
+ fetcher_wrap_ptr as _,
+ );
+ (
+ ServerCredentials::from_raw(
+ grpcio_sys::grpc_ssl_server_credentials_create_with_options(opt),
+ ),
+ Box::from_raw(fetcher_wrap_ptr),
+ )
+ };
+ self.binders
+ .push(Binder::with_cred(host.into(), port, sc, Some(fb)));
+ self
+ }
+ }
+}
+
+struct ServerCore {
+ server: *mut grpc_server,
+ binders: Vec<Binder>,
+ slots_per_cq: usize,
+ shutdown: AtomicBool,
+}
+
+impl Drop for ServerCore {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpc_server_destroy(self.server) }
+ }
+}
+
+unsafe impl Send for ServerCore {}
+unsafe impl Sync for ServerCore {}
+
+pub type BoxHandler = Box<dyn CloneableHandler>;
+
+#[derive(Clone)]
+pub struct RequestCallContext {
+ server: Arc<ServerCore>,
+ registry: Arc<UnsafeCell<HashMap<&'static [u8], BoxHandler>>>,
+}
+
+impl RequestCallContext {
+ /// Users should guarantee the method is always called from the same thread.
+ /// TODO: Is there a better way?
+ #[inline]
+ pub unsafe fn get_handler(&mut self, path: &[u8]) -> Option<&mut BoxHandler> {
+ let registry = &mut *self.registry.get();
+ registry.get_mut(path)
+ }
+}
+
+// Its lifetime is guaranteed by the reference count, hence it is safe to send it to
+// another thread. However it's not `Sync`, as `BoxHandler` is not necessarily `Sync`.
+unsafe impl Send for RequestCallContext {}
+
+/// Request notification of a new call.
+pub fn request_call(ctx: RequestCallContext, cq: &CompletionQueue) {
+ if ctx.server.shutdown.load(Ordering::Relaxed) {
+ return;
+ }
+ let cq_ref = match cq.borrow() {
+ // Shutting down, skip.
+ Err(_) => return,
+ Ok(c) => c,
+ };
+ let server_ptr = ctx.server.server;
+ let prom = CallTag::request(ctx);
+ let request_ptr = prom.request_ctx().unwrap().as_ptr();
+ let prom_box = Box::new(prom);
+ let tag = Box::into_raw(prom_box);
+ let code = unsafe {
+ grpc_sys::grpcwrap_server_request_call(
+ server_ptr,
+ cq_ref.as_ptr(),
+ request_ptr,
+ tag as *mut _,
+ )
+ };
+ if code != grpc_call_error::GRPC_CALL_OK {
+ // Reclaim the boxed tag so it is not leaked before panicking.
+ drop(unsafe { Box::from_raw(tag) });
+ panic!("failed to request call: {:?}", code);
+ }
+}
+
+/// A `Future` that will resolve when shutdown completes.
+pub struct ShutdownFuture {
+ cq_f: CqFuture<()>,
+}
+
+impl Future for ShutdownFuture {
+ type Output = Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ Pin::new(&mut self.cq_f).poll(cx)
+ }
+}
+
+/// A gRPC server.
+///
+/// A single server can serve an arbitrary number of services and can listen on more than one port.
+///
+/// Use [`ServerBuilder`] to build a [`Server`].
+pub struct Server {
+ env: Arc<Environment>,
+ core: Arc<ServerCore>,
+ handlers: HashMap<&'static [u8], BoxHandler>,
+}
+
+impl Server {
+ /// Shut down the server asynchronously.
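+ ///
+ /// A rough sketch of waiting for shutdown to complete (not compiled here; `server`
+ /// is an already started [`Server`]):
+ ///
+ /// ```ignore
+ /// futures::executor::block_on(server.shutdown()).unwrap();
+ /// ```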
+ pub fn shutdown(&mut self) -> ShutdownFuture {
+ let (cq_f, prom) = CallTag::shutdown_pair();
+ let prom_box = Box::new(prom);
+ let tag = Box::into_raw(prom_box);
+ unsafe {
+ // Since the environment still exists, the completion queue cannot have been shut down.
+ let cq_ref = self.env.completion_queues()[0].borrow().unwrap();
+ grpc_sys::grpc_server_shutdown_and_notify(
+ self.core.server,
+ cq_ref.as_ptr(),
+ tag as *mut _,
+ )
+ }
+ self.core.shutdown.store(true, Ordering::SeqCst);
+ ShutdownFuture { cq_f }
+ }
+
+ /// Cancel all in-progress calls.
+ ///
+ /// Only usable after shutdown.
+ pub fn cancel_all_calls(&mut self) {
+ unsafe { grpc_sys::grpc_server_cancel_all_calls(self.core.server) }
+ }
+
+ /// Start the server.
+ pub fn start(&mut self) {
+ unsafe {
+ grpc_sys::grpc_server_start(self.core.server);
+ for cq in self.env.completion_queues() {
+ // Handlers are Send and Clone, but not Sync. So we need to
+ // provide a replica for each completion queue.
+ let registry = self
+ .handlers
+ .iter()
+ .map(|(k, v)| (k.to_owned(), v.box_clone()))
+ .collect();
+ let rc = RequestCallContext {
+ server: self.core.clone(),
+ registry: Arc::new(UnsafeCell::new(registry)),
+ };
+ for _ in 0..self.core.slots_per_cq {
+ request_call(rc.clone(), cq);
+ }
+ }
+ }
+ }
+
+ /// Get the bound address pairs.
+ pub fn bind_addrs(&self) -> impl ExactSizeIterator<Item = (&String, u16)> {
+ self.core.binders.iter().map(|b| (&b.host, b.port))
+ }
+}
+
+impl Drop for Server {
+ fn drop(&mut self) {
+ // If the server has not been shut down completely, destroying it will crash,
+ // so make sure a shutdown is initiated and waited for before destruction.
+ // TODO: don't wait here
+ let f = if !self.core.shutdown.load(Ordering::SeqCst) {
+ Some(self.shutdown())
+ } else {
+ None
+ };
+ self.cancel_all_calls();
+ let _ = f.map(futures::executor::block_on);
+ }
+}
+
+impl Debug for Server {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ write!(f, "Server {:?}", self.core.binders)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::join_host_port;
+
+ #[test]
+ fn test_join_host_port() {
+ let tbl = vec![
+ ("localhost", 0u16, "localhost:0\0"),
+ ("127.0.0.1", 100u16, "127.0.0.1:100\0"),
+ ("::1", 0u16, "[::1]:0\0"),
+ (
+ "fe80::7376:45d5:fb08:61e3",
+ 10028u16,
+ "[fe80::7376:45d5:fb08:61e3]:10028\0",
+ ),
+ ];
+
+ for (h, p, e) in &tbl {
+ assert_eq!(join_host_port(h, *p), e.to_owned());
+ }
+ }
+}
diff --git a/src/task/callback.rs b/src/task/callback.rs
new file mode 100644
index 0000000..2675469
--- /dev/null
+++ b/src/task/callback.rs
@@ -0,0 +1,84 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::call::server::{RequestContext, UnaryRequestContext};
+use crate::call::{BatchContext, Call};
+use crate::cq::CompletionQueue;
+use crate::server::{self, RequestCallContext};
+
+pub struct Request {
+ ctx: RequestContext,
+}
+
+impl Request {
+ pub fn new(rc: RequestCallContext) -> Request {
+ let ctx = RequestContext::new(rc);
+ Request { ctx }
+ }
+
+ pub fn context(&self) -> &RequestContext {
+ &self.ctx
+ }
+
+ pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
+ let mut rc = self.ctx.take_request_call_context().unwrap();
+ if !success {
+ server::request_call(rc, cq);
+ return;
+ }
+
+ match self.ctx.handle_stream_req(cq, &mut rc) {
+ Ok(_) => server::request_call(rc, cq),
+ Err(ctx) => ctx.handle_unary_req(rc, cq),
+ }
+ }
+}
+
+pub struct UnaryRequest {
+ ctx: UnaryRequestContext,
+}
+
+impl UnaryRequest {
+ pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequest {
+ let ctx = UnaryRequestContext::new(ctx, rc);
+ UnaryRequest { ctx }
+ }
+
+ pub fn batch_ctx(&self) -> &BatchContext {
+ self.ctx.batch_ctx()
+ }
+
+ pub fn request_ctx(&self) -> &RequestContext {
+ self.ctx.request_ctx()
+ }
+
+ pub fn resolve(mut self, cq: &CompletionQueue, success: bool) {
+ let mut rc = self.ctx.take_request_call_context().unwrap();
+ if !success {
+ server::request_call(rc, cq);
+ return;
+ }
+
+ let reader = self.ctx.batch_ctx_mut().recv_message();
+ self.ctx.handle(&mut rc, cq, reader);
+ server::request_call(rc, cq);
+ }
+}
+
+/// A callback used to wait for the status of an aborted RPC call to be sent.
+pub struct Abort {
+ ctx: BatchContext,
+ _call: Call,
+}
+
+impl Abort {
+ pub fn new(call: Call) -> Abort {
+ Abort {
+ ctx: BatchContext::new(),
+ _call: call,
+ }
+ }
+
+ pub fn batch_ctx(&self) -> &BatchContext {
+ &self.ctx
+ }
+}
diff --git a/src/task/executor.rs b/src/task/executor.rs
new file mode 100644
index 0000000..4a13905
--- /dev/null
+++ b/src/task/executor.rs
@@ -0,0 +1,256 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+//! gRPC C Core binds a call to a completion queue, and all related readiness
+//! notifications are forwarded to that completion queue. This module utilizes the
+//! mechanism, using `Kicker` to wake up the completion queue.
+//!
+//! To minimize context switches, it's better to bind a future to the same
+//! completion queue as its inner call; hence the method `Executor::spawn` is provided.
+
+use std::cell::UnsafeCell;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::Arc;
+
+use futures::future::Future;
+use futures::task::{waker_ref, ArcWake, Context, Poll};
+
+use super::CallTag;
+use crate::call::Call;
+use crate::cq::{CompletionQueue, WorkQueue};
+use crate::error::{Error, Result};
+use crate::grpc_sys::{self, grpc_call_error};
+
+/// A handle to a spawned future.
+/// The inner future is expected to be polled on the same thread as its completion queue.
+type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
+
+/// `Kicker` wakes up the completion queue that the inner call binds to.
+pub(crate) struct Kicker {
+ call: Call,
+}
+
+impl Kicker {
+ pub fn from_call(call: Call) -> Kicker {
+ Kicker { call }
+ }
+
+ /// Wakes up its completion queue.
+ ///
+ /// `tag` will be popped by `grpc_completion_queue_next` in the future.
+ pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
+ let _ref = self.call.cq.borrow()?;
+ unsafe {
+ let ptr = Box::into_raw(tag);
+ let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
+ if status == grpc_call_error::GRPC_CALL_OK {
+ Ok(())
+ } else {
+ Err(Error::CallFailure(status))
+ }
+ }
+ }
+}
+
+unsafe impl Sync for Kicker {}
+
+impl Clone for Kicker {
+ fn clone(&self) -> Kicker {
+ // Bump call's reference count.
+ let call = unsafe {
+ grpc_sys::grpc_call_ref(self.call.call);
+ self.call.call
+ };
+ let cq = self.call.cq.clone();
+ Kicker {
+ call: Call { call, cq },
+ }
+ }
+}
+
+/// When a future is scheduled, it becomes IDLE. When it's ready to be polled,
+/// it will be notified via task.wake() and marked as NOTIFIED. When the executor
+/// begins to poll the future, it's marked as POLLING. When the executor finishes
+/// polling, the future can either be ready or not ready. In the former case, it's
+/// marked as COMPLETED; in the latter, it's marked as IDLE again.
+///
+/// Note that it's possible the future is notified during polling, in which case the
+/// executor should poll it again once the current poll is finished, unless it returned ready.
+const NOTIFIED: u8 = 1;
+const IDLE: u8 = 2;
+const POLLING: u8 = 3;
+const COMPLETED: u8 = 4;
+
+/// Maintains the spawned future with state, so that it can be notified and polled efficiently.
+pub struct SpawnTask {
+ handle: UnsafeCell<Option<SpawnHandle>>,
+ state: AtomicU8,
+ kicker: Kicker,
+ queue: Arc<WorkQueue>,
+}
+
+/// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
+///
+/// Sync is required by `ArcWake`.
+unsafe impl Sync for SpawnTask {}
+
+impl SpawnTask {
+ fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
+ SpawnTask {
+ handle: UnsafeCell::new(Some(s)),
+ state: AtomicU8::new(IDLE),
+ kicker,
+ queue,
+ }
+ }
+
+ /// Marks the state of this task as NOTIFIED.
+ ///
+ /// Returns true if the task was IDLE and needs to be scheduled.
+ fn mark_notified(&self) -> bool {
+ loop {
+ match self.state.compare_exchange_weak(
+ IDLE,
+ NOTIFIED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => return true,
+ Err(POLLING) => match self.state.compare_exchange_weak(
+ POLLING,
+ NOTIFIED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Err(IDLE) | Err(POLLING) => continue,
+ // If it succeeds, then executor will poll the future again;
+ // if it fails, then the future should be resolved. In both
+ // cases, no need to notify the future, hence return false.
+ _ => return false,
+ },
+ Err(IDLE) => continue,
+ _ => return false,
+ }
+ }
+ }
+}
+
+pub fn resolve(task: Arc<SpawnTask>, success: bool) {
+ // The task should never be canceled for now, so `success` must be true.
+ assert!(success);
+ poll(task, true);
+}
+
+/// A custom Waker.
+///
+/// It will push the inner future to work_queue if it's notified on the
+/// same thread as inner cq.
+impl ArcWake for SpawnTask {
+ fn wake_by_ref(task: &Arc<Self>) {
+ if !task.mark_notified() {
+ return;
+ }
+
+ // Polling the future immediately can lead to deadlock, so we need to
+ // defer the work instead.
+ if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
+ match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
+ // If the queue is shutdown, then the tag will be notified
+ // eventually. So just skip here.
+ Err(Error::QueueShutdown) => (),
+ Err(e) => panic!("unexpected error when canceling call: {:?}", e),
+ _ => (),
+ }
+ }
+ }
+}
+
+/// Work that should be deferred to be handled.
+///
+/// Sometimes work can't be done immediately as it might lead to a
+/// resource conflict, deadlock for example. So it will be
+/// pushed into a queue and handled when the current work is done.
+pub struct UnfinishedWork(Arc<SpawnTask>);
+
+impl UnfinishedWork {
+ pub fn finish(self) {
+ resolve(self.0, true);
+ }
+}
+
+/// Poll the future.
+///
+/// `woken` indicates whether the task was woken up via the completion queue
+/// (i.e. it is already in the NOTIFIED state).
+fn poll(task: Arc<SpawnTask>, woken: bool) {
+ let mut init_state = if woken { NOTIFIED } else { IDLE };
+ // TODO: maybe we need to break the loop to avoid starvation.
+ loop {
+ match task
+ .state
+ .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
+ {
+ Ok(_) => {}
+ Err(COMPLETED) => return,
+ Err(s) => panic!("unexpected state {}", s),
+ }
+
+ let waker = waker_ref(&task);
+ let mut cx = Context::from_waker(&waker);
+
+ // L208 "lock"s state, hence it's safe to get a mutable reference.
+ match unsafe { &mut *task.handle.get() }
+ .as_mut()
+ .unwrap()
+ .as_mut()
+ .poll(&mut cx)
+ {
+ Poll::Ready(()) => {
+ task.state.store(COMPLETED, Ordering::Release);
+ unsafe { &mut *task.handle.get() }.take();
+ }
+ _ => {
+ match task.state.compare_exchange(
+ POLLING,
+ IDLE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => return,
+ Err(NOTIFIED) => {
+ init_state = NOTIFIED;
+ }
+ Err(s) => panic!("unexpected state {}", s),
+ }
+ }
+ }
+ }
+}
+
+/// An executor that drives a future in the gRPC poll thread, which
+/// can reduce thread context switching.
+pub(crate) struct Executor<'a> {
+ cq: &'a CompletionQueue,
+}
+
+impl<'a> Executor<'a> {
+ pub fn new(cq: &CompletionQueue) -> Executor<'_> {
+ Executor { cq }
+ }
+
+ pub fn cq(&self) -> &CompletionQueue {
+ self.cq
+ }
+
+ /// Spawn the future into inner poll loop.
+ ///
+ /// If you want to trace the future, you may need to create a sender/receiver
+ /// pair by yourself.
+ pub fn spawn<F>(&self, f: F, kicker: Kicker)
+ where
+ F: Future<Output = ()> + Send + 'static,
+ {
+ let s = Box::pin(f);
+ let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
+ poll(notify, false)
+ }
+}
diff --git a/src/task/mod.rs b/src/task/mod.rs
new file mode 100644
index 0000000..f151d0e
--- /dev/null
+++ b/src/task/mod.rs
@@ -0,0 +1,233 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+mod callback;
+mod executor;
+mod promise;
+
+use std::fmt::{self, Debug, Formatter};
+use std::pin::Pin;
+use std::sync::Arc;
+
+use futures::future::Future;
+use futures::task::{Context, Poll, Waker};
+use parking_lot::Mutex;
+
+use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
+use self::executor::SpawnTask;
+use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
+use crate::call::server::RequestContext;
+use crate::call::{BatchContext, Call, MessageReader};
+use crate::cq::CompletionQueue;
+use crate::error::{Error, Result};
+use crate::server::RequestCallContext;
+
+pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
+pub use self::promise::BatchType;
+
+/// A handle used to notify the future that the task has finished.
+pub struct NotifyHandle<T> {
+ result: Option<Result<T>>,
+ waker: Option<Waker>,
+ stale: bool,
+}
+
+impl<T> NotifyHandle<T> {
+ fn new() -> NotifyHandle<T> {
+ NotifyHandle {
+ result: None,
+ waker: None,
+ stale: false,
+ }
+ }
+
+ /// Set the result and notify future if necessary.
+ fn set_result(&mut self, res: Result<T>) -> Option<Waker> {
+ self.result = Some(res);
+
+ self.waker.take()
+ }
+}
+
+type Inner<T> = Mutex<NotifyHandle<T>>;
+
+fn new_inner<T>() -> Arc<Inner<T>> {
+ Arc::new(Mutex::new(NotifyHandle::new()))
+}
+
+/// Get the future status without the need to poll.
+///
+/// Returns `Ok(())` if the future has not finished yet, otherwise an `RpcFinished`
+/// error. Not implemented as a method as it's only for internal usage.
+pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> {
+ let guard = f.inner.lock();
+ match guard.result {
+ None => Ok(()),
+ Some(Err(Error::RpcFailure(ref status))) => {
+ Err(Error::RpcFinished(Some(status.to_owned())))
+ }
+ Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)),
+ }
+}
+
+/// A future object for task that is scheduled to `CompletionQueue`.
+pub struct CqFuture<T> {
+ inner: Arc<Inner<T>>,
+}
+
+impl<T> CqFuture<T> {
+ fn new(inner: Arc<Inner<T>>) -> CqFuture<T> {
+ CqFuture { inner }
+ }
+}
+
+impl<T> Future for CqFuture<T> {
+ type Output = Result<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let mut guard = self.inner.lock();
+ if guard.stale {
+ panic!("Resolved future is not supposed to be polled again.");
+ }
+
+ if let Some(res) = guard.result.take() {
+ guard.stale = true;
+ return Poll::Ready(res);
+ }
+
+ // So the task has not been finished yet, add notification hook.
+ if guard.waker.is_none() || !guard.waker.as_ref().unwrap().will_wake(cx.waker()) {
+ guard.waker = Some(cx.waker().clone());
+ }
+
+ Poll::Pending
+ }
+}
+
+/// Future object for batch jobs.
+pub type BatchFuture = CqFuture<Option<MessageReader>>;
+
+/// A result holder for asynchronous execution.
+// This enum is going to be passed to FFI, so don't use trait objects or generics here.
+pub enum CallTag {
+ Batch(BatchPromise),
+ Request(RequestCallback),
+ UnaryRequest(UnaryRequestCallback),
+ Abort(Abort),
+ Shutdown(ShutdownPromise),
+ Spawn(Arc<SpawnTask>),
+}
+
+impl CallTag {
+ /// Generate a Future/CallTag pair for batch jobs.
+ pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) {
+ let inner = new_inner();
+ let batch = BatchPromise::new(ty, inner.clone());
+ (CqFuture::new(inner), CallTag::Batch(batch))
+ }
+
+ /// Generate a CallTag for a request job. We don't have an event loop
+ /// to poll the future, so just the tag is enough.
+ pub fn request(ctx: RequestCallContext) -> CallTag {
+ CallTag::Request(RequestCallback::new(ctx))
+ }
+
+ /// Generate a Future/CallTag pair for shutdown call.
+ pub fn shutdown_pair() -> (CqFuture<()>, CallTag) {
+ let inner = new_inner();
+ let shutdown = ShutdownPromise::new(inner.clone());
+ (CqFuture::new(inner), CallTag::Shutdown(shutdown))
+ }
+
+ /// Generate a CallTag for abort call before handler is called.
+ pub fn abort(call: Call) -> CallTag {
+ CallTag::Abort(Abort::new(call))
+ }
+
+ /// Generate a CallTag for unary request job.
+ pub fn unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag {
+ let cb = UnaryRequestCallback::new(ctx, rc);
+ CallTag::UnaryRequest(cb)
+ }
+
+ /// Get the batch context from result holder.
+ pub fn batch_ctx(&self) -> Option<&BatchContext> {
+ match *self {
+ CallTag::Batch(ref prom) => Some(prom.context()),
+ CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()),
+ CallTag::Abort(ref cb) => Some(cb.batch_ctx()),
+ _ => None,
+ }
+ }
+
+ /// Get the request context from the result holder.
+ pub fn request_ctx(&self) -> Option<&RequestContext> {
+ match *self {
+ CallTag::Request(ref prom) => Some(prom.context()),
+ CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()),
+ _ => None,
+ }
+ }
+
+ /// Resolve the CallTag with given status.
+ pub fn resolve(self, cq: &CompletionQueue, success: bool) {
+ match self {
+ CallTag::Batch(prom) => prom.resolve(success),
+ CallTag::Request(cb) => cb.resolve(cq, success),
+ CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
+ CallTag::Abort(_) => {}
+ CallTag::Shutdown(prom) => prom.resolve(success),
+ CallTag::Spawn(notify) => self::executor::resolve(notify, success),
+ }
+ }
+}
+
+impl Debug for CallTag {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ match *self {
+ CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({:?})", ctx),
+ CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
+ CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
+ CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
+ CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"),
+ CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::mpsc::*;
+ use std::sync::*;
+ use std::thread;
+
+ use super::*;
+ use crate::env::Environment;
+ use futures::executor::block_on;
+
+ #[test]
+ fn test_resolve() {
+ let env = Environment::new(1);
+
+ let (cq_f1, tag1) = CallTag::shutdown_pair();
+ let (cq_f2, tag2) = CallTag::shutdown_pair();
+ let (tx, rx) = mpsc::channel();
+
+ let handler = thread::spawn(move || {
+ tx.send(block_on(cq_f1)).unwrap();
+ tx.send(block_on(cq_f2)).unwrap();
+ });
+
+ assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+ tag1.resolve(&env.pick_cq(), true);
+ assert!(rx.recv().unwrap().is_ok());
+
+ assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+ tag2.resolve(&env.pick_cq(), false);
+ match rx.recv() {
+ Ok(Err(Error::ShutdownFailed)) => {}
+ res => panic!("expect shutdown failed, but got {:?}", res),
+ }
+
+ handler.join().unwrap();
+ }
+}
diff --git a/src/task/promise.rs b/src/task/promise.rs
new file mode 100644
index 0000000..02e9419
--- /dev/null
+++ b/src/task/promise.rs
@@ -0,0 +1,128 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::fmt::{self, Debug, Formatter};
+use std::sync::Arc;
+
+use super::Inner;
+use crate::call::{BatchContext, MessageReader, RpcStatusCode};
+use crate::error::Error;
+
+/// Batch job type.
+#[derive(PartialEq, Debug)]
+pub enum BatchType {
+ /// Finish without reading any message.
+ Finish,
+ /// Extract one message when the call finishes.
+ Read,
+ /// Check the rpc code and then extract one message.
+ CheckRead,
+}
+
+/// A promise used to resolve batch jobs.
+pub struct Batch {
+ ty: BatchType,
+ ctx: BatchContext,
+ inner: Arc<Inner<Option<MessageReader>>>,
+}
+
+impl Batch {
+ pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
+ Batch {
+ ty,
+ ctx: BatchContext::new(),
+ inner,
+ }
+ }
+
+ pub fn context(&self) -> &BatchContext {
+ &self.ctx
+ }
+
+ fn read_one_msg(&mut self, success: bool) {
+ let task = {
+ let mut guard = self.inner.lock();
+ if success {
+ guard.set_result(Ok(self.ctx.recv_message()))
+ } else {
+ // Rely on the C core to handle the failed read (e.g. deliver the appropriate
+ // status code on the client side).
+ guard.set_result(Ok(None))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+
+ fn finish_response(&mut self, succeed: bool) {
+ let task = {
+ let mut guard = self.inner.lock();
+ if succeed {
+ let status = self.ctx.rpc_status();
+ if status.status == RpcStatusCode::OK {
+ guard.set_result(Ok(None))
+ } else {
+ guard.set_result(Err(Error::RpcFailure(status)))
+ }
+ } else {
+ guard.set_result(Err(Error::RemoteStopped))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+
+ fn handle_unary_response(&mut self) {
+ let task = {
+ let mut guard = self.inner.lock();
+ let status = self.ctx.rpc_status();
+ if status.status == RpcStatusCode::OK {
+ guard.set_result(Ok(self.ctx.recv_message()))
+ } else {
+ guard.set_result(Err(Error::RpcFailure(status)))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+
+ pub fn resolve(mut self, success: bool) {
+ match self.ty {
+ BatchType::CheckRead => {
+ assert!(success);
+ self.handle_unary_response();
+ }
+ BatchType::Finish => {
+ self.finish_response(success);
+ }
+ BatchType::Read => {
+ self.read_one_msg(success);
+ }
+ }
+ }
+}
+
+impl Debug for Batch {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ write!(f, "Batch [{:?}]", self.ty)
+ }
+}
+
+/// A promise used to resolve async shutdown result.
+pub struct Shutdown {
+ inner: Arc<Inner<()>>,
+}
+
+impl Shutdown {
+ pub fn new(inner: Arc<Inner<()>>) -> Shutdown {
+ Shutdown { inner }
+ }
+
+ pub fn resolve(self, success: bool) {
+ let task = {
+ let mut guard = self.inner.lock();
+ if success {
+ guard.set_result(Ok(()))
+ } else {
+ guard.set_result(Err(Error::ShutdownFailed))
+ }
+ };
+ task.map(|t| t.wake());
+ }
+}