aboutsummaryrefslogtreecommitdiff
path: root/src/task/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/task/mod.rs')
-rw-r--r--src/task/mod.rs233
1 files changed, 233 insertions, 0 deletions
diff --git a/src/task/mod.rs b/src/task/mod.rs
new file mode 100644
index 0000000..f151d0e
--- /dev/null
+++ b/src/task/mod.rs
@@ -0,0 +1,233 @@
+// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
+
+mod callback;
+mod executor;
+mod promise;
+
+use std::fmt::{self, Debug, Formatter};
+use std::pin::Pin;
+use std::sync::Arc;
+
+use futures::future::Future;
+use futures::task::{Context, Poll, Waker};
+use parking_lot::Mutex;
+
+use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
+use self::executor::SpawnTask;
+use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
+use crate::call::server::RequestContext;
+use crate::call::{BatchContext, Call, MessageReader};
+use crate::cq::CompletionQueue;
+use crate::error::{Error, Result};
+use crate::server::RequestCallContext;
+
+pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
+pub use self::promise::BatchType;
+
+/// A handle that is used to notify future that the task finishes.
+pub struct NotifyHandle<T> {
+ result: Option<Result<T>>,
+ waker: Option<Waker>,
+ stale: bool,
+}
+
+impl<T> NotifyHandle<T> {
+ fn new() -> NotifyHandle<T> {
+ NotifyHandle {
+ result: None,
+ waker: None,
+ stale: false,
+ }
+ }
+
+ /// Set the result and notify future if necessary.
+ fn set_result(&mut self, res: Result<T>) -> Option<Waker> {
+ self.result = Some(res);
+
+ self.waker.take()
+ }
+}
+
+type Inner<T> = Mutex<NotifyHandle<T>>;
+
+fn new_inner<T>() -> Arc<Inner<T>> {
+ Arc::new(Mutex::new(NotifyHandle::new()))
+}
+
+/// Get the future status without the need to poll.
+///
+/// If the future is polled successfully, this function will return None.
+/// Not implemented as method as it's only for internal usage.
+pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> {
+ let guard = f.inner.lock();
+ match guard.result {
+ None => Ok(()),
+ Some(Err(Error::RpcFailure(ref status))) => {
+ Err(Error::RpcFinished(Some(status.to_owned())))
+ }
+ Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)),
+ }
+}
+
+/// A future object for task that is scheduled to `CompletionQueue`.
+pub struct CqFuture<T> {
+ inner: Arc<Inner<T>>,
+}
+
+impl<T> CqFuture<T> {
+ fn new(inner: Arc<Inner<T>>) -> CqFuture<T> {
+ CqFuture { inner }
+ }
+}
+
+impl<T> Future for CqFuture<T> {
+ type Output = Result<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let mut guard = self.inner.lock();
+ if guard.stale {
+ panic!("Resolved future is not supposed to be polled again.");
+ }
+
+ if let Some(res) = guard.result.take() {
+ guard.stale = true;
+ return Poll::Ready(res);
+ }
+
+ // So the task has not been finished yet, add notification hook.
+ if guard.waker.is_none() || !guard.waker.as_ref().unwrap().will_wake(cx.waker()) {
+ guard.waker = Some(cx.waker().clone());
+ }
+
+ Poll::Pending
+ }
+}
+
+/// Future object for batch jobs.
+pub type BatchFuture = CqFuture<Option<MessageReader>>;
+
+/// A result holder for asynchronous execution.
+// This enum is going to be passed to FFI, so don't use trait or generic here.
+pub enum CallTag {
+ Batch(BatchPromise),
+ Request(RequestCallback),
+ UnaryRequest(UnaryRequestCallback),
+ Abort(Abort),
+ Shutdown(ShutdownPromise),
+ Spawn(Arc<SpawnTask>),
+}
+
+impl CallTag {
+ /// Generate a Future/CallTag pair for batch jobs.
+ pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) {
+ let inner = new_inner();
+ let batch = BatchPromise::new(ty, inner.clone());
+ (CqFuture::new(inner), CallTag::Batch(batch))
+ }
+
+ /// Generate a CallTag for request job. We don't have an eventloop
+ /// to pull the future, so just the tag is enough.
+ pub fn request(ctx: RequestCallContext) -> CallTag {
+ CallTag::Request(RequestCallback::new(ctx))
+ }
+
+ /// Generate a Future/CallTag pair for shutdown call.
+ pub fn shutdown_pair() -> (CqFuture<()>, CallTag) {
+ let inner = new_inner();
+ let shutdown = ShutdownPromise::new(inner.clone());
+ (CqFuture::new(inner), CallTag::Shutdown(shutdown))
+ }
+
+ /// Generate a CallTag for abort call before handler is called.
+ pub fn abort(call: Call) -> CallTag {
+ CallTag::Abort(Abort::new(call))
+ }
+
+ /// Generate a CallTag for unary request job.
+ pub fn unary_request(ctx: RequestContext, rc: RequestCallContext) -> CallTag {
+ let cb = UnaryRequestCallback::new(ctx, rc);
+ CallTag::UnaryRequest(cb)
+ }
+
+ /// Get the batch context from result holder.
+ pub fn batch_ctx(&self) -> Option<&BatchContext> {
+ match *self {
+ CallTag::Batch(ref prom) => Some(prom.context()),
+ CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()),
+ CallTag::Abort(ref cb) => Some(cb.batch_ctx()),
+ _ => None,
+ }
+ }
+
+ /// Get the request context from the result holder.
+ pub fn request_ctx(&self) -> Option<&RequestContext> {
+ match *self {
+ CallTag::Request(ref prom) => Some(prom.context()),
+ CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()),
+ _ => None,
+ }
+ }
+
+ /// Resolve the CallTag with given status.
+ pub fn resolve(self, cq: &CompletionQueue, success: bool) {
+ match self {
+ CallTag::Batch(prom) => prom.resolve(success),
+ CallTag::Request(cb) => cb.resolve(cq, success),
+ CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
+ CallTag::Abort(_) => {}
+ CallTag::Shutdown(prom) => prom.resolve(success),
+ CallTag::Spawn(notify) => self::executor::resolve(notify, success),
+ }
+ }
+}
+
+impl Debug for CallTag {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ match *self {
+ CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({:?})", ctx),
+ CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
+ CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
+ CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
+ CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"),
+ CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::mpsc::*;
+ use std::sync::*;
+ use std::thread;
+
+ use super::*;
+ use crate::env::Environment;
+ use futures::executor::block_on;
+
+ #[test]
+ fn test_resolve() {
+ let env = Environment::new(1);
+
+ let (cq_f1, tag1) = CallTag::shutdown_pair();
+ let (cq_f2, tag2) = CallTag::shutdown_pair();
+ let (tx, rx) = mpsc::channel();
+
+ let handler = thread::spawn(move || {
+ tx.send(block_on(cq_f1)).unwrap();
+ tx.send(block_on(cq_f2)).unwrap();
+ });
+
+ assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+ tag1.resolve(&env.pick_cq(), true);
+ assert!(rx.recv().unwrap().is_ok());
+
+ assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
+ tag2.resolve(&env.pick_cq(), false);
+ match rx.recv() {
+ Ok(Err(Error::ShutdownFailed)) => {}
+ res => panic!("expect shutdown failed, but got {:?}", res),
+ }
+
+ handler.join().unwrap();
+ }
+}