aboutsummaryrefslogtreecommitdiff
path: root/src/task/promise.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/task/promise.rs')
-rw-r--r--src/task/promise.rs52
1 files changed, 46 insertions, 6 deletions
diff --git a/src/task/promise.rs b/src/task/promise.rs
index 2d826d4..e9b3646 100644
--- a/src/task/promise.rs
+++ b/src/task/promise.rs
@@ -6,6 +6,7 @@ use std::sync::Arc;
use super::Inner;
use crate::call::{BatchContext, MessageReader, RpcStatusCode};
use crate::error::Error;
+use crate::metadata::UnownedMetadata;
/// Batch job type.
#[derive(PartialEq, Debug)]
@@ -18,15 +19,46 @@ pub enum BatchType {
CheckRead,
}
+/// A promise result which stores a message reader with bundled metadata.
+pub struct BatchResult {
+ pub message_reader: Option<MessageReader>,
+ pub initial_metadata: UnownedMetadata,
+ pub trailing_metadata: UnownedMetadata,
+}
+
+impl BatchResult {
+ pub fn new(
+ message_reader: Option<MessageReader>,
+ initial_metadata: Option<UnownedMetadata>,
+ trailing_metadata: Option<UnownedMetadata>,
+ ) -> BatchResult {
+ let initial_metadata = if let Some(m) = initial_metadata {
+ m
+ } else {
+ UnownedMetadata::empty()
+ };
+ let trailing_metadata = if let Some(m) = trailing_metadata {
+ m
+ } else {
+ UnownedMetadata::empty()
+ };
+ BatchResult {
+ message_reader,
+ initial_metadata,
+ trailing_metadata,
+ }
+ }
+}
+
/// A promise used to resolve batch jobs.
pub struct Batch {
ty: BatchType,
ctx: BatchContext,
- inner: Arc<Inner<Option<MessageReader>>>,
+ inner: Arc<Inner<BatchResult>>,
}
impl Batch {
- pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
+ pub fn new(ty: BatchType, inner: Arc<Inner<BatchResult>>) -> Batch {
Batch {
ty,
ctx: BatchContext::new(),
@@ -42,11 +74,11 @@ impl Batch {
let task = {
let mut guard = self.inner.lock();
if success {
- guard.set_result(Ok(self.ctx.recv_message()))
+ guard.set_result(Ok(BatchResult::new(self.ctx.recv_message(), None, None)))
} else {
// rely on C core to handle the failed read (e.g. deliver approriate
// statusCode on the clientside).
- guard.set_result(Ok(None))
+ guard.set_result(Ok(BatchResult::new(None, None, None)))
}
};
task.map(|t| t.wake());
@@ -58,7 +90,11 @@ impl Batch {
if succeed {
let status = self.ctx.rpc_status();
if status.code() == RpcStatusCode::OK {
- guard.set_result(Ok(None))
+ guard.set_result(Ok(BatchResult::new(
+ None,
+ Some(self.ctx.take_initial_metadata()),
+ Some(self.ctx.take_trailing_metadata()),
+ )))
} else {
guard.set_result(Err(Error::RpcFailure(status)))
}
@@ -74,7 +110,11 @@ impl Batch {
let mut guard = self.inner.lock();
let status = self.ctx.rpc_status();
if status.code() == RpcStatusCode::OK {
- guard.set_result(Ok(self.ctx.recv_message()))
+ guard.set_result(Ok(BatchResult::new(
+ self.ctx.recv_message(),
+ Some(self.ctx.take_initial_metadata()),
+ Some(self.ctx.take_trailing_metadata()),
+ )))
} else {
guard.set_result(Err(Error::RpcFailure(status)))
}