path: root/src/call/mod.rs
diff options
Diffstat (limited to 'src/call/mod.rs')
1 files changed, 686 insertions, 0 deletions
diff --git a/src/call/mod.rs b/src/call/mod.rs
new file mode 100644
index 0000000..03e520a
--- /dev/null
+++ b/src/call/mod.rs
@@ -0,0 +1,686 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+pub mod client;
+pub mod server;
+use std::fmt::{self, Debug, Display};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::{ptr, slice};
+use crate::cq::CompletionQueue;
+use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
+use futures::future::Future;
+use futures::ready;
+use futures::task::{Context, Poll};
+use libc::c_void;
+use parking_lot::Mutex;
+use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader};
+use crate::codec::{DeserializeFn, Marshaller, SerializeFn};
+use crate::error::{Error, Result};
+use crate::grpc_sys::grpc_status_code::*;
+use crate::task::{self, BatchFuture, BatchType, CallTag};
+// By default buffers in `SinkBase` will be shrink to 4K size.
+const BUF_SHRINK_SIZE: usize = 4 * 1024;
+/// An gRPC status code structure.
+/// This type contains constants for all gRPC status codes.
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub struct RpcStatusCode(i32);
+impl From<i32> for RpcStatusCode {
+ fn from(code: i32) -> RpcStatusCode {
+ RpcStatusCode(code)
+ }
+impl Into<i32> for RpcStatusCode {
+ fn into(self) -> i32 {
+ self.0
+ }
+impl Display for RpcStatusCode {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ Debug::fmt(self, f)
+ }
+macro_rules! status_codes {
+ (
+ $(
+ ($num:path, $konst:ident);
+ )+
+ ) => {
+ impl RpcStatusCode {
+ $(
+ pub const $konst: RpcStatusCode = RpcStatusCode($num);
+ )+
+ }
+ impl Debug for RpcStatusCode {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "{}-{}",
+ self.0,
+ match self {
+ $(RpcStatusCode($num) => stringify!($konst),)+
+ RpcStatusCode(_) => "INVALID_STATUS_CODE",
+ }
+ )
+ }
+ }
+ }
+status_codes! {
+/// Method types supported by gRPC.
+#[derive(Clone, Copy)]
+pub enum MethodType {
+ /// Single request sent from client, single response received from server.
+ Unary,
+ /// Stream of requests sent from client, single response received from server.
+ ClientStreaming,
+ /// Single request sent from client, stream of responses received from server.
+ ServerStreaming,
+ /// Both server and client can stream arbitrary number of requests and responses simultaneously.
+ Duplex,
+/// A description of a remote method.
+// TODO: add serializer and deserializer.
+pub struct Method<Req, Resp> {
+ /// Type of method.
+ pub ty: MethodType,
+ /// Full qualified name of the method.
+ pub name: &'static str,
+ /// The marshaller used for request messages.
+ pub req_mar: Marshaller<Req>,
+ /// The marshaller used for response messages.
+ pub resp_mar: Marshaller<Resp>,
+impl<Req, Resp> Method<Req, Resp> {
+ /// Get the request serializer.
+ #[inline]
+ pub fn req_ser(&self) -> SerializeFn<Req> {
+ self.req_mar.ser
+ }
+ /// Get the request deserializer.
+ #[inline]
+ pub fn req_de(&self) -> DeserializeFn<Req> {
+ self.req_mar.de
+ }
+ /// Get the response serializer.
+ #[inline]
+ pub fn resp_ser(&self) -> SerializeFn<Resp> {
+ self.resp_mar.ser
+ }
+ /// Get the response deserializer.
+ #[inline]
+ pub fn resp_de(&self) -> DeserializeFn<Resp> {
+ self.resp_mar.de
+ }
+/// RPC result returned from the server.
+#[derive(Debug, Clone)]
+pub struct RpcStatus {
+ /// gRPC status code. `Ok` indicates success, all other values indicate an error.
+ pub status: RpcStatusCode,
+ /// Optional detail string.
+ pub details: Option<String>,
+impl Display for RpcStatus {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ Debug::fmt(self, fmt)
+ }
+impl RpcStatus {
+ /// Create a new [`RpcStatus`].
+ pub fn new<T: Into<RpcStatusCode>>(code: T, details: Option<String>) -> RpcStatus {
+ RpcStatus {
+ status: code.into(),
+ details,
+ }
+ }
+ /// Create a new [`RpcStatus`] that status code is Ok.
+ pub fn ok() -> RpcStatus {
+ RpcStatus::new(RpcStatusCode::OK, None)
+ }
+pub type MessageReader = GrpcByteBufferReader;
+/// Context for batch request.
+pub struct BatchContext {
+ ctx: *mut grpcwrap_batch_context,
+impl BatchContext {
+ pub fn new() -> BatchContext {
+ BatchContext {
+ ctx: unsafe { grpc_sys::grpcwrap_batch_context_create() },
+ }
+ }
+ pub fn as_ptr(&self) -> *mut grpcwrap_batch_context {
+ self.ctx
+ }
+ pub fn take_recv_message(&self) -> Option<GrpcByteBuffer> {
+ let ptr = unsafe { grpc_sys::grpcwrap_batch_context_take_recv_message(self.ctx) };
+ if ptr.is_null() {
+ None
+ } else {
+ Some(unsafe { GrpcByteBuffer::from_raw(ptr) })
+ }
+ }
+ /// Get the status of the rpc call.
+ pub fn rpc_status(&self) -> RpcStatus {
+ let status = RpcStatusCode(unsafe {
+ grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx)
+ });
+ let details = if status == RpcStatusCode::OK {
+ None
+ } else {
+ unsafe {
+ let mut details_len = 0;
+ let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details(
+ self.ctx,
+ &mut details_len,
+ );
+ let details_slice = slice::from_raw_parts(details_ptr as *const _, details_len);
+ Some(String::from_utf8_lossy(details_slice).into_owned())
+ }
+ };
+ RpcStatus::new(status, details)
+ }
+ /// Fetch the response bytes of the rpc call.
+ pub fn recv_message(&mut self) -> Option<MessageReader> {
+ let buf = self.take_recv_message()?;
+ Some(GrpcByteBufferReader::new(buf))
+ }
+impl Drop for BatchContext {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpcwrap_batch_context_destroy(self.ctx) }
+ }
+fn box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut c_void) {
+ let tag_box = Box::new(tag);
+ (
+ tag_box.batch_ctx().unwrap().as_ptr(),
+ Box::into_raw(tag_box) as _,
+ )
+/// A helper function that runs the batch call and checks the result.
+fn check_run<F>(bt: BatchType, f: F) -> BatchFuture
+ F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error,
+ let (cq_f, tag) = CallTag::batch_pair(bt);
+ let (batch_ptr, tag_ptr) = box_batch_tag(tag);
+ let code = f(batch_ptr, tag_ptr);
+ if code != grpc_call_error::GRPC_CALL_OK {
+ unsafe {
+ Box::from_raw(tag_ptr);
+ }
+ panic!("create call fail: {:?}", code);
+ }
+ cq_f
+/// A Call represents an RPC.
+/// When created, it is in a configuration state allowing properties to be
+/// set until it is invoked. After invoke, the Call can have messages
+/// written to it and read from it.
+pub struct Call {
+ pub call: *mut grpc_call,
+ pub cq: CompletionQueue,
+unsafe impl Send for Call {}
+impl Call {
+ pub unsafe fn from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call {
+ assert!(!call.is_null());
+ Call { call, cq }
+ }
+ /// Send a message asynchronously.
+ pub fn start_send_message(
+ &mut self,
+ msg: &[u8],
+ write_flags: u32,
+ initial_meta: bool,
+ ) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let i = if initial_meta { 1 } else { 0 };
+ let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_send_message(
+ self.call,
+ ctx,
+ msg.as_ptr() as _,
+ msg.len(),
+ write_flags,
+ i,
+ tag,
+ )
+ });
+ Ok(f)
+ }
+ /// Finish the rpc call from client.
+ pub fn start_send_close_client(&mut self) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let f = check_run(BatchType::Finish, |_, tag| unsafe {
+ grpc_sys::grpcwrap_call_send_close_from_client(self.call, tag)
+ });
+ Ok(f)
+ }
+ /// Receive a message asynchronously.
+ pub fn start_recv_message(&mut self) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let f = check_run(BatchType::Read, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_recv_message(self.call, ctx, tag)
+ });
+ Ok(f)
+ }
+ /// Start handling from server side.
+ ///
+ /// Future will finish once close is received by the server.
+ pub fn start_server_side(&mut self) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ grpc_sys::grpcwrap_call_start_serverside(self.call, ctx, tag)
+ });
+ Ok(f)
+ }
+ /// Send a status from server.
+ pub fn start_send_status_from_server(
+ &mut self,
+ status: &RpcStatus,
+ send_empty_metadata: bool,
+ payload: &Option<Vec<u8>>,
+ write_flags: u32,
+ ) -> Result<BatchFuture> {
+ let _cq_ref = self.cq.borrow()?;
+ let send_empty_metadata = if send_empty_metadata { 1 } else { 0 };
+ let (payload_ptr, payload_len) = payload
+ .as_ref()
+ .map_or((ptr::null(), 0), |b| (b.as_ptr(), b.len()));
+ let f = check_run(BatchType::Finish, |ctx, tag| unsafe {
+ let details_ptr = status
+ .details
+ .as_ref()
+ .map_or_else(ptr::null, |s| s.as_ptr() as _);
+ let details_len = status.details.as_ref().map_or(0, String::len);
+ grpc_sys::grpcwrap_call_send_status_from_server(
+ self.call,
+ ctx,
+ status.status.into(),
+ details_ptr,
+ details_len,
+ ptr::null_mut(),
+ send_empty_metadata,
+ payload_ptr as _,
+ payload_len,
+ write_flags,
+ tag,
+ )
+ });
+ Ok(f)
+ }
+ /// Abort an rpc call before handler is called.
+ pub fn abort(self, status: &RpcStatus) {
+ match self.cq.borrow() {
+ // Queue is shutdown, ignore.
+ Err(Error::QueueShutdown) => return,
+ Err(e) => panic!("unexpected error when aborting call: {:?}", e),
+ _ => {}
+ }
+ let call_ptr = self.call;
+ let tag = CallTag::abort(self);
+ let (batch_ptr, tag_ptr) = box_batch_tag(tag);
+ let code = unsafe {
+ let details_ptr = status
+ .details
+ .as_ref()
+ .map_or_else(ptr::null, |s| s.as_ptr() as _);
+ let details_len = status.details.as_ref().map_or(0, String::len);
+ grpc_sys::grpcwrap_call_send_status_from_server(
+ call_ptr,
+ batch_ptr,
+ status.status.into(),
+ details_ptr,
+ details_len,
+ ptr::null_mut(),
+ 1,
+ ptr::null(),
+ 0,
+ 0,
+ tag_ptr as *mut c_void,
+ )
+ };
+ if code != grpc_call_error::GRPC_CALL_OK {
+ unsafe {
+ Box::from_raw(tag_ptr);
+ }
+ panic!("create call fail: {:?}", code);
+ }
+ }
+ /// Cancel the rpc call by client.
+ fn cancel(&self) {
+ match self.cq.borrow() {
+ // Queue is shutdown, ignore.
+ Err(Error::QueueShutdown) => return,
+ Err(e) => panic!("unexpected error when canceling call: {:?}", e),
+ _ => {}
+ }
+ unsafe {
+ grpc_sys::grpc_call_cancel(self.call, ptr::null_mut());
+ }
+ }
+impl Drop for Call {
+ fn drop(&mut self) {
+ unsafe { grpc_sys::grpc_call_unref(self.call) }
+ }
+/// A share object for client streaming and duplex streaming call.
+/// In both cases, receiver and sender can be polled in the same time,
+/// hence we need to share the call in the both sides and abort the sink
+/// once the call is canceled or finished early.
+struct ShareCall {
+ call: Call,
+ close_f: BatchFuture,
+ finished: bool,
+ status: Option<RpcStatus>,
+impl ShareCall {
+ fn new(call: Call, close_f: BatchFuture) -> ShareCall {
+ ShareCall {
+ call,
+ close_f,
+ finished: false,
+ status: None,
+ }
+ }
+ /// Poll if the call is still alive.
+ ///
+ /// If the call is still running, will register a notification for its completion.
+ fn poll_finish(&mut self, cx: &mut Context) -> Poll<Result<Option<MessageReader>>> {
+ let res = match Pin::new(&mut self.close_f).poll(cx) {
+ Poll::Ready(Ok(reader)) => {
+ self.status = Some(RpcStatus::ok());
+ Poll::Ready(Ok(reader))
+ }
+ Poll::Pending => return Poll::Pending,
+ Poll::Ready(Err(Error::RpcFailure(status))) => {
+ self.status = Some(status.clone());
+ Poll::Ready(Err(Error::RpcFailure(status)))
+ }
+ res => res,
+ };
+ self.finished = true;
+ res
+ }
+ /// Check if the call is finished.
+ fn check_alive(&mut self) -> Result<()> {
+ if self.finished {
+ // maybe can just take here.
+ return Err(Error::RpcFinished(self.status.clone()));
+ }
+ task::check_alive(&self.close_f)
+ }
+/// A helper trait that allows executing function on the internal `ShareCall` struct.
+trait ShareCallHolder {
+ fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R;
+impl ShareCallHolder for ShareCall {
+ fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
+ f(self)
+ }
+impl ShareCallHolder for Arc<Mutex<ShareCall>> {
+ fn call<R, F: FnOnce(&mut ShareCall) -> R>(&mut self, f: F) -> R {
+ let mut call = self.lock();
+ f(&mut call)
+ }
+/// A helper struct for constructing Stream object for batch requests.
+struct StreamingBase {
+ close_f: Option<BatchFuture>,
+ msg_f: Option<BatchFuture>,
+ read_done: bool,
+impl StreamingBase {
+ fn new(close_f: Option<BatchFuture>) -> StreamingBase {
+ StreamingBase {
+ close_f,
+ msg_f: None,
+ read_done: false,
+ }
+ }
+ fn poll<C: ShareCallHolder>(
+ &mut self,
+ cx: &mut Context,
+ call: &mut C,
+ skip_finish_check: bool,
+ ) -> Poll<Option<Result<MessageReader>>> {
+ if !skip_finish_check {
+ let mut finished = false;
+ if let Some(close_f) = &mut self.close_f {
+ if let Poll::Ready(_) = Pin::new(close_f).poll(cx)? {
+ // Don't return immediately, there may be pending data.
+ finished = true;
+ }
+ }
+ if finished {
+ self.close_f.take();
+ }
+ }
+ let mut bytes = None;
+ if !self.read_done {
+ if let Some(msg_f) = &mut self.msg_f {
+ bytes = ready!(Pin::new(msg_f).poll(cx)?);
+ if bytes.is_none() {
+ self.read_done = true;
+ }
+ }
+ }
+ if self.read_done {
+ if self.close_f.is_none() {
+ return Poll::Ready(None);
+ }
+ return Poll::Pending;
+ }
+ // so msg_f must be either stale or not initialized yet.
+ self.msg_f.take();
+ let msg_f = call.call(|c| c.call.start_recv_message())?;
+ self.msg_f = Some(msg_f);
+ if bytes.is_none() {
+ self.poll(cx, call, true)
+ } else {
+ Poll::Ready(bytes.map(Ok))
+ }
+ }
+ // Cancel the call if we still have some messages or did not
+ // receive status code.
+ fn on_drop<C: ShareCallHolder>(&self, call: &mut C) {
+ if !self.read_done || self.close_f.is_some() {
+ call.call(|c| c.call.cancel());
+ }
+ }
+/// Flags for write operations.
+#[derive(Default, Clone, Copy)]
+pub struct WriteFlags {
+ flags: u32,
+impl WriteFlags {
+ /// Hint that the write may be buffered and need not go out on the wire immediately.
+ ///
+ /// gRPC is free to buffer the message until the next non-buffered write, or until write stream
+ /// completion, but it need not buffer completely or at all.
+ pub fn buffer_hint(mut self, need_buffered: bool) -> WriteFlags {
+ client::change_flag(
+ &mut self.flags,
+ need_buffered,
+ );
+ self
+ }
+ /// Force compression to be disabled.
+ pub fn force_no_compress(mut self, no_compress: bool) -> WriteFlags {
+ client::change_flag(
+ &mut self.flags,
+ no_compress,
+ );
+ self
+ }
+ /// Get whether buffer hint is enabled.
+ pub fn get_buffer_hint(self) -> bool {
+ (self.flags & grpc_sys::GRPC_WRITE_BUFFER_HINT) != 0
+ }
+ /// Get whether compression is disabled.
+ pub fn get_force_no_compress(self) -> bool {
+ (self.flags & grpc_sys::GRPC_WRITE_NO_COMPRESS) != 0
+ }
+/// A helper struct for constructing Sink object for batch requests.
+struct SinkBase {
+ batch_f: Option<BatchFuture>,
+ buf: Vec<u8>,
+ send_metadata: bool,
+impl SinkBase {
+ fn new(send_metadata: bool) -> SinkBase {
+ SinkBase {
+ batch_f: None,
+ buf: Vec::new(),
+ send_metadata,
+ }
+ }
+ fn start_send<T, C: ShareCallHolder>(
+ &mut self,
+ call: &mut C,
+ t: &T,
+ mut flags: WriteFlags,
+ ser: SerializeFn<T>,
+ ) -> Result<()> {
+ // `start_send` is supposed to be called after `poll_ready` returns ready.
+ assert!(self.batch_f.is_none());
+ self.buf.clear();
+ ser(t, &mut self.buf);
+ if flags.get_buffer_hint() && self.send_metadata {
+ // temporary fix: buffer hint with send meta will not send out any metadata.
+ flags = flags.buffer_hint(false);
+ }
+ let write_f = call.call(|c| {
+ c.call
+ .start_send_message(&self.buf, flags.flags, self.send_metadata)
+ })?;
+ // NOTE: Content of `self.buf` is copied into grpc internal.
+ if self.buf.capacity() > BUF_SHRINK_SIZE {
+ self.buf.truncate(BUF_SHRINK_SIZE);
+ self.buf.shrink_to_fit();
+ }
+ self.batch_f = Some(write_f);
+ self.send_metadata = false;
+ Ok(())
+ }
+ #[inline]
+ fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<()>> {
+ match &mut self.batch_f {
+ None => return Poll::Ready(Ok(())),
+ Some(f) => {
+ ready!(Pin::new(f).poll(cx)?);
+ }
+ }
+ self.batch_f.take();
+ Poll::Ready(Ok(()))
+ }