// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::pin::Pin; use std::ptr; use std::sync::Arc; use std::time::Duration; use crate::grpc_sys; use futures::ready; use futures::sink::Sink; use futures::stream::Stream; use futures::task::{Context, Poll}; use parking_lot::Mutex; use std::future::Future; use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags}; use crate::buf::GrpcSlice; use crate::call::{check_run, Call, MessageReader, Method}; use crate::channel::Channel; use crate::codec::{DeserializeFn, SerializeFn}; use crate::error::{Error, Result}; use crate::metadata::Metadata; use crate::task::{BatchFuture, BatchType}; /// Update the flag bit in res. #[inline] pub fn change_flag(res: &mut u32, flag: u32, set: bool) { if set { *res |= flag; } else { *res &= !flag; } } /// Options for calls made by client. #[derive(Clone, Default)] pub struct CallOption { timeout: Option, write_flags: WriteFlags, call_flags: u32, headers: Option, } impl CallOption { /// Signal that the call is idempotent. pub fn idempotent(mut self, is_idempotent: bool) -> CallOption { change_flag( &mut self.call_flags, grpc_sys::GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST, is_idempotent, ); self } /// Signal that the call should not return UNAVAILABLE before it has started. pub fn wait_for_ready(mut self, wait_for_ready: bool) -> CallOption { change_flag( &mut self.call_flags, grpc_sys::GRPC_INITIAL_METADATA_WAIT_FOR_READY, wait_for_ready, ); self } /// Signal that the call is cacheable. gRPC is free to use GET verb. pub fn cacheable(mut self, cacheable: bool) -> CallOption { change_flag( &mut self.call_flags, grpc_sys::GRPC_INITIAL_METADATA_CACHEABLE_REQUEST, cacheable, ); self } /// Set write flags. pub fn write_flags(mut self, write_flags: WriteFlags) -> CallOption { self.write_flags = write_flags; self } /// Set a timeout. pub fn timeout(mut self, timeout: Duration) -> CallOption { self.timeout = Some(timeout); self } /// Get the timeout. pub fn get_timeout(&self) -> Option { self.timeout } /// Set the headers to be sent with the call. pub fn headers(mut self, meta: Metadata) -> CallOption { self.headers = Some(meta); self } /// Get headers to be sent with the call. pub fn get_headers(&self) -> Option<&Metadata> { self.headers.as_ref() } } impl Call { pub fn unary_async( channel: &Channel, method: &Method, req: &Req, mut opt: CallOption, ) -> Result> { let call = channel.create_call(method, &opt)?; let mut payload = GrpcSlice::default(); (method.req_ser())(req, &mut payload); let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_unary( call.call, ctx, payload.as_mut_ptr(), opt.write_flags.flags, opt.headers .as_mut() .map_or_else(ptr::null_mut, |c| c as *mut _ as _), opt.call_flags, tag, ) }); Ok(ClientUnaryReceiver::new(call, cq_f, method.resp_de())) } pub fn client_streaming( channel: &Channel, method: &Method, mut opt: CallOption, ) -> Result<(ClientCStreamSender, ClientCStreamReceiver)> { let call = channel.create_call(method, &opt)?; let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_client_streaming( call.call, ctx, opt.headers .as_mut() .map_or_else(ptr::null_mut, |c| c as *mut _ as _), opt.call_flags, tag, ) }); let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f))); let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser()); let recv = ClientCStreamReceiver { call: share_call, resp_de: method.resp_de(), finished: false, }; Ok((sink, recv)) } pub fn server_streaming( channel: &Channel, method: &Method, req: &Req, mut opt: CallOption, ) -> Result> { let call = channel.create_call(method, &opt)?; let mut payload = GrpcSlice::default(); (method.req_ser())(req, &mut payload); let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_server_streaming( call.call, ctx, payload.as_mut_ptr(), opt.write_flags.flags, opt.headers .as_mut() .map_or_else(ptr::null_mut, |c| c as *mut _ as _), opt.call_flags, tag, ) }); // TODO: handle header check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag) }); Ok(ClientSStreamReceiver::new(call, cq_f, method.resp_de())) } pub fn duplex_streaming( channel: &Channel, method: &Method, mut opt: CallOption, ) -> Result<(ClientDuplexSender, ClientDuplexReceiver)> { let call = channel.create_call(method, &opt)?; let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_duplex_streaming( call.call, ctx, opt.headers .as_mut() .map_or_else(ptr::null_mut, |c| c as *mut _ as _), opt.call_flags, tag, ) }); // TODO: handle header. check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag) }); let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f))); let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser()); let recv = ClientDuplexReceiver::new(share_call, method.resp_de()); Ok((sink, recv)) } } /// A receiver for unary request. /// /// The future is resolved once response is received. #[must_use = "if unused the ClientUnaryReceiver may immediately cancel the RPC"] pub struct ClientUnaryReceiver { call: Call, resp_f: BatchFuture, resp_de: DeserializeFn, } impl ClientUnaryReceiver { fn new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn) -> ClientUnaryReceiver { ClientUnaryReceiver { call, resp_f, resp_de, } } /// Cancel the call. #[inline] pub fn cancel(&mut self) { self.call.cancel() } #[inline] pub fn resp_de(&self, reader: MessageReader) -> Result { (self.resp_de)(reader) } } impl Future for ClientUnaryReceiver { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let data = ready!(Pin::new(&mut self.resp_f).poll(cx)?); let t = self.resp_de(data.unwrap())?; Poll::Ready(Ok(t)) } } /// A receiver for client streaming call. /// /// If the corresponding sink has dropped or cancelled, this will poll a /// [`RpcFailure`] error with the [`Cancelled`] status. /// /// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure /// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled #[must_use = "if unused the ClientCStreamReceiver may immediately cancel the RPC"] pub struct ClientCStreamReceiver { call: Arc>, resp_de: DeserializeFn, finished: bool, } impl ClientCStreamReceiver { /// Cancel the call. pub fn cancel(&mut self) { let lock = self.call.lock(); lock.call.cancel() } #[inline] pub fn resp_de(&self, reader: MessageReader) -> Result { (self.resp_de)(reader) } } impl Drop for ClientCStreamReceiver { /// The corresponding RPC will be canceled if the receiver did not /// finish before dropping. fn drop(&mut self) { if !self.finished { self.cancel(); } } } impl Future for ClientCStreamReceiver { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let data = { let mut call = self.call.lock(); ready!(call.poll_finish(cx)?) }; let t = (self.resp_de)(data.unwrap())?; self.finished = true; Poll::Ready(Ok(t)) } } /// A sink for client streaming call and duplex streaming call. /// To close the sink properly, you should call [`close`] before dropping. /// /// [`close`]: #method.close #[must_use = "if unused the StreamingCallSink may immediately cancel the RPC"] pub struct StreamingCallSink { call: Arc>, sink_base: SinkBase, close_f: Option, req_ser: SerializeFn, } impl StreamingCallSink { fn new(call: Arc>, req_ser: SerializeFn) -> StreamingCallSink { StreamingCallSink { call, sink_base: SinkBase::new(false), close_f: None, req_ser, } } /// By default it always sends messages with their configured buffer hint. But when the /// `enhance_batch` is enabled, messages will be batched together as many as possible. /// The rules are listed as below: /// - All messages except the last one will be sent with `buffer_hint` set to true. /// - The last message will also be sent with `buffer_hint` set to true unless any message is /// offered with buffer hint set to false. /// /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core. pub fn enhance_batch(&mut self, flag: bool) { self.sink_base.enhance_buffer_strategy = flag; } pub fn cancel(&mut self) { let call = self.call.lock(); call.call.cancel() } } impl

