aboutsummaryrefslogtreecommitdiff
path: root/src/call/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/call/server.rs')
-rw-r--r--src/call/server.rs39
1 files changed, 27 insertions, 12 deletions
diff --git a/src/call/server.rs b/src/call/server.rs
index 8875d6d..add9874 100644
--- a/src/call/server.rs
+++ b/src/call/server.rs
@@ -17,6 +17,7 @@ use parking_lot::Mutex;
use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
use crate::auth_context::AuthContext;
+use crate::buf::GrpcSlice;
use crate::call::{
BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
};
@@ -322,7 +323,7 @@ macro_rules! impl_unary_sink {
$t {
call: Some(call),
write_flags: 0,
- ser: ser,
+ ser,
}
}
@@ -335,8 +336,8 @@ macro_rules! impl_unary_sink {
}
fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
- let data = t.as_ref().map(|t| {
- let mut buf = vec![];
+ let mut data = t.as_ref().map(|t| {
+ let mut buf = GrpcSlice::default();
(self.ser)(t, &mut buf);
buf
});
@@ -344,7 +345,7 @@ macro_rules! impl_unary_sink {
let write_flags = self.write_flags;
let res = self.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(&status, true, &data, write_flags)
+ .start_send_status_from_server(&status, true, &mut data, write_flags)
});
let (cq_f, err) = match res {
@@ -354,8 +355,8 @@ macro_rules! impl_unary_sink {
$rt {
call: self.call.take().unwrap(),
- cq_f: cq_f,
- err: err,
+ cq_f,
+ err,
}
}
}
@@ -420,10 +421,23 @@ macro_rules! impl_stream_sink {
status: RpcStatus::ok(),
flushed: false,
closed: false,
- ser: ser,
+ ser,
}
}
+ /// By default it always sends messages with their configured buffer hint. But when the
+ /// `enhance_batch` is enabled, messages will be batched together as many as possible.
+ /// The rules are listed as below:
+ /// - All messages except the last one will be sent with `buffer_hint` set to true.
+ /// - The last message will also be sent with `buffer_hint` set to true unless any message is
+ /// offered with buffer hint set to false.
+ ///
+ /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
+ /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
+ pub fn enhance_batch(&mut self, flag: bool) {
+ self.base.enhance_buffer_strategy = flag;
+ }
+
pub fn set_status(&mut self, status: RpcStatus) {
assert!(self.flush_f.is_none());
self.status = status;
@@ -434,7 +448,7 @@ macro_rules! impl_stream_sink {
let send_metadata = self.base.send_metadata;
let res = self.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(&status, send_metadata, &None, 0)
+ .start_send_status_from_server(&status, send_metadata, &mut None, 0)
});
let (fail_f, err) = match res {
@@ -444,8 +458,8 @@ macro_rules! impl_stream_sink {
$ft {
call: self.call.take().unwrap(),
- fail_f: fail_f,
- err: err,
+ fail_f,
+ err,
}
}
}
@@ -487,7 +501,8 @@ macro_rules! impl_stream_sink {
if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
return Poll::Ready(Err(Error::RemoteStopped));
}
- Pin::new(&mut self.base).poll_ready(cx)
+ let t = &mut *self;
+ Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap())
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
@@ -499,7 +514,7 @@ macro_rules! impl_stream_sink {
let status = &t.status;
let flush_f = t.call.as_mut().unwrap().call(|c| {
c.call
- .start_send_status_from_server(status, send_metadata, &None, 0)
+ .start_send_status_from_server(status, send_metadata, &mut None, 0)
})?;
t.flush_f = Some(flush_f);
}