Explorar o código

Replace RxJava in HttpPageLoader downloader (#8955)

* Convert downloader Observable to flow

Uses `runInterruptible` to turn the blocking call to `queue.take()`
into a cancellable call.

Flow collection is ended by cancelling the scope in `recycle`. This
means the `HttpPageLoader` can't be reused after calling `recycle`,
but this was true with the `Observable` as well.)

* Convert load Observables to suspending function

Inlining the Observables allows for some simplification of the error
handling. Behavior should be otherwise identical.

* Convert cleanup Completable to coroutine

Uses global `launchIO`, not ideal but similar to previous behavior.
Can't be scheduled on the local `scope` as this runs after `scope` is
cancelled.
Two-Ai %!s(int64=2) %!d(string=hai) anos
pai
achega
e4bc8990fb

+ 51 - 80
app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt

@@ -6,16 +6,20 @@ import eu.kanade.tachiyomi.source.model.Page
 import eu.kanade.tachiyomi.source.online.HttpSource
 import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
 import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
-import eu.kanade.tachiyomi.util.lang.plusAssign
-import eu.kanade.tachiyomi.util.system.logcat
+import eu.kanade.tachiyomi.util.lang.awaitSingle
+import eu.kanade.tachiyomi.util.lang.launchIO
 import kotlinx.coroutines.CancellationException
-import logcat.LogPriority
-import rx.Completable
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.SupervisorJob
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.flow.filter
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.runInterruptible
 import rx.Observable
 import rx.schedulers.Schedulers
 import rx.subjects.PublishSubject
 import rx.subjects.SerializedSubject
-import rx.subscriptions.CompositeSubscription
 import uy.kohesive.injekt.Injekt
 import uy.kohesive.injekt.api.get
 import java.util.concurrent.PriorityBlockingQueue
@@ -31,33 +35,27 @@ class HttpPageLoader(
     private val chapterCache: ChapterCache = Injekt.get(),
 ) : PageLoader() {
 
+    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
+
     /**
      * A queue used to manage requests one by one while allowing priorities.
      */
     private val queue = PriorityBlockingQueue<PriorityPage>()
 
-    /**
-     * Current active subscriptions.
-     */
-    private val subscriptions = CompositeSubscription()
-
     private val preloadSize = 4
 
     init {
-        subscriptions += Observable.defer { Observable.just(queue.take().page) }
-            .filter { it.status == Page.State.QUEUE }
-            .concatMap { source.fetchImageFromCacheThenNet(it) }
-            .repeat()
-            .subscribeOn(Schedulers.io())
-            .subscribe(
-                {
-                },
-                { error ->
-                    if (error !is InterruptedException) {
-                        logcat(LogPriority.ERROR, error)
-                    }
-                },
-            )
+        scope.launchIO {
+            flow {
+                while (true) {
+                    emit(runInterruptible { queue.take() }.page)
+                }
+            }
+                .filter { it.status == Page.State.QUEUE }
+                .collect {
+                    loadPage(it)
+                }
+        }
     }
 
     /**
@@ -65,21 +63,23 @@ class HttpPageLoader(
      */
     override fun recycle() {
         super.recycle()
-        subscriptions.unsubscribe()
+        scope.cancel()
         queue.clear()
 
         // Cache current page list progress for online chapters to allow a faster reopen
         val pages = chapter.pages
         if (pages != null) {
-            Completable
-                .fromAction {
+            launchIO {
+                try {
                     // Convert to pages without reader information
                     val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) }
                     chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
+                } catch (e: Throwable) {
+                    if (e is CancellationException) {
+                        throw e
+                    }
                 }
-                .onErrorComplete()
-                .subscribeOn(Schedulers.io())
-                .subscribe()
+            }
         }
     }
 
@@ -192,61 +192,32 @@ class HttpPageLoader(
     }
 
     /**
-     * Returns an observable of the page with the downloaded image.
+     * Loads the page, retrieving the image URL and downloading the image if necessary.
+     * Downloaded images are stored in the chapter cache.
      *
      * @param page the page whose source image has to be downloaded.
      */
-    private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable<ReaderPage> {
-        return if (page.imageUrl.isNullOrEmpty()) {
-            getImageUrl(page).flatMap { getCachedImage(it) }
-        } else {
-            getCachedImage(page)
-        }
-    }
-
-    private fun HttpSource.getImageUrl(page: ReaderPage): Observable<ReaderPage> {
-        page.status = Page.State.LOAD_PAGE
-        return fetchImageUrl(page)
-            .doOnError { page.status = Page.State.ERROR }
-            .onErrorReturn { null }
-            .doOnNext { page.imageUrl = it }
-            .map { page }
-    }
-
-    /**
-     * Returns an observable of the page that gets the image from the chapter or fallbacks to
-     * network and copies it to the cache calling [cacheImage].
-     *
-     * @param page the page.
-     */
-    private fun HttpSource.getCachedImage(page: ReaderPage): Observable<ReaderPage> {
-        val imageUrl = page.imageUrl ?: return Observable.just(page)
-
-        return Observable.just(page)
-            .flatMap {
-                if (!chapterCache.isImageInCache(imageUrl)) {
-                    cacheImage(page)
-                } else {
-                    Observable.just(page)
-                }
+    private suspend fun loadPage(page: ReaderPage) {
+        try {
+            if (page.imageUrl.isNullOrEmpty()) {
+                page.status = Page.State.LOAD_PAGE
+                page.imageUrl = source.fetchImageUrl(page).awaitSingle()
             }
-            .doOnNext {
-                page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
-                page.status = Page.State.READY
+            val imageUrl = page.imageUrl!!
+
+            if (!chapterCache.isImageInCache(imageUrl)) {
+                page.status = Page.State.DOWNLOAD_IMAGE
+                val imageResponse = source.fetchImage(page).awaitSingle()
+                chapterCache.putImageToCache(imageUrl, imageResponse)
             }
-            .doOnError { page.status = Page.State.ERROR }
-            .onErrorReturn { page }
-    }
 
-    /**
-     * Returns an observable of the page that downloads the image to [ChapterCache].
-     *
-     * @param page the page.
-     */
-    private fun HttpSource.cacheImage(page: ReaderPage): Observable<ReaderPage> {
-        page.status = Page.State.DOWNLOAD_IMAGE
-        return fetchImage(page)
-            .doOnNext { chapterCache.putImageToCache(page.imageUrl!!, it) }
-            .map { page }
+            page.stream = { chapterCache.getImageFile(imageUrl).inputStream() }
+            page.status = Page.State.READY
+        } catch (e: Throwable) {
+            page.status = Page.State.ERROR
+            if (e is CancellationException) {
+                throw e
+            }
+        }
     }
 }