|
@@ -1,12 +1,18 @@
|
|
|
package eu.kanade.tachiyomi.data.download.model
|
|
|
|
|
|
-import com.jakewharton.rxrelay.PublishRelay
|
|
|
import eu.kanade.core.util.asFlow
|
|
|
import eu.kanade.domain.chapter.model.Chapter
|
|
|
import eu.kanade.domain.manga.model.Manga
|
|
|
import eu.kanade.tachiyomi.data.download.DownloadStore
|
|
|
import eu.kanade.tachiyomi.source.model.Page
|
|
|
+import eu.kanade.tachiyomi.util.lang.launchNonCancellable
|
|
|
+import kotlinx.coroutines.CoroutineScope
|
|
|
+import kotlinx.coroutines.Dispatchers
|
|
|
+import kotlinx.coroutines.channels.Channel
|
|
|
import kotlinx.coroutines.flow.Flow
|
|
|
+import kotlinx.coroutines.flow.map
|
|
|
+import kotlinx.coroutines.flow.onStart
|
|
|
+import kotlinx.coroutines.flow.receiveAsFlow
|
|
|
import rx.Observable
|
|
|
import rx.subjects.PublishSubject
|
|
|
import java.util.concurrent.CopyOnWriteArrayList
|
|
@@ -16,9 +22,12 @@ class DownloadQueue(
|
|
|
private val queue: MutableList<Download> = CopyOnWriteArrayList(),
|
|
|
) : List<Download> by queue {
|
|
|
|
|
|
+ private val scope = CoroutineScope(Dispatchers.IO)
|
|
|
+
|
|
|
private val statusSubject = PublishSubject.create<Download>()
|
|
|
|
|
|
- private val updatedRelay = PublishRelay.create<Unit>()
|
|
|
+ private val _updates: Channel<Unit> = Channel(Channel.UNLIMITED)
|
|
|
+ val updates = _updates.receiveAsFlow().onStart { emit(Unit) }.map { queue }
|
|
|
|
|
|
fun addAll(downloads: List<Download>) {
|
|
|
downloads.forEach { download ->
|
|
@@ -28,7 +37,9 @@ class DownloadQueue(
|
|
|
}
|
|
|
queue.addAll(downloads)
|
|
|
store.addAll(downloads)
|
|
|
- updatedRelay.call(Unit)
|
|
|
+ scope.launchNonCancellable {
|
|
|
+ _updates.send(Unit)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
fun remove(download: Download) {
|
|
@@ -40,7 +51,9 @@ class DownloadQueue(
|
|
|
download.status = Download.State.NOT_DOWNLOADED
|
|
|
}
|
|
|
if (removed) {
|
|
|
- updatedRelay.call(Unit)
|
|
|
+ scope.launchNonCancellable {
|
|
|
+ _updates.send(Unit)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -68,7 +81,9 @@ class DownloadQueue(
|
|
|
}
|
|
|
queue.clear()
|
|
|
store.clear()
|
|
|
- updatedRelay.call(Unit)
|
|
|
+ scope.launchNonCancellable {
|
|
|
+ _updates.send(Unit)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private fun getActiveDownloads(): Observable<Download> =
|
|
@@ -80,12 +95,6 @@ class DownloadQueue(
|
|
|
|
|
|
fun statusFlow(): Flow<Download> = getStatusObservable().asFlow()
|
|
|
|
|
|
- private fun getUpdatedObservable(): Observable<List<Download>> = updatedRelay.onBackpressureBuffer()
|
|
|
- .startWith(Unit)
|
|
|
- .map { this }
|
|
|
-
|
|
|
- fun updatedFlow(): Flow<List<Download>> = getUpdatedObservable().asFlow()
|
|
|
-
|
|
|
private fun setPagesFor(download: Download) {
|
|
|
if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) {
|
|
|
setPagesSubject(download.pages, null)
|