aboutsummaryrefslogtreecommitdiff
path: root/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
blob: c73ded9eeafc61b8841915e3c5722e4fb79271e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
 * Catches exceptions in the flow completion and calls a specified [action] with
 * the caught exception. This operator is *transparent* to exceptions that occur
 * in downstream flow and does not catch exceptions that are thrown to cancel the flow.
 *
 * For example:
 *
 * ```
 * flow { emitData() }
 *     .map { computeOne(it) }
 *     .catch { ... } // catches exceptions in emitData and computeOne
 *     .map { computeTwo(it) }
 *     .collect { process(it) } // throws exceptions from process and computeTwo
 * ```
 *
 * Conceptually, the action of `catch` operator is similar to wrapping the code of upstream flows with
 * `try { ... } catch (e: Throwable) { action(e) }`.
 *
 * Any exception in the [action] code itself proceeds downstream where it can be
 * caught by further `catch` operators if needed. If a particular exception does not need to be
 * caught it can be rethrown from the action of `catch` operator. For example:
 *
 * ```
 * flow.catch { e ->
 *     if (e !is IOException) throw e // rethrow all but IOException
 *     // e is IOException here
 *     ...
 * }
 * ```
 *
 * The [action] code has [FlowCollector] as a receiver and can [emit][FlowCollector.emit] values downstream.
 * For example, caught exception can be replaced with some wrapper value for errors:
 *
 * ```
 * flow.catch { e -> emit(ErrorWrapperValue(e)) }
 * ```
 *
 * The [action] can also use [emitAll] to fall back on some other flow in case of an error. However, to
 * retry an original flow use [retryWhen] operator that can retry the flow multiple times without
 * introducing ever-growing stack of suspending calls.
 */
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
    flow {
        // catchImpl forwards upstream values to this collector and returns the upstream
        // failure, if any; the action runs only when there is such a failure to handle.
        catchImpl(this)?.let { cause -> action(cause) }
    }

/**
 * Legacy alias for the `(Throwable) -> Boolean` functional type.
 *
 * The alias is deprecated at [DeprecationLevel.ERROR]; spell out the functional type
 * `(Throwable) -> Boolean` directly instead of referring to this name.
 *
 * @suppress **Deprecated**: Use `(Throwable) -> Boolean` functional type
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use (Throwable) -> Boolean functional type",
    replaceWith = ReplaceWith("(Throwable) -> Boolean")
)
public typealias ExceptionPredicate = (Throwable) -> Boolean

/**
 * Switches to the [fallback] flow if the original flow throws an exception that matches the [predicate].
 * Cancellation exceptions that were caused by the direct [cancel] call are not handled by this operator.
 *
 * @suppress **Deprecated**: Use `catch { e -> if (predicate(e)) emitAll(fallback) else throw e }`
 */
@Deprecated(
    level = DeprecationLevel.ERROR,
    message = "Use catch { e -> if (predicate(e)) emitAll(fallback) else throw e }",
    replaceWith = ReplaceWith("catch { e -> if (predicate(e)) emitAll(fallback) else throw e }")
)
public fun <T> Flow<T>.onErrorCollect(
    fallback: Flow<T>,
    predicate: (Throwable) -> Boolean = { true }
): Flow<T> = catch { cause ->
    // Matching failures switch collection over to the fallback flow;
    // anything the predicate rejects is propagated unchanged.
    if (predicate(cause)) emitAll(fallback) else throw cause
}

/**
 * Retries collection of the given flow up to [retries] times when an exception that matches the
 * given [predicate] occurs in the upstream flow. This operator is *transparent* to exceptions that occur
 * in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
 *
 * See [catch] for details on how exceptions are caught in flows.
 *
 * The default value of [retries] parameter is [Long.MAX_VALUE]. This value effectively means to retry forever.
 * This operator is a shorthand for the following code (see [retryWhen]). Note that `attempt` is checked first
 * and [predicate] is not called when it reaches the given number of [retries]:
 *
 * ```
 * retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
 * ```
 *
 * The default [predicate] always returns `true`. The [predicate] is a suspending function,
 * so it can also be used to introduce a delay before retrying, for example:
 *
 * ```
 * flow.retry(3) { e ->
 *     // retry on any IOException but also introduce delay if retrying
 *     (e is IOException).also { if (it) delay(1000) }
 * }
 * ```
 *
 * @throws IllegalArgumentException when [retries] is not positive.
 */
