diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxSingle.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxSingle.kt | 26 |
1 files changed, 9 insertions, 17 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index f3573ee6..07088909 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -12,15 +12,9 @@ import kotlin.coroutines.* import kotlin.internal.* /** - * Creates cold [single][Single] that will run a given [block] in a coroutine. + * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result. * Every time the returned observable is subscribed, it starts a new coroutine. - * Coroutine returns a single value. Unsubscribing cancels running coroutine. - * - * | **Coroutine action** | **Signal to subscriber** - * | ------------------------------------- | ------------------------ - * | Returns a value | `onSuccess` - * | Failure with exception or unsubscribe | `onError` - * + * Unsubscribing cancels running coroutine. * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. @@ -62,21 +56,19 @@ private class RxSingleCoroutine<T: Any>( ) : AbstractCoroutine<T>(parentContext, true) { override fun onCompleted(value: T) { try { - if (!subscriber.isDisposed) subscriber.onSuccess(value) + subscriber.onSuccess(value) } catch (e: Throwable) { - handleCoroutineException(context, e) + handleUndeliverableException(e, context) } } override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!subscriber.isDisposed) { - try { - subscriber.onError(cause) - } catch (e: Throwable) { - handleCoroutineException(context, e) + try { + if (!subscriber.tryOnError(cause)) { + handleUndeliverableException(cause, context) } - } else if (!handled) { - handleCoroutineException(context, cause) + } catch (e: Throwable) { + handleUndeliverableException(e, context) } } } |