|
@@ -5,17 +5,10 @@ import kotlinx.coroutines.CancellationException
|
|
|
import kotlinx.coroutines.CoroutineStart
|
|
|
import kotlinx.coroutines.Dispatchers
|
|
|
import kotlinx.coroutines.GlobalScope
|
|
|
-import kotlinx.coroutines.channels.awaitClose
|
|
|
-import kotlinx.coroutines.flow.Flow
|
|
|
-import kotlinx.coroutines.flow.callbackFlow
|
|
|
import kotlinx.coroutines.launch
|
|
|
import kotlinx.coroutines.suspendCancellableCoroutine
|
|
|
import rx.Emitter
|
|
|
import rx.Observable
|
|
|
-import rx.Observer
|
|
|
-import rx.Scheduler
|
|
|
-import rx.Single
|
|
|
-import rx.SingleSubscriber
|
|
|
import rx.Subscriber
|
|
|
import rx.Subscription
|
|
|
import kotlin.coroutines.resume
|
|
@@ -25,45 +18,6 @@ import kotlin.coroutines.resumeWithException
|
|
|
* Util functions for bridging RxJava and coroutines. Taken from TachiyomiEH/SY.
|
|
|
*/
|
|
|
|
|
|
-suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
|
|
|
- return suspendCancellableCoroutine { continuation ->
|
|
|
- val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
|
|
- lateinit var sub: Subscription
|
|
|
- sub = self.subscribe(
|
|
|
- {
|
|
|
- continuation.resume(it) {
|
|
|
- sub.unsubscribe()
|
|
|
- }
|
|
|
- },
|
|
|
- {
|
|
|
- if (!continuation.isCancelled) {
|
|
|
- continuation.resumeWithException(it)
|
|
|
- }
|
|
|
- }
|
|
|
- )
|
|
|
-
|
|
|
- continuation.invokeOnCancellation {
|
|
|
- sub.unsubscribe()
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
|
|
|
- cont.unsubscribeOnCancellation(
|
|
|
- subscribe(
|
|
|
- object : SingleSubscriber<T>() {
|
|
|
- override fun onSuccess(t: T) {
|
|
|
- cont.resume(t)
|
|
|
- }
|
|
|
-
|
|
|
- override fun onError(error: Throwable) {
|
|
|
- cont.resumeWithException(error)
|
|
|
- }
|
|
|
- }
|
|
|
- )
|
|
|
- )
|
|
|
-}
|
|
|
-
|
|
|
suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
|
|
|
|
|
|
private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutine { cont ->
|
|
@@ -105,24 +59,6 @@ private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutin
|
|
|
internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
|
|
|
invokeOnCancellation { sub.unsubscribe() }
|
|
|
|
|
|
-fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
|
|
|
- val observer = object : Observer<T> {
|
|
|
- override fun onNext(t: T) {
|
|
|
- offer(t)
|
|
|
- }
|
|
|
-
|
|
|
- override fun onError(e: Throwable) {
|
|
|
- close(e)
|
|
|
- }
|
|
|
-
|
|
|
- override fun onCompleted() {
|
|
|
- close()
|
|
|
- }
|
|
|
- }
|
|
|
- val subscription = subscribe(observer)
|
|
|
- awaitClose { subscription.unsubscribe() }
|
|
|
-}
|
|
|
-
|
|
|
fun <T> runAsObservable(
|
|
|
block: suspend () -> T,
|
|
|
backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE
|