Drop for StreamingCallSink

{ /// The corresponding RPC will be canceled if the sink did not call /// [`close`] before dropping. /// /// [`close`]: #method.close fn drop(&mut self) { if self.close_f.is_none() { self.cancel(); } } } impl Sink<(Req, WriteFlags)> for StreamingCallSink { type Error = Error; #[inline] fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.sink_base).poll_ready(cx) } #[inline] fn start_send(mut self: Pin<&mut Self>, (msg, flags): (Req, WriteFlags)) -> Result<()> { { let mut call = self.call.lock(); call.check_alive()?; } let t = &mut *self; Pin::new(&mut t.sink_base).start_send(&mut t.call, &msg, flags, t.req_ser) } #[inline] fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { { let mut call = self.call.lock(); call.check_alive()?; } let t = &mut *self; Pin::new(&mut t.sink_base).poll_flush(cx, &mut t.call) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let t = &mut *self; let mut call = t.call.lock(); if t.close_f.is_none() { ready!(Pin::new(&mut t.sink_base).poll_ready(cx)?); let close_f = call.call.start_send_close_client()?; t.close_f = Some(close_f); } if Pin::new(t.close_f.as_mut().unwrap()).poll(cx)?.is_pending() { // if call is finished, can return early here. call.check_alive()?; return Poll::Pending; } Poll::Ready(Ok(())) } } /// A sink for client streaming call. /// /// To close the sink properly, you should call [`close`] before dropping. /// /// [`close`]: #method.close pub type ClientCStreamSender = StreamingCallSink; /// A sink for duplex streaming call. /// /// To close the sink properly, you should call [`close`] before dropping. /// /// [`close`]: #method.close pub type ClientDuplexSender = StreamingCallSink; struct ResponseStreamImpl { call: H, msg_f: Option, read_done: bool, finished: bool, resp_de: DeserializeFn, } impl ResponseStreamImpl { fn new(call: H, resp_de: DeserializeFn) -> ResponseStreamImpl { ResponseStreamImpl { call, msg_f: None, read_done: false, finished: false, resp_de, } } fn cancel(&mut self) { self.call.call(|c| c.call.cancel()) } fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { if !self.finished { let t = &mut *self; let finished = &mut t.finished; let _ = t.call.call(|c| { let res = c.poll_finish(cx); *finished = c.finished; res })?; } let mut bytes = None; loop { if !self.read_done { if let Some(msg_f) = &mut self.msg_f { bytes = ready!(Pin::new(msg_f).poll(cx)?); if bytes.is_none() { self.read_done = true; } } } if self.read_done { if self.finished { return Poll::Ready(None); } return Poll::Pending; } // so msg_f must be either stale or not initialised yet. self.msg_f.take(); let msg_f = self.call.call(|c| c.call.start_recv_message())?; self.msg_f = Some(msg_f); if let Some(data) = bytes { let msg = (self.resp_de)(data)?; return Poll::Ready(Some(Ok(msg))); } } } // Cancel the call if we still have some messages or did not // receive status code. fn on_drop(&mut self) { if !self.read_done || !self.finished { self.cancel(); } } } /// A receiver for server streaming call. #[must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC"] pub struct ClientSStreamReceiver { imp: ResponseStreamImpl, } impl ClientSStreamReceiver { fn new( call: Call, finish_f: BatchFuture, de: DeserializeFn, ) -> ClientSStreamReceiver { let share_call = ShareCall::new(call, finish_f); ClientSStreamReceiver { imp: ResponseStreamImpl::new(share_call, de), } } pub fn cancel(&mut self) { self.imp.cancel() } } impl Stream for ClientSStreamReceiver { type Item = Result; #[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.imp).poll(cx) } } /// A response receiver for duplex call. /// /// If the corresponding sink has dropped or cancelled, this will poll a /// [`RpcFailure`] error with the [`Cancelled`] status. /// /// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure /// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled #[must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC"] pub struct ClientDuplexReceiver { imp: ResponseStreamImpl>, Resp>, } impl ClientDuplexReceiver { fn new(call: Arc>, de: DeserializeFn) -> ClientDuplexReceiver { ClientDuplexReceiver { imp: ResponseStreamImpl::new(call, de), } } pub fn cancel(&mut self) { self.imp.cancel() } } impl Drop for ClientDuplexReceiver { /// The corresponding RPC will be canceled if the receiver did not /// finish before dropping. fn drop(&mut self) { self.imp.on_drop() } } impl Stream for ClientDuplexReceiver { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.imp).poll(cx) } } #[cfg(test)] mod tests { #[test] fn test_change_flag() { let mut flag = 2 | 4; super::change_flag(&mut flag, 8, true); assert_eq!(flag, 2 | 4 | 8); super::change_flag(&mut flag, 4, false); assert_eq!(flag, 2 | 8); } }