aboutsummaryrefslogtreecommitdiff
path: root/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxSingle.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxSingle.kt26
1 files changed, 9 insertions, 17 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
index f3573ee6..07088909 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
@@ -12,15 +12,9 @@ import kotlin.coroutines.*
import kotlin.internal.*
/**
- * Creates cold [single][Single] that will run a given [block] in a coroutine.
+ * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result.
* Every time the returned observable is subscribed, it starts a new coroutine.
- * Coroutine returns a single value. Unsubscribing cancels running coroutine.
- *
- * | **Coroutine action** | **Signal to subscriber**
- * | ------------------------------------- | ------------------------
- * | Returns a value | `onSuccess`
- * | Failure with exception or unsubscribe | `onError`
- *
+ * Unsubscribing cancels running coroutine.
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -62,21 +56,19 @@ private class RxSingleCoroutine<T: Any>(
) : AbstractCoroutine<T>(parentContext, true) {
override fun onCompleted(value: T) {
try {
- if (!subscriber.isDisposed) subscriber.onSuccess(value)
+ subscriber.onSuccess(value)
} catch (e: Throwable) {
- handleCoroutineException(context, e)
+ handleUndeliverableException(e, context)
}
}
override fun onCancelled(cause: Throwable, handled: Boolean) {
- if (!subscriber.isDisposed) {
- try {
- subscriber.onError(cause)
- } catch (e: Throwable) {
- handleCoroutineException(context, e)
+ try {
+ if (!subscriber.tryOnError(cause)) {
+ handleUndeliverableException(cause, context)
}
- } else if (!handled) {
- handleCoroutineException(context, cause)
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
}
}
}