// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use crate::call::client::{ CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver, ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, }; use crate::call::{Call, Method}; use crate::channel::Channel; use crate::error::Result; use crate::task::Executor; use crate::task::Kicker; use futures::executor::block_on; use futures::Future; /// A generic client for making RPC calls. #[derive(Clone)] pub struct Client { channel: Channel, // Used to kick its completion queue. kicker: Kicker, } impl Client { /// Initialize a new [`Client`]. pub fn new(channel: Channel) -> Client { let kicker = channel.create_kicker().unwrap(); Client { channel, kicker } } /// Create a synchronized unary RPC call. /// /// It uses futures::executor::block_on to wait for the futures. It's recommended to use /// the asynchronous version. pub fn unary_call( &self, method: &Method, req: &Req, opt: CallOption, ) -> Result { block_on(self.unary_call_async(method, req, opt)?) } /// Create an asynchronized unary RPC call. pub fn unary_call_async( &self, method: &Method, req: &Req, opt: CallOption, ) -> Result> { Call::unary_async(&self.channel, method, req, opt) } /// Create an asynchronized client streaming call. /// /// Client can send a stream of requests and server responds with a single response. pub fn client_streaming( &self, method: &Method, opt: CallOption, ) -> Result<(ClientCStreamSender, ClientCStreamReceiver)> { Call::client_streaming(&self.channel, method, opt) } /// Create an asynchronized server streaming call. /// /// Client sends on request and server responds with a stream of responses. pub fn server_streaming( &self, method: &Method, req: &Req, opt: CallOption, ) -> Result> { Call::server_streaming(&self.channel, method, req, opt) } /// Create an asynchronized duplex streaming call. /// /// Client sends a stream of requests and server responds with a stream of responses. /// The response stream is completely independent and both side can be sending messages /// at the same time. pub fn duplex_streaming( &self, method: &Method, opt: CallOption, ) -> Result<(ClientDuplexSender, ClientDuplexReceiver)> { Call::duplex_streaming(&self.channel, method, opt) } /// Spawn the future into current gRPC poll thread. /// /// This can reduce a lot of context switching, but please make /// sure there is no heavy work in the future. pub fn spawn(&self, f: F) where F: Future + Send + 'static, { let kicker = self.kicker.clone(); Executor::new(self.channel.cq()).spawn(f, kicker) } }