aboutsummaryrefslogtreecommitdiff
path: root/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxConvert.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxConvert.kt31
1 files changed, 31 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index a9d423a9..36e8dd95 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -5,10 +5,12 @@
package kotlinx.coroutines.rx2
import io.reactivex.*
+import io.reactivex.disposables.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
+import java.util.concurrent.atomic.*
import kotlin.coroutines.*
/**
@@ -78,6 +80,35 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
}
/**
+ * Transforms given cold [ObservableSource] into cold [Flow].
+ *
+ * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
+ * is applied to the resulting flow.
+ *
+ * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
+ * resulting flow to specify a user-defined value and to control what happens when data is produced faster
+ * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
+ */
+@ExperimentalCoroutinesApi
+public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
+ val disposableRef = AtomicReference<Disposable>()
+ val observer = object : Observer<T> {
+ override fun onComplete() { close() }
+ override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
+ override fun onNext(t: T) { sendBlocking(t) }
+ override fun onError(e: Throwable) { close(e) }
+ }
+
+ subscribe(observer)
+ awaitClose { disposableRef.getAndSet(Disposed)?.dispose() }
+}
+
+private object Disposed : Disposable {
+ override fun isDisposed() = true
+ override fun dispose() = Unit
+}
+
+/**
* Converts the given flow to a cold observable.
* The original flow is cancelled when the observable subscriber is disposed.
*/