public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    // Reject a non-positive retry budget up front.
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt ->
        // The budget is checked first, so the predicate is never invoked once it is used up.
        if (attempt >= retries) false else predicate(cause)
    }
}

/**
 * Binary-compatibility overload of `retry` for the preview signature that took `retries: Int`
 * and a non-suspending [predicate]. It is hidden from source code and delegates to [retryWhen].
 *
 * The attempt number is checked against [retries] first, so [predicate] is not called once the
 * given number of retries has been reached. This matches the `retries: Long` overload.
 *
 * @throws IllegalArgumentException when [retries] is not positive.
 */
@FlowPreview
@Deprecated(level = DeprecationLevel.HIDDEN, message = "binary compatibility with retries: Int preview version")
public fun <T> Flow<T>.retry(
    retries: Int = Int.MAX_VALUE,
    predicate: (Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    // Check the retry budget before the predicate: an exhausted budget must not trigger
    // a (possibly side-effecting) predicate call whose answer would be ignored anyway.
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

/**
 * Retries collection of the given flow when an exception occurs in the upstream flow and the
 * [predicate] returns true. The predicate also receives an `attempt` number as parameter,
 * starting from zero on the initial call. This operator is *transparent* to exceptions that occur
 * in downstream flow and does not retry on exceptions that are thrown to cancel the flow.
 *
 * For example, the following call retries the flow forever if the error is caused by `IOException`, but
 * stops after 3 retries on any other exception:
 *
 * ```
 * flow.retryWhen { cause, attempt -> cause is IOException || attempt < 3 }
 * ```
 *
 * To implement a simple retry logic with a limit on the number of retries use [retry] operator.
 *
 * Similarly to [catch] operator, the [predicate] code has [FlowCollector] as a receiver and can
 * [emit][FlowCollector.emit] values downstream.
 * The [predicate] is a suspending function, so it can be used to introduce delay before retry, for example:
 *
 * ```
 * flow.retryWhen { cause, attempt ->
 *     if (cause is IOException) {    // retry on IOException
 *         emit(RetryWrapperValue(cause))
 *         delay(1000)                // delay for one second before retry
 *         true
 *     } else {                       // do not retry otherwise
 *         false
 *     }
 * }
 * ```
 *
 * See [catch] for more details.
 */
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
    flow {
        // Number of retries granted so far; handed to the predicate as the attempt index.
        var attempt = 0L
        while (true) {
            // A null result means the collection finished without an upstream failure.
            val cause = catchImpl(this) ?: break
            // The predicate decides between collecting the upstream again and giving up.
            if (!predicate(cause, attempt)) throw cause
            attempt++
        }
    }

/**
 * Collects this flow into the given [collector] and reports how the collection ended.
 *
 * Returns `null` when the collection completes without an exception, or the exception
 * that was thrown by the upstream flow. Exceptions thrown by [collector] itself (that is,
 * coming from downstream) and exceptions that match the cancellation cause of the job in
 * the current coroutine context are never returned; they are rethrown as is.
 */
internal suspend fun <T> Flow<T>.catchImpl(
    collector: FlowCollector<T>
): Throwable? {
    // Remembers the exception thrown by collector.emit so that it can be told apart
    // from upstream failures in the outer catch block.
    var fromDownstream: Throwable? = null
    try {
        collect {
            try {
                collector.emit(it)
            } catch (e: Throwable) {
                fromDownstream = e
                throw e
            }
        }
    } catch (e: Throwable) {
        /*
         * First check ensures that we catch an original exception, not one rethrown by an operator.
         * Second check ignores cancellation causes, they cannot be caught.
         */
        if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
            throw e // Rethrow exceptions from downstream and cancellation causes
        } else {
            return e // not from downstream
        }
    }
    // Collection finished without any exception
    return null
}

/**
 * Tells whether this exception is the same as the cancellation exception of the [Job]
 * found in the given [coroutineContext]. Always `false` when the context has no job
 * or that job is not cancelled.
 */
private fun Throwable.isCancellationCause(coroutineContext: CoroutineContext): Boolean {
    val cancelledJob = coroutineContext[Job]?.takeIf { it.isCancelled } ?: return false
    return isSameExceptionAs(cancelledJob.getCancellationException())
}

/**
 * Compares this exception with [other] after passing both through `unwrap`.
 * A `null` [other] never matches.
 */
private fun Throwable.isSameExceptionAs(other: Throwable?): Boolean {
    if (other == null) return false
    return unwrap(other) == unwrap(this)
}