diff options
authorMarek Langiewicz <marek.langiewicz@gmail.com>2020-02-23 10:12:46 +0100
committerVsevolod Tolstopyatov <qwwdfsad@gmail.com>2020-03-04 17:44:30 +0300
commitd831a862cb688536d564efb1e98af8e3272c31ba (patch)
parent12e96cdcc628edc2bf22755dafb658e96d1acb35 (diff)
Add ObservableSource.asFlow operator (#1768)
4 files changed, 223 insertions, 4 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md
index 7fbad953..f0fbeb00 100644
--- a/reactive/kotlinx-coroutines-rx2/README.md
+++ b/reactive/kotlinx-coroutines-rx2/README.md
@@ -14,10 +14,11 @@ Coroutine builders:
Integration with [Flow]:
-| **Name** | **Result** | **Description**
-| --------------- | -------------- | ---------------
-| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
-| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
+| **Name** | **Result** | **Description**
+| --------------- | -------------- | ---------------
+| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable.
+| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable.
+| [ObservableSource.asFlow] | `Flow` | Converts the given cold ObservableSource to flow
Suspending extension functions and suspending iteration:
@@ -67,6 +68,7 @@ Conversion functions:
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html
[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html
+[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html
[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html
[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html
diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
index 54a9663a..22f40384 100644
--- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
+++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
@@ -29,6 +29,7 @@ public final class kotlinx/coroutines/rx2/RxCompletableKt {
public final class kotlinx/coroutines/rx2/RxConvertKt {
public static final fun asCompletable (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Completable;
+ public static final fun asFlow (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/flow/Flow;
public static final fun asMaybe (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Maybe;
public static final fun asObservable (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index a9d423a9..36e8dd95 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -5,10 +5,12 @@
package kotlinx.coroutines.rx2
import io.reactivex.*
+import io.reactivex.disposables.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
+import java.util.concurrent.atomic.*
import kotlin.coroutines.*
@@ -78,6 +80,35 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
+ * Transforms given cold [ObservableSource] into cold [Flow].
+ *
+ * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
+ * is applied to the resulting flow.
+ *
+ * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
+ * resulting flow to specify a user-defined value and to control what happens when data is produced faster
+ * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
+ */
+public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
+ val disposableRef = AtomicReference<Disposable>()
+ val observer = object : Observer<T> {
+ override fun onComplete() { close() }
+ override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
+ override fun onNext(t: T) { sendBlocking(t) }
+ override fun onError(e: Throwable) { close(e) }
+ }
+ subscribe(observer)
+ awaitClose { disposableRef.getAndSet(Disposed)?.dispose() }
+private object Disposed : Disposable {
+ override fun isDisposed() = true
+ override fun dispose() = Unit
* Converts the given flow to a cold observable.
* The original flow is cancelled when the observable subscriber is disposed.
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt
new file mode 100644
index 00000000..c14c3cc4
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt
@@ -0,0 +1,185 @@
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+package kotlinx.coroutines.rx2
+import io.reactivex.Observable
+import io.reactivex.ObservableSource
+import io.reactivex.Observer
+import io.reactivex.disposables.Disposables
+import io.reactivex.subjects.PublishSubject
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlin.test.*
+class ObservableAsFlowTest : TestBase() {
+ @Test
+ fun testCancellation() = runTest {
+ var onNext = 0
+ var onCancelled = 0
+ var onError = 0
+ val source = rxObservable(currentDispatcher()) {
+ coroutineContext[Job]?.invokeOnCompletion {
+ if (it is CancellationException) ++onCancelled
+ }
+ repeat(100) {
+ send(it)
+ }
+ }
+ source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
+ onEach {
+ ++onNext
+ throw RuntimeException()
+ }
+ catch<Throwable> {
+ ++onError
+ }
+ }.join()
+ assertEquals(1, onNext)
+ assertEquals(1, onError)
+ assertEquals(1, onCancelled)
+ }
+ @Test
+ fun testImmediateCollection() {
+ val source = PublishSubject.create<Int>()
+ val flow = source.asFlow()
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ expect(1)
+ flow.collect { expect(it) }
+ expect(6)
+ }
+ expect(2)
+ source.onNext(3)
+ expect(4)
+ source.onNext(5)
+ source.onComplete()
+ finish(7)
+ }
+ @Test
+ fun testOnErrorCancellation() {
+ val source = PublishSubject.create<Int>()
+ val flow = source.asFlow()
+ val exception = RuntimeException()
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ try {
+ expect(1)
+ flow.collect { expect(it) }
+ expectUnreached()
+ }
+ catch (e: Exception) {
+ assertSame(exception, e.cause)
+ expect(5)
+ }
+ expect(6)
+ }
+ expect(2)
+ source.onNext(3)
+ expect(4)
+ source.onError(exception)
+ finish(7)
+ }
+ @Test
+ fun testUnsubscribeOnCollectionException() {
+ val source = PublishSubject.create<Int>()
+ val flow = source.asFlow()
+ val exception = RuntimeException()
+ GlobalScope.launch(Dispatchers.Unconfined) {
+ try {
+ expect(1)
+ flow.collect {
+ expect(it)
+ if (it == 3) throw exception
+ }
+ expectUnreached()
+ }
+ catch (e: Exception) {
+ assertSame(exception, e.cause)
+ expect(4)
+ }
+ expect(5)
+ }
+ expect(2)
+ assertTrue(source.hasObservers())
+ source.onNext(3)
+ assertFalse(source.hasObservers())
+ finish(6)
+ }
+ @Test
+ fun testLateOnSubscribe() {
+ var observer: Observer<in Int>? = null
+ val source = ObservableSource<Int> { observer = it }
+ val flow = source.asFlow()
+ assertNull(observer)
+ val job = GlobalScope.launch(Dispatchers.Unconfined) {
+ expect(1)
+ flow.collect { expectUnreached() }
+ expectUnreached()
+ }
+ expect(2)
+ assertNotNull(observer)
+ job.cancel()
+ val disposable = Disposables.empty()
+ observer!!.onSubscribe(disposable)
+ assertTrue(disposable.isDisposed)
+ finish(3)
+ }
+ @Test
+ fun testBufferUnlimited() = runTest {
+ val source = rxObservable(currentDispatcher()) {
+ expect(1); send(10)
+ expect(2); send(11)
+ expect(3); send(12)
+ expect(4); send(13)
+ expect(5); send(14)
+ expect(6); send(15)
+ expect(7); send(16)
+ expect(8); send(17)
+ expect(9)
+ }
+ source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) }
+ finish(18)
+ }
+ @Test
+ fun testConflated() = runTest {
+ val source = Observable.range(1, 5)
+ val list = source.asFlow().conflate().toList()
+ assertEquals(listOf(1, 5), list)
+ }
+ @Test
+ fun testLongRange() = runTest {
+ val source = Observable.range(1, 10_000)
+ val count = source.asFlow().count()
+ assertEquals(10_000, count)
+ }
+ @Test
+ fun testProduce() = runTest {
+ val source = Observable.range(0, 10)
+ val flow = source.asFlow()
+ check((0..9).toList(), flow.produceIn(this))
+ check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
+ check((0..9).toList(), flow.buffer(2).produceIn(this))
+ check((0..9).toList(), flow.buffer(0).produceIn(this))
+ check(listOf(0, 9), flow.conflate().produceIn(this))
+ }
+ private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
+ val result = ArrayList<Int>(10)
+ channel.consumeEach { result.add(it) }
+ assertEquals(expected, result)
+ }
+} \ No newline at end of file