|
@@ -40,7 +40,6 @@ import rx.Observable
|
|
|
import rx.Subscription
|
|
|
import rx.android.schedulers.AndroidSchedulers
|
|
|
import rx.schedulers.Schedulers
|
|
|
-import rx.subscriptions.CompositeSubscription
|
|
|
import tachiyomi.core.util.lang.awaitSingle
|
|
|
import tachiyomi.core.util.lang.launchIO
|
|
|
import tachiyomi.core.util.lang.launchNow
|
|
@@ -61,7 +60,7 @@ import java.util.zip.ZipOutputStream
|
|
|
* This class is the one in charge of downloading chapters.
|
|
|
*
|
|
|
* Its [queue] contains the list of chapters to download. In order to download them, the downloader
|
|
|
- * subscriptions must be running and the list of chapters must be sent to them by [downloadsRelay].
|
|
|
+ * subscription must be running and the list of chapters must be sent to them by [downloadsRelay].
|
|
|
*
|
|
|
* The queue manipulation must be done in one thread (currently the main thread) to avoid unexpected
|
|
|
* behavior, but it's safe to read it from multiple threads.
|
|
@@ -97,9 +96,9 @@ class Downloader(
|
|
|
private val notifier by lazy { DownloadNotifier(context) }
|
|
|
|
|
|
/**
|
|
|
- * Downloader subscriptions.
|
|
|
+ * Downloader subscription.
|
|
|
*/
|
|
|
- private val subscriptions = CompositeSubscription()
|
|
|
+ private var subscription: Subscription? = null
|
|
|
|
|
|
/**
|
|
|
* Relay to send a list of downloads to the downloader.
|
|
@@ -109,9 +108,14 @@ class Downloader(
|
|
|
/**
|
|
|
* Whether the downloader is running.
|
|
|
*/
|
|
|
+ val isRunning: Boolean
|
|
|
+ get() = subscription != null
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Whether the downloader is paused
|
|
|
+ */
|
|
|
@Volatile
|
|
|
- var isRunning: Boolean = false
|
|
|
- private set
|
|
|
+ var isPaused: Boolean = false
|
|
|
|
|
|
init {
|
|
|
launchNow {
|
|
@@ -127,18 +131,16 @@ class Downloader(
|
|
|
* @return true if the downloader is started, false otherwise.
|
|
|
*/
|
|
|
fun start(): Boolean {
|
|
|
- if (isRunning || queue.isEmpty()) {
|
|
|
+ if (subscription != null || queue.isEmpty()) {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
- if (!subscriptions.hasSubscriptions()) {
|
|
|
- initializeSubscriptions()
|
|
|
- }
|
|
|
+ initializeSubscription()
|
|
|
|
|
|
val pending = queue.filter { it.status != Download.State.DOWNLOADED }
|
|
|
pending.forEach { if (it.status != Download.State.QUEUE) it.status = Download.State.QUEUE }
|
|
|
|
|
|
- notifier.paused = false
|
|
|
+ isPaused = false
|
|
|
|
|
|
downloadsRelay.call(pending)
|
|
|
return pending.isNotEmpty()
|
|
@@ -148,7 +150,7 @@ class Downloader(
|
|
|
* Stops the downloader.
|
|
|
*/
|
|
|
fun stop(reason: String? = null) {
|
|
|
- destroySubscriptions()
|
|
|
+ destroySubscription()
|
|
|
queue
|
|
|
.filter { it.status == Download.State.DOWNLOADING }
|
|
|
.forEach { it.status = Download.State.ERROR }
|
|
@@ -158,36 +160,31 @@ class Downloader(
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- if (notifier.paused && queue.isNotEmpty()) {
|
|
|
+ if (isPaused && queue.isNotEmpty()) {
|
|
|
notifier.onPaused()
|
|
|
} else {
|
|
|
notifier.onComplete()
|
|
|
}
|
|
|
|
|
|
- notifier.paused = false
|
|
|
+ isPaused = false
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Pauses the downloader
|
|
|
*/
|
|
|
fun pause() {
|
|
|
- destroySubscriptions()
|
|
|
+ destroySubscription()
|
|
|
queue
|
|
|
.filter { it.status == Download.State.DOWNLOADING }
|
|
|
.forEach { it.status = Download.State.QUEUE }
|
|
|
- notifier.paused = true
|
|
|
+ isPaused = true
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Check if downloader is paused
|
|
|
- */
|
|
|
- fun isPaused() = !isRunning
|
|
|
-
|
|
|
/**
|
|
|
* Removes everything from the queue.
|
|
|
*/
|
|
|
fun clearQueue() {
|
|
|
- destroySubscriptions()
|
|
|
+ destroySubscription()
|
|
|
|
|
|
queue.clear()
|
|
|
notifier.dismissProgress()
|
|
@@ -196,12 +193,10 @@ class Downloader(
|
|
|
/**
|
|
|
* Prepares the subscriptions to start downloading.
|
|
|
*/
|
|
|
- private fun initializeSubscriptions() {
|
|
|
- if (isRunning) return
|
|
|
- isRunning = true
|
|
|
+ private fun initializeSubscription() {
|
|
|
+ if (subscription != null) return
|
|
|
|
|
|
- subscriptions.clear()
|
|
|
- subscriptions += downloadsRelay.concatMapIterable { it }
|
|
|
+ subscription = downloadsRelay.concatMapIterable { it }
|
|
|
// Concurrently download from 5 different sources
|
|
|
.groupBy { it.source }
|
|
|
.flatMap(
|
|
@@ -232,11 +227,9 @@ class Downloader(
|
|
|
/**
|
|
|
* Destroys the downloader subscriptions.
|
|
|
*/
|
|
|
- private fun destroySubscriptions() {
|
|
|
- if (!isRunning) return
|
|
|
- isRunning = false
|
|
|
-
|
|
|
- subscriptions.clear()
|
|
|
+ private fun destroySubscription() {
|
|
|
+ subscription?.unsubscribe()
|
|
|
+ subscription = null
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -652,8 +645,6 @@ class Downloader(
|
|
|
return queue.none { it.status.value <= Download.State.DOWNLOADING.value }
|
|
|
}
|
|
|
|
|
|
- private operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)
|
|
|
-
|
|
|
companion object {
|
|
|
const val TMP_DIR_SUFFIX = "_tmp"
|
|
|
const val WARNING_NOTIF_TIMEOUT_MS = 30_000L
|