diff options
Diffstat (limited to 'src/task/promise.rs')
-rw-r--r-- | src/task/promise.rs | 52 |
1 files changed, 46 insertions, 6 deletions
diff --git a/src/task/promise.rs b/src/task/promise.rs index 2d826d4..e9b3646 100644 --- a/src/task/promise.rs +++ b/src/task/promise.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use super::Inner; use crate::call::{BatchContext, MessageReader, RpcStatusCode}; use crate::error::Error; +use crate::metadata::UnownedMetadata; /// Batch job type. #[derive(PartialEq, Debug)] @@ -18,15 +19,46 @@ pub enum BatchType { CheckRead, } +/// A promise result which stores a message reader with bundled metadata. +pub struct BatchResult { + pub message_reader: Option<MessageReader>, + pub initial_metadata: UnownedMetadata, + pub trailing_metadata: UnownedMetadata, +} + +impl BatchResult { + pub fn new( + message_reader: Option<MessageReader>, + initial_metadata: Option<UnownedMetadata>, + trailing_metadata: Option<UnownedMetadata>, + ) -> BatchResult { + let initial_metadata = if let Some(m) = initial_metadata { + m + } else { + UnownedMetadata::empty() + }; + let trailing_metadata = if let Some(m) = trailing_metadata { + m + } else { + UnownedMetadata::empty() + }; + BatchResult { + message_reader, + initial_metadata, + trailing_metadata, + } + } +} + /// A promise used to resolve batch jobs. pub struct Batch { ty: BatchType, ctx: BatchContext, - inner: Arc<Inner<Option<MessageReader>>>, + inner: Arc<Inner<BatchResult>>, } impl Batch { - pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch { + pub fn new(ty: BatchType, inner: Arc<Inner<BatchResult>>) -> Batch { Batch { ty, ctx: BatchContext::new(), @@ -42,11 +74,11 @@ impl Batch { let task = { let mut guard = self.inner.lock(); if success { - guard.set_result(Ok(self.ctx.recv_message())) + guard.set_result(Ok(BatchResult::new(self.ctx.recv_message(), None, None))) } else { // rely on C core to handle the failed read (e.g. deliver approriate // statusCode on the clientside). - guard.set_result(Ok(None)) + guard.set_result(Ok(BatchResult::new(None, None, None))) } }; task.map(|t| t.wake()); @@ -58,7 +90,11 @@ impl Batch { if succeed { let status = self.ctx.rpc_status(); if status.code() == RpcStatusCode::OK { - guard.set_result(Ok(None)) + guard.set_result(Ok(BatchResult::new( + None, + Some(self.ctx.take_initial_metadata()), + Some(self.ctx.take_trailing_metadata()), + ))) } else { guard.set_result(Err(Error::RpcFailure(status))) } @@ -74,7 +110,11 @@ impl Batch { let mut guard = self.inner.lock(); let status = self.ctx.rpc_status(); if status.code() == RpcStatusCode::OK { - guard.set_result(Ok(self.ctx.recv_message())) + guard.set_result(Ok(BatchResult::new( + self.ctx.recv_message(), + Some(self.ctx.take_initial_metadata()), + Some(self.ctx.take_trailing_metadata()), + ))) } else { guard.set_result(Err(Error::RpcFailure(status))) } |