Selaa lähdekoodia

Migrate Kitsu API to coroutines and kotlinx.serialization

arkon 4 vuotta sitten
vanhempi
commit
271de31d51

+ 0 - 2
app/build.gradle

@@ -175,8 +175,6 @@ dependencies {
     final retrofit_version = '2.9.0'
     implementation "com.squareup.retrofit2:retrofit:$retrofit_version"
     implementation "com.jakewharton.retrofit:retrofit2-kotlinx-serialization-converter:0.8.0"
-    implementation "com.squareup.retrofit2:converter-gson:$retrofit_version"
-    implementation "com.squareup.retrofit2:adapter-rxjava:$retrofit_version"
 
     // JSON
     final kotlin_serialization_version = '1.0.1'

+ 8 - 7
app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/Kitsu.kt

@@ -7,6 +7,7 @@ import eu.kanade.tachiyomi.R
 import eu.kanade.tachiyomi.data.database.models.Track
 import eu.kanade.tachiyomi.data.track.TrackService
 import eu.kanade.tachiyomi.data.track.model.TrackSearch
+import eu.kanade.tachiyomi.util.lang.runAsObservable
 import rx.Completable
 import rx.Observable
 import uy.kohesive.injekt.injectLazy
@@ -69,15 +70,15 @@ class Kitsu(private val context: Context, id: Int) : TrackService(id) {
     }
 
     override fun add(track: Track): Observable<Track> {
-        return api.addLibManga(track, getUserId())
+        return runAsObservable({ api.addLibManga(track, getUserId()) })
     }
 
     override fun update(track: Track): Observable<Track> {
-        return api.updateLibManga(track)
+        return runAsObservable({ api.updateLibManga(track) })
     }
 
     override fun bind(track: Track): Observable<Track> {
-        return api.findLibManga(track, getUserId())
+        return runAsObservable({ api.findLibManga(track, getUserId()) })
             .flatMap { remoteTrack ->
                 if (remoteTrack != null) {
                     track.copyPersonalFrom(remoteTrack)
@@ -92,11 +93,11 @@ class Kitsu(private val context: Context, id: Int) : TrackService(id) {
     }
 
     override fun search(query: String): Observable<List<TrackSearch>> {
-        return api.search(query)
+        return runAsObservable({ api.search(query) })
     }
 
     override fun refresh(track: Track): Observable<Track> {
-        return api.getLibManga(track)
+        return runAsObservable({ api.getLibManga(track) })
             .map { remoteTrack ->
                 track.copyPersonalFrom(remoteTrack)
                 track.total_chapters = remoteTrack.total_chapters
@@ -105,9 +106,9 @@ class Kitsu(private val context: Context, id: Int) : TrackService(id) {
     }
 
     override fun login(username: String, password: String): Completable {
-        return api.login(username, password)
+        return runAsObservable({ api.login(username, password) })
             .doOnNext { interceptor.newAuth(it) }
-            .flatMap { api.getCurrentUser() }
+            .flatMap { runAsObservable({ api.getCurrentUser() }) }
             .doOnNext { userId -> saveCredentials(username, userId) }
             .doOnError { logout() }
             .toCompletable()

+ 105 - 120
app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuApi.kt

@@ -1,21 +1,22 @@
 package eu.kanade.tachiyomi.data.track.kitsu
 
-import com.github.salomonbrys.kotson.array
-import com.github.salomonbrys.kotson.get
-import com.github.salomonbrys.kotson.int
-import com.github.salomonbrys.kotson.jsonObject
-import com.github.salomonbrys.kotson.obj
-import com.github.salomonbrys.kotson.string
-import com.google.gson.GsonBuilder
-import com.google.gson.JsonObject
+import com.jakewharton.retrofit2.converter.kotlinx.serialization.asConverterFactory
 import eu.kanade.tachiyomi.data.database.models.Track
 import eu.kanade.tachiyomi.data.track.model.TrackSearch
 import eu.kanade.tachiyomi.network.POST
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonObject
+import kotlinx.serialization.json.buildJsonObject
+import kotlinx.serialization.json.int
+import kotlinx.serialization.json.jsonArray
+import kotlinx.serialization.json.jsonObject
+import kotlinx.serialization.json.jsonPrimitive
+import kotlinx.serialization.json.put
+import kotlinx.serialization.json.putJsonObject
 import okhttp3.FormBody
+import okhttp3.MediaType.Companion.toMediaType
 import okhttp3.OkHttpClient
 import retrofit2.Retrofit
-import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory
-import retrofit2.converter.gson.GsonConverterFactory
 import retrofit2.http.Body
 import retrofit2.http.Field
 import retrofit2.http.FormUrlEncoded
@@ -26,7 +27,6 @@ import retrofit2.http.PATCH
 import retrofit2.http.POST
 import retrofit2.http.Path
 import retrofit2.http.Query
-import rx.Observable
 
 class KitsuApi(private val client: OkHttpClient, interceptor: KitsuInterceptor) {
 
@@ -35,196 +35,179 @@ class KitsuApi(private val client: OkHttpClient, interceptor: KitsuInterceptor)
     private val rest = Retrofit.Builder()
         .baseUrl(baseUrl)
         .client(authClient)
-        .addConverterFactory(GsonConverterFactory.create(GsonBuilder().serializeNulls().create()))
-        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
+        .addConverterFactory(jsonConverter)
         .build()
         .create(Rest::class.java)
 
     private val searchRest = Retrofit.Builder()
         .baseUrl(algoliaKeyUrl)
         .client(authClient)
-        .addConverterFactory(GsonConverterFactory.create())
-        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
+        .addConverterFactory(jsonConverter)
         .build()
         .create(SearchKeyRest::class.java)
 
     private val algoliaRest = Retrofit.Builder()
         .baseUrl(algoliaUrl)
         .client(client)
-        .addConverterFactory(GsonConverterFactory.create())
-        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
+        .addConverterFactory(jsonConverter)
         .build()
         .create(AgoliaSearchRest::class.java)
 
-    fun addLibManga(track: Track, userId: String): Observable<Track> {
-        return Observable.defer {
-            // @formatter:off
-            val data = jsonObject(
-                "type" to "libraryEntries",
-                "attributes" to jsonObject(
-                    "status" to track.toKitsuStatus(),
-                    "progress" to track.last_chapter_read
-                ),
-                "relationships" to jsonObject(
-                    "user" to jsonObject(
-                        "data" to jsonObject(
-                            "id" to userId,
-                            "type" to "users"
-                        )
-                    ),
-                    "media" to jsonObject(
-                        "data" to jsonObject(
-                            "id" to track.media_id,
-                            "type" to "manga"
-                        )
-                    )
-                )
-            )
-
-            rest.addLibManga(jsonObject("data" to data))
-                .map { json ->
-                    track.media_id = json["data"]["id"].int
-                    track
+    suspend fun addLibManga(track: Track, userId: String): Track {
+        val data = buildJsonObject {
+            putJsonObject("data") {
+                put("type", "libraryEntries")
+                putJsonObject("attributes") {
+                    put("status", track.toKitsuStatus())
+                    put("progress", track.last_chapter_read)
                 }
+                putJsonObject("relationships") {
+                    putJsonObject("user") {
+                        putJsonObject("data") {
+                            put("id", userId)
+                            put("type", "users")
+                        }
+                    }
+                    putJsonObject("media") {
+                        putJsonObject("data") {
+                            put("id", track.media_id)
+                            put("type", "manga")
+                        }
+                    }
+                }
+            }
         }
+
+        val json = rest.addLibManga(data)
+        track.media_id = json["data"]!!.jsonObject["id"]!!.jsonPrimitive.int
+        return track
     }
 
-    fun updateLibManga(track: Track): Observable<Track> {
-        return Observable.defer {
-            // @formatter:off
-            val data = jsonObject(
-                "type" to "libraryEntries",
-                "id" to track.media_id,
-                "attributes" to jsonObject(
-                    "status" to track.toKitsuStatus(),
-                    "progress" to track.last_chapter_read,
-                    "ratingTwenty" to track.toKitsuScore()
-                )
-            )
-            // @formatter:on
-
-            rest.updateLibManga(track.media_id, jsonObject("data" to data))
-                .map { track }
+    suspend fun updateLibManga(track: Track): Track {
+        val data = buildJsonObject {
+            putJsonObject("data") {
+                put("type", "libraryEntries")
+                put("id", track.media_id)
+                putJsonObject("attributes") {
+                    put("status", track.toKitsuStatus())
+                    put("progress", track.last_chapter_read)
+                    put("ratingTwenty", track.toKitsuScore())
+                }
+            }
         }
+
+        rest.updateLibManga(track.media_id, data)
+        return track
     }
 
-    fun search(query: String): Observable<List<TrackSearch>> {
-        return searchRest
-            .getKey().map { json ->
-                json["media"].asJsonObject["key"].string
-            }.flatMap { key ->
-                algoliaSearch(key, query)
-            }
+    suspend fun search(query: String): List<TrackSearch> {
+        val json = searchRest.getKey()
+        val key = json["media"]!!.jsonObject["key"]!!.jsonPrimitive.content
+        return algoliaSearch(key, query)
     }
 
-    private fun algoliaSearch(key: String, query: String): Observable<List<TrackSearch>> {
-        val jsonObject = jsonObject("params" to "query=$query$algoliaFilter")
-        return algoliaRest
-            .getSearchQuery(algoliaAppId, key, jsonObject)
-            .map { json ->
-                val data = json["hits"].array
-                data.map { KitsuSearchManga(it.obj) }
-                    .filter { it.subType != "novel" }
-                    .map { it.toTrack() }
-            }
+    private suspend fun algoliaSearch(key: String, query: String): List<TrackSearch> {
+        val jsonObject = buildJsonObject {
+            put("params", "query=$query$algoliaFilter")
+        }
+        val json = algoliaRest.getSearchQuery(algoliaAppId, key, jsonObject)
+        val data = json["hits"]!!.jsonArray
+        return data.map { KitsuSearchManga(it.jsonObject) }
+            .filter { it.subType != "novel" }
+            .map { it.toTrack() }
     }
 
-    fun findLibManga(track: Track, userId: String): Observable<Track?> {
-        return rest.findLibManga(track.media_id, userId)
-            .map { json ->
-                val data = json["data"].array
-                if (data.size() > 0) {
-                    val manga = json["included"].array[0].obj
-                    KitsuLibManga(data[0].obj, manga).toTrack()
-                } else {
-                    null
-                }
-            }
+    suspend fun findLibManga(track: Track, userId: String): Track? {
+        val json = rest.findLibManga(track.media_id, userId)
+        val data = json["data"]!!.jsonArray
+        return if (data.size > 0) {
+            val manga = json["included"]!!.jsonArray[0].jsonObject
+            KitsuLibManga(data[0].jsonObject, manga).toTrack()
+        } else {
+            null
+        }
     }
 
-    fun getLibManga(track: Track): Observable<Track> {
-        return rest.getLibManga(track.media_id)
-            .map { json ->
-                val data = json["data"].array
-                if (data.size() > 0) {
-                    val manga = json["included"].array[0].obj
-                    KitsuLibManga(data[0].obj, manga).toTrack()
-                } else {
-                    throw Exception("Could not find manga")
-                }
-            }
+    suspend fun getLibManga(track: Track): Track {
+        val json = rest.getLibManga(track.media_id)
+        val data = json["data"]!!.jsonArray
+        return if (data.size > 0) {
+            val manga = json["included"]!!.jsonArray[0].jsonObject
+            KitsuLibManga(data[0].jsonObject, manga).toTrack()
+        } else {
+            throw Exception("Could not find manga")
+        }
     }
 
-    fun login(username: String, password: String): Observable<OAuth> {
+    suspend fun login(username: String, password: String): OAuth {
         return Retrofit.Builder()
             .baseUrl(loginUrl)
             .client(client)
-            .addConverterFactory(GsonConverterFactory.create())
-            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
+            .addConverterFactory(jsonConverter)
             .build()
             .create(LoginRest::class.java)
             .requestAccessToken(username, password)
     }
 
-    fun getCurrentUser(): Observable<String> {
-        return rest.getCurrentUser().map { it["data"].array[0]["id"].string }
+    suspend fun getCurrentUser(): String {
+        return rest.getCurrentUser()["data"]!!.jsonArray[0].jsonObject["id"]!!.jsonPrimitive.content
     }
 
     private interface Rest {
 
         @Headers("Content-Type: application/vnd.api+json")
         @POST("library-entries")
-        fun addLibManga(
+        suspend fun addLibManga(
             @Body data: JsonObject
-        ): Observable<JsonObject>
+        ): JsonObject
 
         @Headers("Content-Type: application/vnd.api+json")
         @PATCH("library-entries/{id}")
-        fun updateLibManga(
+        suspend fun updateLibManga(
             @Path("id") remoteId: Int,
             @Body data: JsonObject
-        ): Observable<JsonObject>
+        ): JsonObject
 
         @GET("library-entries")
-        fun findLibManga(
+        suspend fun findLibManga(
             @Query("filter[manga_id]", encoded = true) remoteId: Int,
             @Query("filter[user_id]", encoded = true) userId: String,
             @Query("include") includes: String = "manga"
-        ): Observable<JsonObject>
+        ): JsonObject
 
         @GET("library-entries")
-        fun getLibManga(
+        suspend fun getLibManga(
             @Query("filter[id]", encoded = true) remoteId: Int,
             @Query("include") includes: String = "manga"
-        ): Observable<JsonObject>
+        ): JsonObject
 
         @GET("users")
-        fun getCurrentUser(
+        suspend fun getCurrentUser(
             @Query("filter[self]", encoded = true) self: Boolean = true
-        ): Observable<JsonObject>
+        ): JsonObject
     }
 
     private interface SearchKeyRest {
         @GET("media/")
-        fun getKey(): Observable<JsonObject>
+        suspend fun getKey(): JsonObject
     }
 
     private interface AgoliaSearchRest {
         @POST("query/")
-        fun getSearchQuery(@Header("X-Algolia-Application-Id") appid: String, @Header("X-Algolia-API-Key") key: String, @Body json: JsonObject): Observable<JsonObject>
+        suspend fun getSearchQuery(@Header("X-Algolia-Application-Id") appid: String, @Header("X-Algolia-API-Key") key: String, @Body json: JsonObject): JsonObject
     }
 
     private interface LoginRest {
 
         @FormUrlEncoded
         @POST("oauth/token")
-        fun requestAccessToken(
+        suspend fun requestAccessToken(
             @Field("username") username: String,
             @Field("password") password: String,
             @Field("grant_type") grantType: String = "password",
             @Field("client_id") client_id: String = clientId,
             @Field("client_secret") client_secret: String = clientSecret
-        ): Observable<OAuth>
+        ): OAuth
     }
 
     companion object {
@@ -238,6 +221,8 @@ class KitsuApi(private val client: OkHttpClient, interceptor: KitsuInterceptor)
         private const val algoliaAppId = "AWQO5J657S"
         private const val algoliaFilter = "&facetFilters=%5B%22kind%3Amanga%22%5D&attributesToRetrieve=%5B%22synopsis%22%2C%22canonicalTitle%22%2C%22chapterCount%22%2C%22posterImage%22%2C%22startDate%22%2C%22subtype%22%2C%22endDate%22%2C%20%22id%22%5D"
 
+        private val jsonConverter = Json { ignoreUnknownKeys = true }.asConverterFactory("application/json".toMediaType())
+
         fun mangaUrl(remoteId: Int): String {
             return baseMangaUrl + remoteId
         }

+ 25 - 26
app/src/main/java/eu/kanade/tachiyomi/data/track/kitsu/KitsuModels.kt

@@ -1,32 +1,31 @@
 package eu.kanade.tachiyomi.data.track.kitsu
 
 import androidx.annotation.CallSuper
-import com.github.salomonbrys.kotson.byInt
-import com.github.salomonbrys.kotson.byString
-import com.github.salomonbrys.kotson.nullInt
-import com.github.salomonbrys.kotson.nullObj
-import com.github.salomonbrys.kotson.nullString
-import com.github.salomonbrys.kotson.obj
-import com.google.gson.JsonObject
 import eu.kanade.tachiyomi.data.database.models.Track
 import eu.kanade.tachiyomi.data.track.TrackManager
 import eu.kanade.tachiyomi.data.track.model.TrackSearch
+import kotlinx.serialization.json.JsonObject
+import kotlinx.serialization.json.contentOrNull
+import kotlinx.serialization.json.int
+import kotlinx.serialization.json.intOrNull
+import kotlinx.serialization.json.jsonObject
+import kotlinx.serialization.json.jsonPrimitive
 import java.text.SimpleDateFormat
 import java.util.Date
 import java.util.Locale
 
 class KitsuSearchManga(obj: JsonObject) {
-    val id by obj.byInt
-    private val canonicalTitle by obj.byString
-    private val chapterCount = obj.get("chapterCount").nullInt
-    val subType = obj.get("subtype").nullString
-    val original = obj.get("posterImage").nullObj?.get("original")?.asString
-    private val synopsis by obj.byString
-    private var startDate = obj.get("startDate").nullString?.let {
+    val id = obj["id"]!!.jsonPrimitive.int
+    private val canonicalTitle = obj["canonicalTitle"]!!.jsonPrimitive.content
+    private val chapterCount = obj["chapterCount"]?.jsonPrimitive?.intOrNull
+    val subType = obj["subtype"]?.jsonPrimitive?.contentOrNull
+    val original = obj["posterImage"]?.jsonObject?.get("original")?.jsonPrimitive?.content
+    private val synopsis = obj["synopsis"]!!.jsonPrimitive.content
+    private var startDate = obj["startDate"]?.jsonPrimitive?.contentOrNull?.let {
         val outputDf = SimpleDateFormat("yyyy-MM-dd", Locale.US)
         outputDf.format(Date(it.toLong() * 1000))
     }
-    private val endDate = obj.get("endDate").nullString
+    private val endDate = obj["endDate"]?.jsonPrimitive?.contentOrNull
 
     @CallSuper
     fun toTrack() = TrackSearch.create(TrackManager.KITSU).apply {
@@ -47,17 +46,17 @@ class KitsuSearchManga(obj: JsonObject) {
 }
 
 class KitsuLibManga(obj: JsonObject, manga: JsonObject) {
-    val id by manga.byInt
-    private val canonicalTitle by manga["attributes"].byString
-    private val chapterCount = manga["attributes"].obj.get("chapterCount").nullInt
-    val type = manga["attributes"].obj.get("mangaType").nullString.orEmpty()
-    val original by manga["attributes"].obj["posterImage"].byString
-    private val synopsis by manga["attributes"].byString
-    private val startDate = manga["attributes"].obj.get("startDate").nullString.orEmpty()
-    private val libraryId by obj.byInt("id")
-    val status by obj["attributes"].byString
-    private val ratingTwenty = obj["attributes"].obj.get("ratingTwenty").nullString
-    val progress by obj["attributes"].byInt
+    val id = manga["id"]!!.jsonPrimitive.int
+    private val canonicalTitle = manga["attributes"]!!.jsonObject["canonicalTitle"]!!.jsonPrimitive.content
+    private val chapterCount = manga["attributes"]!!.jsonObject["chapterCount"]?.jsonPrimitive?.intOrNull
+    val type = manga["attributes"]!!.jsonObject["mangaType"]?.jsonPrimitive?.contentOrNull.orEmpty()
+    val original = manga["attributes"]!!.jsonObject["original"]!!.jsonObject["posterImage"]!!.jsonPrimitive.content
+    private val synopsis = manga["attributes"]!!.jsonObject["synopsis"]!!.jsonPrimitive.content
+    private val startDate = manga["attributes"]!!.jsonObject["startDate"]?.jsonPrimitive?.contentOrNull.orEmpty()
+    private val libraryId = obj["id"]!!.jsonPrimitive.int
+    val status = obj["attributes"]!!.jsonObject["status"]!!.jsonPrimitive.content
+    private val ratingTwenty = obj["attributes"]!!.jsonObject["ratingTwenty"]?.jsonPrimitive?.contentOrNull
+    val progress = obj["attributes"]!!.jsonObject["progress"]!!.jsonPrimitive.int
 
     fun toTrack() = TrackSearch.create(TrackManager.KITSU).apply {
         media_id = libraryId

+ 6 - 13
app/src/main/java/eu/kanade/tachiyomi/data/track/myanimelist/MyAnimeList.kt

@@ -6,7 +6,7 @@ import eu.kanade.tachiyomi.R
 import eu.kanade.tachiyomi.data.database.models.Track
 import eu.kanade.tachiyomi.data.track.TrackService
 import eu.kanade.tachiyomi.data.track.model.TrackSearch
-import kotlinx.coroutines.Dispatchers
+import eu.kanade.tachiyomi.util.lang.runAsObservable
 import kotlinx.coroutines.runBlocking
 import kotlinx.serialization.decodeFromString
 import kotlinx.serialization.encodeToString
@@ -68,24 +68,24 @@ class MyAnimeList(private val context: Context, id: Int) : TrackService(id) {
     }
 
     override fun add(track: Track): Observable<Track> {
-        return runAsObservable { api.addItemToList(track) }
+        return runAsObservable({ api.addItemToList(track) })
     }
 
     override fun update(track: Track): Observable<Track> {
-        return runAsObservable { api.updateItem(track) }
+        return runAsObservable({ api.updateItem(track) })
     }
 
     override fun bind(track: Track): Observable<Track> {
         // TODO: change this to call add and update like the other trackers?
-        return runAsObservable { api.getListItem(track) }
+        return runAsObservable({ api.getListItem(track) })
     }
 
     override fun search(query: String): Observable<List<TrackSearch>> {
-        return runAsObservable { api.search(query) }
+        return runAsObservable({ api.search(query) })
     }
 
     override fun refresh(track: Track): Observable<Track> {
-        return runAsObservable { api.getListItem(track) }
+        return runAsObservable({ api.getListItem(track) })
     }
 
     override fun login(username: String, password: String) = login(password)
@@ -122,11 +122,4 @@ class MyAnimeList(private val context: Context, id: Int) : TrackService(id) {
             null
         }
     }
-
-    private fun <T> runAsObservable(block: suspend () -> T): Observable<T> {
-        return Observable.fromCallable { runBlocking(Dispatchers.IO) { block() } }
-            .subscribeOn(Schedulers.io())
-            .observeOn(AndroidSchedulers.mainThread())
-            .map { it }
-    }
 }

+ 32 - 5
app/src/main/java/eu/kanade/tachiyomi/util/lang/RxCoroutineBridge.kt

@@ -119,7 +119,8 @@ suspend fun <T> Single<T>.await(): T = suspendCancellableCoroutine { cont ->
 suspend fun <T> Observable<T>.awaitFirst(): T = first().awaitOne()
 
 @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
-suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T = firstOrDefault(default).awaitOne()
+suspend fun <T> Observable<T>.awaitFirstOrDefault(default: T): T =
+    firstOrDefault(default).awaitOne()
 
 @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
 suspend fun <T> Observable<T>.awaitFirstOrNull(): T? = firstOrDefault(null).awaitOne()
@@ -137,7 +138,8 @@ suspend fun <T> Observable<T>.awaitLast(): T = last().awaitOne()
 @OptIn(InternalCoroutinesApi::class, ExperimentalCoroutinesApi::class)
 suspend fun <T> Observable<T>.awaitSingle(): T = single().awaitOne()
 
-suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T = singleOrDefault(default).awaitOne()
+suspend fun <T> Observable<T>.awaitSingleOrDefault(default: T): T =
+    singleOrDefault(default).awaitOne()
 
 suspend fun <T> Observable<T>.awaitSingleOrNull(): T? = singleOrDefault(null).awaitOne()
 
@@ -203,9 +205,9 @@ fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode =
     return Observable.create(
         { emitter ->
             /*
-         * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
-         * asObservable is already invoked from unconfined
-         */
+             * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
+             * asObservable is already invoked from unconfined
+             */
             val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
                 try {
                     collect { emitter.onNext(it) }
@@ -224,3 +226,28 @@ fun <T : Any> Flow<T>.asObservable(backpressureMode: Emitter.BackpressureMode =
         backpressureMode
     )
 }
+
+fun <T> runAsObservable(
+    block: suspend () -> T,
+    backpressureMode: Emitter.BackpressureMode = Emitter.BackpressureMode.NONE
+): Observable<T> {
+    return Observable.create(
+        { emitter ->
+            val job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.ATOMIC) {
+                try {
+                    emitter.onNext(block())
+                    emitter.onCompleted()
+                } catch (e: Throwable) {
+                    // Ignore `CancellationException` as error, since it indicates "normal cancellation"
+                    if (e !is CancellationException) {
+                        emitter.onError(e)
+                    } else {
+                        emitter.onCompleted()
+                    }
+                }
+            }
+            emitter.setCancellation { job.cancel() }
+        },
+        backpressureMode
+    )
+}