aboutsummaryrefslogtreecommitdiff
path: root/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
blob: 5d546dffd3d6e437fedd5587ab2e9372aa1b7799 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.collect
import org.reactivestreams.*
import kotlin.coroutines.*
import java.util.concurrent.Flow as JFlow

/**
 * Transforms the given reactive [Publisher] into [Flow].
 * Use [buffer] operator on the resulting flow to specify the size of the backpressure.
 * More precisely, it specifies the value of the subscription's [request][Subscription.request].
 * [buffer] default capacity is used by default.
 *
 * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
 * are discarded.
 */
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
        FlowAdapters.toPublisher(this).asFlow()

/**
 * Transforms the given flow to a reactive specification compliant [Publisher].
 *
 * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
    val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
    return FlowAdapters.toFlowPublisher(reactivePublisher)
}

/**
 * Subscribes to this [Publisher] and performs the specified action for each received element.
 * Cancels subscription if any exception happens during collect.
 */
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
    FlowAdapters.toPublisher(this).collect(action)