Эх сурвалжийг харах

Replace RxJava in DownloadQueue (#9016)

* Misc cleanup

- Replace !List.isEmpty with List.isNotEmpty
- Remove redundant case in MoreScreenModel
- Drop no-op StateFlow.catch
  - From lint warning:
> SharedFlow never completes, so this operator typically has not
> effect, it can only catch exceptions from 'onSubscribe' operator

* Convert DownloadQueue queue to MutableStateFlow

Replace delegation to a MutableList with an internal

In order to avoid modifying every usage of the queue as a list, add
passthrough functions for the currently used list functions. This
should be later refactored, possibly by inlining DownloadQueue
into Downloader.

DownloadQueue.updates was a SharedFlow which updated every time a
change was made to the queue. This is now equivalent to the queue

Simultaneous assignments to _state.value could cause concurrency
issues. To avoid this, always modify the queue using _state.update.

* Add Download.statusFlow/progressFlow

progressFlow is based on the DownloadQueueScreenModel implementation
rather than the DownloadQueue implementation.

* Reimplement DownloadQueue.statusFlow/progressFlow

Use StateFlow<List<T>>.flatMapLatest() and List<Flow<T>>.merge() to
replicate the effect of PublishSubject.

Use drop(1) to avoid re-emitting the state of each download each time
the merged flow is recreated.

* fixup! Reimplement DownloadQueue.statusFlow/progressFlow
Two-Ai 2 жил өмнө

+ 1 - 1

@@ -148,7 +148,7 @@ class Downloader(
-        if (notifier.paused && !queue.isEmpty()) {
+        if (notifier.paused && queue.isNotEmpty()) {
         } else {

+ 27 - 9

@@ -5,7 +5,14 @@ import eu.kanade.domain.manga.interactor.GetManga
 import eu.kanade.tachiyomi.source.SourceManager
 import eu.kanade.tachiyomi.source.model.Page
 import eu.kanade.tachiyomi.source.online.HttpSource
-import rx.subjects.PublishSubject
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.asStateFlow
+import kotlinx.coroutines.flow.combine
+import kotlinx.coroutines.flow.debounce
+import kotlinx.coroutines.flow.distinctUntilChanged
+import kotlinx.coroutines.flow.emitAll
+import kotlinx.coroutines.flow.flow
 import tachiyomi.domain.chapter.model.Chapter
 import tachiyomi.domain.manga.model.Manga
 import uy.kohesive.injekt.Injekt
@@ -25,20 +32,31 @@ data class Download(
     var downloadedImages: Int = 0
-    @Volatile
-    var status: State = State.NOT_DOWNLOADED
+    private val _statusFlow = MutableStateFlow(State.NOT_DOWNLOADED)
+    @Transient
+    val statusFlow = _statusFlow.asStateFlow()
+    var status: State
+        get() = _statusFlow.value
         set(status) {
-            field = status
-            statusSubject?.onNext(this)
-            statusCallback?.invoke(this)
+            _statusFlow.value = status
-    var statusSubject: PublishSubject<Download>? = null
+    val progressFlow = flow {
+        if (pages == null) {
+            emit(0)
+            while (pages == null) {
+                delay(50)
+            }
+        }
-    @Transient
-    var statusCallback: ((Download) -> Unit)? = null
+        val progressFlows = pages!!.map(Page::progressFlow)
+        emitAll(combine(progressFlows) { it.average().toInt() })
+    }
+        .distinctUntilChanged()
+        .debounce(50)
     val progress: Int
         get() {

+ 59 - 91

@@ -1,69 +1,48 @@
 package eu.kanade.tachiyomi.data.download.model
-import eu.kanade.core.util.asFlow
 import eu.kanade.tachiyomi.data.download.DownloadStore
-import eu.kanade.tachiyomi.source.model.Page
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.SharingStarted
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.asFlow
+import kotlinx.coroutines.flow.asStateFlow
+import kotlinx.coroutines.flow.drop
+import kotlinx.coroutines.flow.emitAll
+import kotlinx.coroutines.flow.flatMapLatest
 import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.merge
 import kotlinx.coroutines.flow.onStart
-import kotlinx.coroutines.flow.receiveAsFlow
-import kotlinx.coroutines.flow.shareIn
-import rx.Observable
-import rx.subjects.PublishSubject
-import tachiyomi.core.util.lang.launchNonCancellable
+import kotlinx.coroutines.flow.update
 import tachiyomi.domain.chapter.model.Chapter
 import tachiyomi.domain.manga.model.Manga
-import java.util.concurrent.CopyOnWriteArrayList
 class DownloadQueue(
     private val store: DownloadStore,
-    private val queue: MutableList<Download> = CopyOnWriteArrayList(),
-) : List<Download> by queue {
-    private val scope = CoroutineScope(Dispatchers.IO)
-    private val statusSubject = PublishSubject.create<Download>()
-    private val _updates: Channel<Unit> = Channel(Channel.UNLIMITED)
-    val updates = _updates.receiveAsFlow()
-        .onStart { emit(Unit) }
-        .map { queue }
-        .shareIn(scope, SharingStarted.Eagerly, 1)
+) {
+    private val _state = MutableStateFlow<List<Download>>(emptyList())
+    val state = _state.asStateFlow()
     fun addAll(downloads: List<Download>) {
-        downloads.forEach { download ->
-            download.statusSubject = statusSubject
-            download.statusCallback = ::setPagesFor
-            download.status = Download.State.QUEUE
-        }
-        queue.addAll(downloads)
-        store.addAll(downloads)
-        scope.launchNonCancellable {
-            _updates.send(Unit)
+        _state.update {
+            downloads.forEach { download ->
+                download.status = Download.State.QUEUE
+            }
+            store.addAll(downloads)
+            it + downloads
     fun remove(download: Download) {
-        val removed = queue.remove(download)
-        store.remove(download)
-        download.statusSubject = null
-        download.statusCallback = null
-        if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
-            download.status = Download.State.NOT_DOWNLOADED
-        }
-        if (removed) {
-            scope.launchNonCancellable {
-                _updates.send(Unit)
+        _state.update {
+            store.remove(download)
+            if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
+                download.status = Download.State.NOT_DOWNLOADED
+            it - download
     fun remove(chapter: Chapter) {
-        find { it.chapter.id == chapter.id }?.let { remove(it) }
+        _state.value.find { it.chapter.id == chapter.id }?.let { remove(it) }
     fun remove(chapters: List<Chapter>) {
@@ -71,61 +50,50 @@ class DownloadQueue(
     fun remove(manga: Manga) {
-        filter { it.manga.id == manga.id }.forEach { remove(it) }
+        _state.value.filter { it.manga.id == manga.id }.forEach { remove(it) }
     fun clear() {
-        queue.forEach { download ->
-            download.statusSubject = null
-            download.statusCallback = null
-            if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
-                download.status = Download.State.NOT_DOWNLOADED
+        _state.update {
+            it.forEach { download ->
+                if (download.status == Download.State.DOWNLOADING || download.status == Download.State.QUEUE) {
+                    download.status = Download.State.NOT_DOWNLOADED
+                }
-        }
-        queue.clear()
-        store.clear()
-        scope.launchNonCancellable {
-            _updates.send(Unit)
+            store.clear()
+            emptyList()
-    fun statusFlow(): Flow<Download> = getStatusObservable().asFlow()
-    fun progressFlow(): Flow<Download> = getProgressObservable().asFlow()
-    private fun getActiveDownloads(): Observable<Download> =
-        Observable.from(this).filter { download -> download.status == Download.State.DOWNLOADING }
-    private fun getStatusObservable(): Observable<Download> = statusSubject
-        .startWith(getActiveDownloads())
-        .onBackpressureBuffer()
-    private fun getProgressObservable(): Observable<Download> {
-        return statusSubject.onBackpressureBuffer()
-            .startWith(getActiveDownloads())
-            .flatMap { download ->
-                if (download.status == Download.State.DOWNLOADING) {
-                    val pageStatusSubject = PublishSubject.create<Page.State>()
-                    setPagesSubject(download.pages, pageStatusSubject)
-                    return@flatMap pageStatusSubject
-                        .onBackpressureBuffer()
-                        .filter { it == Page.State.READY }
-                        .map { download }
-                } else if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) {
-                    setPagesSubject(download.pages, null)
+    fun statusFlow(): Flow<Download> = state
+        .flatMapLatest { downloads ->
+            downloads
+                .map { download ->
+                    download.statusFlow.drop(1).map { download }
-                Observable.just(download)
-            }
-            .filter { it.status == Download.State.DOWNLOADING }
-    }
-    private fun setPagesFor(download: Download) {
-        if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) {
-            setPagesSubject(download.pages, null)
+                .merge()
-    }
+        .onStart { emitAll(getActiveDownloads()) }
-    private fun setPagesSubject(pages: List<Page>?, subject: PublishSubject<Page.State>?) {
-        pages?.forEach { it.statusSubject = subject }
-    }
+    fun progressFlow(): Flow<Download> = state
+        .flatMapLatest { downloads ->
+            downloads
+                .map { download ->
+                    download.progressFlow.drop(1).map { download }
+                }
+                .merge()
+        }
+        .onStart { emitAll(getActiveDownloads()) }
+    private fun getActiveDownloads(): Flow<Download> =
+        _state.value.filter { download -> download.status == Download.State.DOWNLOADING }.asFlow()
+    fun count(predicate: (Download) -> Boolean) = _state.value.count(predicate)
+    fun filter(predicate: (Download) -> Boolean) = _state.value.filter(predicate)
+    fun find(predicate: (Download) -> Boolean) = _state.value.find(predicate)
+    fun <K> groupBy(keySelector: (Download) -> K) = _state.value.groupBy(keySelector)
+    fun isEmpty() = _state.value.isEmpty()
+    fun isNotEmpty() = _state.value.isNotEmpty()
+    fun none(predicate: (Download) -> Boolean) = _state.value.none(predicate)
+    fun toMutableList() = _state.value.toMutableList()

+ 1 - 5

@@ -14,7 +14,6 @@ import kotlinx.coroutines.Job
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.asStateFlow
-import kotlinx.coroutines.flow.catch
 import kotlinx.coroutines.flow.collectLatest
 import kotlinx.coroutines.flow.combine
 import kotlinx.coroutines.flow.debounce
@@ -22,8 +21,6 @@ import kotlinx.coroutines.flow.distinctUntilChanged
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.update
 import kotlinx.coroutines.launch
-import logcat.LogPriority
-import tachiyomi.core.util.system.logcat
 import uy.kohesive.injekt.Injekt
 import uy.kohesive.injekt.api.get
@@ -116,8 +113,7 @@ class DownloadQueueScreenModel(
     init {
         coroutineScope.launch {
-            downloadManager.queue.updates
-                .catch { logcat(LogPriority.ERROR, it) }
+            downloadManager.queue.state
                 .map { downloads ->
                         .groupBy { it.source }

+ 2 - 3

@@ -95,14 +95,13 @@ private class MoreScreenModel(
         coroutineScope.launchIO {
-                downloadManager.queue.updates,
+                downloadManager.queue.state,
             ) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) }
                 .collectLatest { (isDownloading, downloadQueueSize) ->
                     val pendingDownloadExists = downloadQueueSize != 0
                     _state.value = when {
                         !pendingDownloadExists -> DownloadQueueState.Stopped
-                        !isDownloading && !pendingDownloadExists -> DownloadQueueState.Paused(0)
-                        !isDownloading && pendingDownloadExists -> DownloadQueueState.Paused(downloadQueueSize)
+                        !isDownloading -> DownloadQueueState.Paused(downloadQueueSize)
                         else -> DownloadQueueState.Downloading(downloadQueueSize)

+ 0 - 5

@@ -6,7 +6,6 @@ import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.asStateFlow
 import kotlinx.serialization.Serializable
 import kotlinx.serialization.Transient
-import rx.subjects.Subject
 open class Page(
@@ -28,7 +27,6 @@ open class Page(
         get() = _statusFlow.value
         set(value) {
             _statusFlow.value = value
-            statusSubject?.onNext(value)
@@ -42,9 +40,6 @@ open class Page(
             _progressFlow.value = value
-    @Transient
-    var statusSubject: Subject<State, State>? = null
     override fun update(bytesRead: Long, contentLength: Long, done: Boolean) {
         progress = if (contentLength > 0) {
             (100 * bytesRead / contentLength).toInt()