diff options
Diffstat (limited to 'src/call/server.rs')
-rw-r--r-- | src/call/server.rs | 39 |
1 files changed, 27 insertions, 12 deletions
diff --git a/src/call/server.rs b/src/call/server.rs index 8875d6d..add9874 100644 --- a/src/call/server.rs +++ b/src/call/server.rs @@ -17,6 +17,7 @@ use parking_lot::Mutex; use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags}; use crate::auth_context::AuthContext; +use crate::buf::GrpcSlice; use crate::call::{ BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase, }; @@ -322,7 +323,7 @@ macro_rules! impl_unary_sink { $t { call: Some(call), write_flags: 0, - ser: ser, + ser, } } @@ -335,8 +336,8 @@ macro_rules! impl_unary_sink { } fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt { - let data = t.as_ref().map(|t| { - let mut buf = vec![]; + let mut data = t.as_ref().map(|t| { + let mut buf = GrpcSlice::default(); (self.ser)(t, &mut buf); buf }); @@ -344,7 +345,7 @@ macro_rules! impl_unary_sink { let write_flags = self.write_flags; let res = self.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(&status, true, &data, write_flags) + .start_send_status_from_server(&status, true, &mut data, write_flags) }); let (cq_f, err) = match res { @@ -354,8 +355,8 @@ macro_rules! impl_unary_sink { $rt { call: self.call.take().unwrap(), - cq_f: cq_f, - err: err, + cq_f, + err, } } } @@ -420,10 +421,23 @@ macro_rules! impl_stream_sink { status: RpcStatus::ok(), flushed: false, closed: false, - ser: ser, + ser, } } + /// By default it always sends messages with their configured buffer hint. But when the + /// `enhance_batch` is enabled, messages will be batched together as many as possible. + /// The rules are listed as below: + /// - All messages except the last one will be sent with `buffer_hint` set to true. + /// - The last message will also be sent with `buffer_hint` set to true unless any message is + /// offered with buffer hint set to false. + /// + /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of + /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core. + pub fn enhance_batch(&mut self, flag: bool) { + self.base.enhance_buffer_strategy = flag; + } + pub fn set_status(&mut self, status: RpcStatus) { assert!(self.flush_f.is_none()); self.status = status; @@ -434,7 +448,7 @@ macro_rules! impl_stream_sink { let send_metadata = self.base.send_metadata; let res = self.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(&status, send_metadata, &None, 0) + .start_send_status_from_server(&status, send_metadata, &mut None, 0) }); let (fail_f, err) = match res { @@ -444,8 +458,8 @@ macro_rules! impl_stream_sink { $ft { call: self.call.take().unwrap(), - fail_f: fail_f, - err: err, + fail_f, + err, } } } @@ -487,7 +501,8 @@ macro_rules! impl_stream_sink { if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? { return Poll::Ready(Err(Error::RemoteStopped)); } - Pin::new(&mut self.base).poll_ready(cx) + let t = &mut *self; + Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap()) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> { @@ -499,7 +514,7 @@ macro_rules! impl_stream_sink { let status = &t.status; let flush_f = t.call.as_mut().unwrap().call(|c| { c.call - .start_send_status_from_server(status, send_metadata, &None, 0) + .start_send_status_from_server(status, send_metadata, &mut None, 0) })?; t.flush_f = Some(flush_f); } |