|
@@ -32,7 +32,6 @@ import kotlin.coroutines.resumeWithException
|
|
|
* Util functions for bridging RxJava and coroutines. Taken from TachiyomiEH/SY.
|
|
|
*/
|
|
|
|
|
|
-@ExperimentalCoroutinesApi
|
|
|
suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
|
|
|
return suspendCancellableCoroutine { continuation ->
|
|
|
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
|
@@ -59,7 +58,6 @@ suspend fun <T> Single<T>.await(subscribeOn: Scheduler? = null): T {
|
|
|
suspend fun <T> PreparedOperation<T>.await(): T = asRxSingle().await()
|
|
|
suspend fun <T> PreparedGetObject<T>.await(): T? = asRxSingle().await()
|
|
|
|
|
|
-@ExperimentalCoroutinesApi
|
|
|
suspend fun Completable.awaitSuspending(subscribeOn: Scheduler? = null) {
|
|
|
return suspendCancellableCoroutine { continuation ->
|
|
|
val self = if (subscribeOn != null) subscribeOn(subscribeOn) else this
|
|
@@ -183,7 +181,6 @@ private suspend fun <T> Observable<T>.awaitOne(): T = suspendCancellableCoroutin
|
|
|
internal fun <T> CancellableContinuation<T>.unsubscribeOnCancellation(sub: Subscription) =
|
|
|
invokeOnCancellation { sub.unsubscribe() }
|
|
|
|
|
|
-@ExperimentalCoroutinesApi
|
|
|
fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
|
|
|
val observer = object : Observer<T> {
|
|
|
override fun onNext(t: T) {
|
|
@@ -202,7 +199,6 @@ fun <T : Any> Observable<T>.asFlow(): Flow<T> = callbackFlow {
|
|
|
awaitClose { subscription.unsubscribe() }
|
|
|
}
|
|
|
|
|
|
-@ExperimentalCoroutinesApi
|
|
|
fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE): Observable<T> {
|
|
|
return Observable.create(
|
|
|
{ emitter ->
|