Browse Source

Convert downloadChapter to suspend function (#9127)

1:1 translation from the RxJava implementation, should match the
previous behavior.

Dropped the return value from functions of the form
```
fun foo(t: T, ...): Observable<T>
```
where the Observable produced the original argument `t`.
The caller already has the result if necessary.

While this conversion is not flow-based overall, some sections use
flows to use the flatMapMerge and retryWhen operators.

Removed RetryWithDelay as it was only used here.

Inlined fetchAllImageUrlsFromPageList instead of converting it to a
suspending equivalent. fetchAllImageUrlsFromPageList is no longer
used in the app, but was not removed as it is part of source-api.
(However, it does not seem to be used exposed in extensions-lib or
used in tachiyomi-extensions.)

runBlocking is used as a temporary stop-gap.
Two-Ai 2 years ago
parent
commit
fa61c8fe6f

+ 122 - 97
app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt

@@ -18,13 +18,21 @@ import eu.kanade.tachiyomi.source.SourceManager
 import eu.kanade.tachiyomi.source.UnmeteredSource
 import eu.kanade.tachiyomi.source.model.Page
 import eu.kanade.tachiyomi.source.online.HttpSource
-import eu.kanade.tachiyomi.source.online.fetchAllImageUrlsFromPageList
-import eu.kanade.tachiyomi.util.lang.RetryWithDelay
 import eu.kanade.tachiyomi.util.storage.DiskUtil
 import eu.kanade.tachiyomi.util.storage.DiskUtil.NOMEDIA_FILE
 import eu.kanade.tachiyomi.util.storage.saveTo
 import eu.kanade.tachiyomi.util.system.ImageUtil
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.asFlow
+import kotlinx.coroutines.flow.first
+import kotlinx.coroutines.flow.flatMapMerge
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.flowOn
+import kotlinx.coroutines.flow.retryWhen
+import kotlinx.coroutines.runBlocking
 import logcat.LogPriority
 import nl.adaptivity.xmlutil.serialization.XML
 import okhttp3.Response
@@ -33,8 +41,10 @@ import rx.Subscription
 import rx.android.schedulers.AndroidSchedulers
 import rx.schedulers.Schedulers
 import rx.subscriptions.CompositeSubscription
+import tachiyomi.core.util.lang.awaitSingle
 import tachiyomi.core.util.lang.launchIO
 import tachiyomi.core.util.lang.launchNow
+import tachiyomi.core.util.lang.withIOContext
 import tachiyomi.core.util.lang.withUIContext
 import tachiyomi.core.util.system.logcat
 import tachiyomi.domain.chapter.model.Chapter
@@ -205,7 +215,10 @@ class Downloader(
             .flatMap(
                 { bySource ->
                     bySource.concatMap { download ->
-                        downloadChapter(download).subscribeOn(Schedulers.io())
+                        Observable.fromCallable {
+                            runBlocking { downloadChapter(download) }
+                            download
+                        }.subscribeOn(Schedulers.io())
                     }
                 },
                 5,
@@ -298,82 +311,91 @@ class Downloader(
     }
 
     /**
-     * Returns the observable which downloads a chapter.
+     * Downloads a chapter.
      *
      * @param download the chapter to be downloaded.
      */
-    private fun downloadChapter(download: Download): Observable<Download> = Observable.defer {
+    private suspend fun downloadChapter(download: Download) {
         val mangaDir = provider.getMangaDir(download.manga.title, download.source)
 
         val availSpace = DiskUtil.getAvailableStorageSpace(mangaDir)
         if (availSpace != -1L && availSpace < MIN_DISK_SPACE) {
             download.status = Download.State.ERROR
             notifier.onError(context.getString(R.string.download_insufficient_space), download.chapter.name, download.manga.title)
-            return@defer Observable.just(download)
+            return
         }
 
         val chapterDirname = provider.getChapterDirName(download.chapter.name, download.chapter.scanlator)
         val tmpDir = mangaDir.createDirectory(chapterDirname + TMP_DIR_SUFFIX)
 
-        val pageListObservable = if (download.pages == null) {
-            // Pull page list from network and add them to download object
-            download.source.fetchPageList(download.chapter.toSChapter())
-                .map { pages ->
-                    if (pages.isEmpty()) {
-                        throw Exception(context.getString(R.string.page_list_empty_error))
-                    }
-                    // Don't trust index from source
-                    val reIndexedPages = pages.mapIndexed { index, page -> Page(index, page.url, page.imageUrl, page.uri) }
-                    download.pages = reIndexedPages
-                    reIndexedPages
+        try {
+            // If the page list already exists, start from the file
+            val pageList = download.pages ?: run {
+                // Otherwise, pull page list from network and add them to download object
+                val pages = download.source.getPageList(download.chapter.toSChapter())
+
+                if (pages.isEmpty()) {
+                    throw Exception(context.getString(R.string.page_list_empty_error))
                 }
-        } else {
-            // Or if the page list already exists, start from the file
-            Observable.just(download.pages!!)
-        }
+                // Don't trust index from source
+                val reIndexedPages = pages.mapIndexed { index, page -> Page(index, page.url, page.imageUrl, page.uri) }
+                download.pages = reIndexedPages
+                reIndexedPages
+            }
 
-        pageListObservable
-            .doOnNext { _ ->
-                // Delete all temporary (unfinished) files
-                tmpDir.listFiles()
-                    ?.filter { it.name!!.endsWith(".tmp") }
-                    ?.forEach { it.delete() }
+            // Delete all temporary (unfinished) files
+            tmpDir.listFiles()
+                ?.filter { it.name!!.endsWith(".tmp") }
+                ?.forEach { it.delete() }
+
+            download.status = Download.State.DOWNLOADING
 
-                download.status = Download.State.DOWNLOADING
-            }
             // Get all the URLs to the source images, fetch pages if necessary
-            .flatMap { download.source.fetchAllImageUrlsFromPageList(it) }
+            pageList.filter { it.imageUrl.isNullOrEmpty() }.forEach { page ->
+                page.status = Page.State.LOAD_PAGE
+                try {
+                    page.imageUrl = download.source.fetchImageUrl(page).awaitSingle()
+                } catch (e: Throwable) {
+                    page.status = Page.State.ERROR
+                }
+            }
+
             // Start downloading images, consider we can have downloaded images already
             // Concurrently do 2 pages at a time
-            .flatMap({ page -> getOrDownloadImage(page, download, tmpDir).subscribeOn(Schedulers.io()) }, 2)
-            .onBackpressureLatest()
-            // Do when page is downloaded.
-            .doOnNext { notifier.onProgressChange(download) }
-            .toList()
-            .map { download }
+            pageList.asFlow()
+                .flatMapMerge(concurrency = 2) { page ->
+                    flow {
+                        withIOContext { getOrDownloadImage(page, download, tmpDir) }
+                        emit(page)
+                    }.flowOn(Dispatchers.IO)
+                }
+                .collect {
+                    // Do when page is downloaded.
+                    notifier.onProgressChange(download)
+                }
+
             // Do after download completes
-            .doOnNext { ensureSuccessfulDownload(download, mangaDir, tmpDir, chapterDirname) }
+            ensureSuccessfulDownload(download, mangaDir, tmpDir, chapterDirname)
+        } catch (error: Throwable) {
+            if (error is CancellationException) throw error
             // If the page list threw, it will resume here
-            .onErrorReturn { error ->
-                logcat(LogPriority.ERROR, error)
-                download.status = Download.State.ERROR
-                notifier.onError(error.message, download.chapter.name, download.manga.title)
-                download
-            }
+            logcat(LogPriority.ERROR, error)
+            download.status = Download.State.ERROR
+            notifier.onError(error.message, download.chapter.name, download.manga.title)
+        }
     }
 
     /**
-     * Returns the observable which gets the image from the filesystem if it exists or downloads it
-     * otherwise.
+     * Gets the image from the filesystem if it exists or downloads it otherwise.
      *
      * @param page the page to download.
      * @param download the download of the page.
      * @param tmpDir the temporary directory of the download.
      */
-    private fun getOrDownloadImage(page: Page, download: Download, tmpDir: UniFile): Observable<Page> {
+    private suspend fun getOrDownloadImage(page: Page, download: Download, tmpDir: UniFile) {
         // If the image URL is empty, do nothing
         if (page.imageUrl == null) {
-            return Observable.just(page)
+            return
         }
 
         val digitCount = (download.pages?.size ?: 0).toString().length.coerceAtLeast(3)
@@ -386,83 +408,86 @@ class Downloader(
         // Try to find the image file
         val imageFile = tmpDir.listFiles()?.firstOrNull { it.name!!.startsWith("$filename.") || it.name!!.startsWith("${filename}__001") }
 
-        // If the image is already downloaded, do nothing. Otherwise download from network
-        val pageObservable = when {
-            imageFile != null -> Observable.just(imageFile)
-            chapterCache.isImageInCache(page.imageUrl!!) -> copyImageFromCache(chapterCache.getImageFile(page.imageUrl!!), tmpDir, filename)
-            else -> downloadImage(page, download.source, tmpDir, filename)
-        }
+        try {
+            // If the image is already downloaded, do nothing. Otherwise download from network
+            val file = when {
+                imageFile != null -> imageFile
+                chapterCache.isImageInCache(page.imageUrl!!) -> copyImageFromCache(chapterCache.getImageFile(page.imageUrl!!), tmpDir, filename)
+                else -> downloadImage(page, download.source, tmpDir, filename)
+            }
 
-        return pageObservable
             // When the page is ready, set page path, progress (just in case) and status
-            .doOnNext { file ->
-                val success = splitTallImageIfNeeded(page, tmpDir)
-                if (!success) {
-                    notifier.onError(context.getString(R.string.download_notifier_split_failed), download.chapter.name, download.manga.title)
-                }
-                page.uri = file.uri
-                page.progress = 100
-                page.status = Page.State.READY
+            val success = splitTallImageIfNeeded(page, tmpDir)
+            if (!success) {
+                notifier.onError(context.getString(R.string.download_notifier_split_failed), download.chapter.name, download.manga.title)
             }
-            .map { page }
+            page.uri = file.uri
+            page.progress = 100
+            page.status = Page.State.READY
+        } catch (e: Throwable) {
+            if (e is CancellationException) throw e
             // Mark this page as error and allow to download the remaining
-            .onErrorReturn {
-                page.progress = 0
-                page.status = Page.State.ERROR
-                notifier.onError(it.message, download.chapter.name, download.manga.title)
-                page
-            }
+            page.progress = 0
+            page.status = Page.State.ERROR
+            notifier.onError(e.message, download.chapter.name, download.manga.title)
+        }
     }
 
     /**
-     * Returns the observable which downloads the image from network.
+     * Downloads the image from network to a file in tmpDir.
      *
      * @param page the page to download.
      * @param source the source of the page.
      * @param tmpDir the temporary directory of the download.
      * @param filename the filename of the image.
      */
-    private fun downloadImage(page: Page, source: HttpSource, tmpDir: UniFile, filename: String): Observable<UniFile> {
+    private suspend fun downloadImage(page: Page, source: HttpSource, tmpDir: UniFile, filename: String): UniFile {
         page.status = Page.State.DOWNLOAD_IMAGE
         page.progress = 0
-        return source.fetchImage(page)
-            .map { response ->
-                val file = tmpDir.createFile("$filename.tmp")
-                try {
-                    response.body.source().saveTo(file.openOutputStream())
-                    val extension = getImageExtension(response, file)
-                    file.renameTo("$filename.$extension")
-                } catch (e: Exception) {
-                    response.close()
-                    file.delete()
-                    throw e
-                }
-                file
+        return flow {
+            val response = source.getImage(page)
+            val file = tmpDir.createFile("$filename.tmp")
+            try {
+                response.body.source().saveTo(file.openOutputStream())
+                val extension = getImageExtension(response, file)
+                file.renameTo("$filename.$extension")
+            } catch (e: Exception) {
+                response.close()
+                file.delete()
+                throw e
             }
+            emit(file)
+        }
             // Retry 3 times, waiting 2, 4 and 8 seconds between attempts.
-            .retryWhen(RetryWithDelay(3, { (2 shl it - 1) * 1000 }, Schedulers.trampoline()))
+            .retryWhen { _, attempt ->
+                if (attempt < 3) {
+                    delay((2L shl attempt.toInt()) * 1000)
+                    true
+                } else {
+                    false
+                }
+            }
+            .first()
     }
 
     /**
-     * Return the observable which copies the image from cache.
+     * Copies the image from cache to file in tmpDir.
      *
      * @param cacheFile the file from cache.
      * @param tmpDir the temporary directory of the download.
      * @param filename the filename of the image.
      */
-    private fun copyImageFromCache(cacheFile: File, tmpDir: UniFile, filename: String): Observable<UniFile> {
-        return Observable.just(cacheFile).map {
-            val tmpFile = tmpDir.createFile("$filename.tmp")
-            cacheFile.inputStream().use { input ->
-                tmpFile.openOutputStream().use { output ->
-                    input.copyTo(output)
-                }
+    private fun copyImageFromCache(cacheFile: File, tmpDir: UniFile, filename: String): UniFile {
+        val tmpFile = tmpDir.createFile("$filename.tmp")
+        cacheFile.inputStream().use { input ->
+            tmpFile.openOutputStream().use { output ->
+                input.copyTo(output)
             }
-            val extension = ImageUtil.findImageType(cacheFile.inputStream()) ?: return@map tmpFile
-            tmpFile.renameTo("$filename.${extension.extension}")
-            cacheFile.delete()
-            tmpFile
         }
+        val extension = ImageUtil.findImageType(cacheFile.inputStream()) ?: return tmpFile
+        tmpFile.renameTo("$filename.${extension.extension}")
+        cacheFile.delete()
+        return tmpFile
     }
 
     /**

+ 0 - 25
app/src/main/java/eu/kanade/tachiyomi/util/lang/RetryWithDelay.kt

@@ -1,25 +0,0 @@
-package eu.kanade.tachiyomi.util.lang
-
-import rx.Observable
-import rx.Scheduler
-import rx.functions.Func1
-import rx.schedulers.Schedulers
-import java.util.concurrent.TimeUnit.MILLISECONDS
-
-class RetryWithDelay(
-    private val maxRetries: Int = 1,
-    private val retryStrategy: (Int) -> Int = { 1000 },
-    private val scheduler: Scheduler = Schedulers.computation(),
-) : Func1<Observable<out Throwable>, Observable<*>> {
-
-    private var retryCount = 0
-
-    override fun call(attempts: Observable<out Throwable>) = attempts.flatMap { error ->
-        val count = ++retryCount
-        if (count <= maxRetries) {
-            Observable.timer(retryStrategy(count).toLong(), MILLISECONDS, scheduler)
-        } else {
-            Observable.error(error as Throwable)
-        }
-    }
-}