// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; use super::Inner; use crate::call::{BatchContext, MessageReader, RpcStatusCode}; use crate::error::Error; /// Batch job type. #[derive(PartialEq, Debug)] pub enum BatchType { /// Finish without reading any message. Finish, /// Extract one message when finish. Read, /// Check the rpc code and then extract one message. CheckRead, } /// A promise used to resolve batch jobs. pub struct Batch { ty: BatchType, ctx: BatchContext, inner: Arc>>, } impl Batch { pub fn new(ty: BatchType, inner: Arc>>) -> Batch { Batch { ty, ctx: BatchContext::new(), inner, } } pub fn context(&self) -> &BatchContext { &self.ctx } fn read_one_msg(&mut self, success: bool) { let task = { let mut guard = self.inner.lock(); if success { guard.set_result(Ok(self.ctx.recv_message())) } else { // rely on C core to handle the failed read (e.g. deliver approriate // statusCode on the clientside). guard.set_result(Ok(None)) } }; task.map(|t| t.wake()); } fn finish_response(&mut self, succeed: bool) { let task = { let mut guard = self.inner.lock(); if succeed { let status = self.ctx.rpc_status(); if status.code() == RpcStatusCode::OK { guard.set_result(Ok(None)) } else { guard.set_result(Err(Error::RpcFailure(status))) } } else { guard.set_result(Err(Error::RemoteStopped)) } }; task.map(|t| t.wake()); } fn handle_unary_response(&mut self) { let task = { let mut guard = self.inner.lock(); let status = self.ctx.rpc_status(); if status.code() == RpcStatusCode::OK { guard.set_result(Ok(self.ctx.recv_message())) } else { guard.set_result(Err(Error::RpcFailure(status))) } }; task.map(|t| t.wake()); } pub fn resolve(mut self, success: bool) { match self.ty { BatchType::CheckRead => { assert!(success); self.handle_unary_response(); } BatchType::Finish => { self.finish_response(success); } BatchType::Read => { self.read_one_msg(success); } } } } impl Debug for Batch { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "Batch [{:?}]", self.ty) } } /// A promise used to resolve async action status. /// /// The action can only succeed or fail without extra error hint. pub struct Action { inner: Arc>, } impl Action { pub fn new(inner: Arc>) -> Action { Action { inner } } pub fn resolve(self, success: bool) { let task = { let mut guard = self.inner.lock(); guard.set_result(Ok(success)) }; task.map(|t| t.wake()); } }