Pārlūkot izejas kodu

Replace RxJava in Downloader (#9256)

* Rename removeFromQueueByPredicate to removeFromQueueIf

Follow-up to PR comment in #9511

* Make Download hashCode stable

Mutating pages would previously change the Download hashCode, which
breaks HashMap lookups.

* Convert Donwloader subscription to coroutine

Replace downloadsRelay with activeDownloadsFlow. Instead of managing
a PublishRelay independent from the queue, derive a Flow of active
downloads directly from the queue StateFlow. (This will allow
updating the queue without pausing the downloader, to be done in a
follow-up PR.)

When a download completes successfully, the downloads is removed from
queueState. This updates activeDownloadsFlow and causes the
downloaderJob start the download job for the next active download.

When a download fails, the download is left in the queue, so
queueState is not modified. To make activeDownloadsFlow update
without a change to queueState, use transformLatest and use the
Download statusFlows to suspend until a download reaches the ERROR
state.

To avoid stopping and starting downloads every time
activeDownloadsFlow emits a new value, maintain a map of current
download Jobs and only start/stop jobs in the difference between
downloadJobs and activeDownloads. To make sure all child download
jobs are cancelled when the top-level downloader job is cancelled,
use supervisorScope.

* Remove obsolete main thread references in Downloader

Thread safety of the queue state used to be guaranteed by running all
queue mutation on the main thread, but this has not been true for
some time. Since the queue state is now backed by a StateFlow,
queueState can be safely updated by any thread.
Two-Ai 1 gadu atpakaļ
vecāks
revīzija
3ae1e37c40

+ 90 - 82
app/src/main/java/eu/kanade/tachiyomi/data/download/Downloader.kt

@@ -2,7 +2,6 @@ package eu.kanade.tachiyomi.data.download
 
 import android.content.Context
 import com.hippo.unifile.UniFile
-import com.jakewharton.rxrelay.PublishRelay
 import eu.kanade.domain.chapter.model.toSChapter
 import eu.kanade.domain.manga.model.getComicInfo
 import eu.kanade.tachiyomi.R
@@ -17,26 +16,31 @@ import eu.kanade.tachiyomi.util.storage.DiskUtil
 import eu.kanade.tachiyomi.util.storage.DiskUtil.NOMEDIA_FILE
 import eu.kanade.tachiyomi.util.storage.saveTo
 import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.SupervisorJob
 import kotlinx.coroutines.async
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.asFlow
 import kotlinx.coroutines.flow.asStateFlow
+import kotlinx.coroutines.flow.collectLatest
+import kotlinx.coroutines.flow.combine
+import kotlinx.coroutines.flow.distinctUntilChanged
+import kotlinx.coroutines.flow.filter
 import kotlinx.coroutines.flow.first
 import kotlinx.coroutines.flow.flatMapMerge
 import kotlinx.coroutines.flow.flow
 import kotlinx.coroutines.flow.flowOn
 import kotlinx.coroutines.flow.retryWhen
+import kotlinx.coroutines.flow.transformLatest
 import kotlinx.coroutines.flow.update
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.supervisorScope
 import logcat.LogPriority
 import nl.adaptivity.xmlutil.serialization.XML
 import okhttp3.Response
-import rx.Observable
-import rx.Subscription
-import rx.android.schedulers.AndroidSchedulers
-import rx.schedulers.Schedulers
 import tachiyomi.core.metadata.comicinfo.COMIC_INFO_FILE
 import tachiyomi.core.metadata.comicinfo.ComicInfo
 import tachiyomi.core.util.lang.awaitSingle
@@ -61,11 +65,7 @@ import java.util.zip.ZipOutputStream
 /**
  * This class is the one in charge of downloading chapters.
  *
- * Its queue contains the list of chapters to download. In order to download them, the downloader
- * subscription must be running and the list of chapters must be sent to them by [downloadsRelay].
- *
- * The queue manipulation must be done in one thread (currently the main thread) to avoid unexpected
- * behavior, but it's safe to read it from multiple threads.
+ * Its queue contains the list of chapters to download.
  */
 class Downloader(
     private val context: Context,
@@ -93,21 +93,14 @@ class Downloader(
      */
     private val notifier by lazy { DownloadNotifier(context) }
 
-    /**
-     * Downloader subscription.
-     */
-    private var subscription: Subscription? = null
-
-    /**
-     * Relay to send a list of downloads to the downloader.
-     */
-    private val downloadsRelay = PublishRelay.create<List<Download>>()
+    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
+    private var downloaderJob: Job? = null
 
     /**
      * Whether the downloader is running.
      */
     val isRunning: Boolean
-        get() = subscription != null
+        get() = downloaderJob?.isActive ?: false
 
     /**
      * Whether the downloader is paused
@@ -129,18 +122,17 @@ class Downloader(
      * @return true if the downloader is started, false otherwise.
      */
     fun start(): Boolean {
-        if (subscription != null || queueState.value.isEmpty()) {
+        if (isRunning || queueState.value.isEmpty()) {
             return false
         }
 
-        initializeSubscription()
-
         val pending = queueState.value.filter { it.status != Download.State.DOWNLOADED }
         pending.forEach { if (it.status != Download.State.QUEUE) it.status = Download.State.QUEUE }
 
         isPaused = false
 
-        downloadsRelay.call(pending)
+        launchDownloaderJob()
+
         return pending.isNotEmpty()
     }
 
@@ -148,7 +140,7 @@ class Downloader(
      * Stops the downloader.
      */
     fun stop(reason: String? = null) {
-        destroySubscription()
+        cancelDownloaderJob()
         queueState.value
             .filter { it.status == Download.State.DOWNLOADING }
             .forEach { it.status = Download.State.ERROR }
@@ -176,7 +168,7 @@ class Downloader(
      * Pauses the downloader
      */
     fun pause() {
-        destroySubscription()
+        cancelDownloaderJob()
         queueState.value
             .filter { it.status == Download.State.DOWNLOADING }
             .forEach { it.status = Download.State.QUEUE }
@@ -187,7 +179,7 @@ class Downloader(
      * Removes everything from the queue.
      */
     fun clearQueue() {
-        destroySubscription()
+        cancelDownloaderJob()
 
         _clearQueue()
         notifier.dismissProgress()
@@ -196,49 +188,74 @@ class Downloader(
     /**
      * Prepares the subscriptions to start downloading.
      */
-    private fun initializeSubscription() {
-        if (subscription != null) return
-
-        subscription = downloadsRelay.concatMapIterable { it }
-            // Concurrently download from 5 different sources
-            .groupBy { it.source }
-            .flatMap(
-                { bySource ->
-                    bySource.concatMap { download ->
-                        Observable.fromCallable {
-                            runBlocking { downloadChapter(download) }
-                            download
-                        }.subscribeOn(Schedulers.io())
-                    }
-                },
-                5,
-            )
-            .onBackpressureLatest()
-            .observeOn(AndroidSchedulers.mainThread())
-            .subscribe(
-                {
-                    // Remove successful download from queue
-                    if (it.status == Download.State.DOWNLOADED) {
-                        removeFromQueue(it)
+    private fun launchDownloaderJob() {
+        if (isRunning) return
+
+        downloaderJob = scope.launch {
+            val activeDownloadsFlow = queueState.transformLatest { queue ->
+                while (true) {
+                    val activeDownloads = queue.asSequence()
+                        .filter { it.status.value <= Download.State.DOWNLOADING.value } // Ignore completed downloads, leave them in the queue
+                        .groupBy { it.source }
+                        .toList().take(5) // Concurrently download from 5 different sources
+                        .map { (_, downloads) -> downloads.first() }
+                    emit(activeDownloads)
+
+                    if (activeDownloads.isEmpty()) break
+                    // Suspend until a download enters the ERROR state
+                    val activeDownloadsErroredFlow =
+                        combine(activeDownloads.map(Download::statusFlow)) { states ->
+                            states.contains(Download.State.ERROR)
+                        }.filter { it }
+                    activeDownloadsErroredFlow.first()
+                }
+            }.distinctUntilChanged()
+
+            // Use supervisorScope to cancel child jobs when the downloader job is cancelled
+            supervisorScope {
+                val downloadJobs = mutableMapOf<Download, Job>()
+
+                activeDownloadsFlow.collectLatest { activeDownloads ->
+                    val downloadJobsToStop = downloadJobs.filter { it.key !in activeDownloads }
+                    downloadJobsToStop.forEach { (download, job) ->
+                        job.cancel()
+                        downloadJobs.remove(download)
                     }
-                    if (areAllDownloadsFinished()) {
-                        stop()
+
+                    val downloadsToStart = activeDownloads.filter { it !in downloadJobs }
+                    downloadsToStart.forEach { download ->
+                        downloadJobs[download] = launchDownloadJob(download)
                     }
-                },
-                { error ->
-                    logcat(LogPriority.ERROR, error)
-                    notifier.onError(error.message)
-                    stop()
-                },
-            )
+                }
+            }
+        }
+    }
+
+    private fun CoroutineScope.launchDownloadJob(download: Download) = launchIO {
+        try {
+            downloadChapter(download)
+
+            // Remove successful download from queue
+            if (download.status == Download.State.DOWNLOADED) {
+                removeFromQueue(download)
+            }
+            if (areAllDownloadsFinished()) {
+                stop()
+            }
+        } catch (e: Throwable) {
+            if (e is CancellationException) throw e
+            logcat(LogPriority.ERROR, e)
+            notifier.onError(e.message)
+            stop()
+        }
     }
 
     /**
      * Destroys the downloader subscriptions.
      */
-    private fun destroySubscription() {
-        subscription?.unsubscribe()
-        subscription = null
+    private fun cancelDownloaderJob() {
+        downloaderJob?.cancel()
+        downloaderJob = null
     }
 
     /**
@@ -255,17 +272,13 @@ class Downloader(
 
         val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchIO
         val wasEmpty = queueState.value.isEmpty()
-        // Called in background thread, the operation can be slow with SAF.
-        val chaptersWithoutDir = async {
-            chapters
-                // Filter out those already downloaded.
-                .filter { provider.findChapterDir(it.name, it.scanlator, manga.title, source) == null }
-                // Add chapters to queue from the start.
-                .sortedByDescending { it.sourceOrder }
-        }
+        val chaptersWithoutDir = chapters
+            // Filter out those already downloaded.
+            .filter { provider.findChapterDir(it.name, it.scanlator, manga.title, source) == null }
+            // Add chapters to queue from the start.
+            .sortedByDescending { it.sourceOrder }
 
-        // Runs in main thread (synchronization needed).
-        val chaptersToQueue = chaptersWithoutDir.await()
+        val chaptersToQueue = chaptersWithoutDir
             // Filter out those already enqueued.
             .filter { chapter -> queueState.value.none { it.chapter.id == chapter.id } }
             // Create a download for each one.
@@ -274,11 +287,6 @@ class Downloader(
         if (chaptersToQueue.isNotEmpty()) {
             addAllToQueue(chaptersToQueue)
 
-            if (isRunning) {
-                // Send the list of downloads to the downloader.
-                downloadsRelay.call(chaptersToQueue)
-            }
-
             // Start downloader if needed
             if (autoStart && wasEmpty) {
                 val queuedDownloads = queueState.value.count { it.source !is UnmeteredSource }
@@ -656,7 +664,7 @@ class Downloader(
         }
     }
 
-    private inline fun removeFromQueueByPredicate(predicate: (Download) -> Boolean) {
+    private inline fun removeFromQueueIf(predicate: (Download) -> Boolean) {
         _queueState.update { queue ->
             val downloads = queue.filter { predicate(it) }
             store.removeAll(downloads)
@@ -671,11 +679,11 @@ class Downloader(
 
     fun removeFromQueue(chapters: List<Chapter>) {
         val chapterIds = chapters.map { it.id }
-        removeFromQueueByPredicate { it.chapter.id in chapterIds }
+        removeFromQueueIf { it.chapter.id in chapterIds }
     }
 
     fun removeFromQueue(manga: Manga) {
-        removeFromQueueByPredicate { it.manga.id == manga.id }
+        removeFromQueueIf { it.manga.id == manga.id }
     }
 
     private fun _clearQueue() {

+ 1 - 1
app/src/main/java/eu/kanade/tachiyomi/data/download/model/Download.kt

@@ -22,8 +22,8 @@ data class Download(
     val source: HttpSource,
     val manga: Manga,
     val chapter: Chapter,
-    var pages: List<Page>? = null,
 ) {
+    var pages: List<Page>? = null
 
     val totalProgress: Int
         get() = pages?.sumOf(Page::progress) ?: 0