diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxConvert.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index a9d423a9..36e8dd95 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -5,10 +5,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.disposables.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* +import java.util.concurrent.atomic.* import kotlin.coroutines.* /** @@ -78,6 +80,35 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): } /** + * Transforms given cold [ObservableSource] into cold [Flow]. + * + * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator + * is applied to the resulting flow. + * + * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the + * resulting flow to specify a user-defined value and to control what happens when data is produced faster + * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. + */ +@ExperimentalCoroutinesApi +public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { + val disposableRef = AtomicReference<Disposable>() + val observer = object : Observer<T> { + override fun onComplete() { close() } + override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } + override fun onNext(t: T) { sendBlocking(t) } + override fun onError(e: Throwable) { close(e) } + } + + subscribe(observer) + awaitClose { disposableRef.getAndSet(Disposed)?.dispose() } +} + +private object Disposed : Disposable { + override fun isDisposed() = true + override fun dispose() = Unit +} + +/** * Converts the given flow to a cold observable. * The original flow is cancelled when the observable subscriber is disposed. */ |