|
@@ -10,7 +10,13 @@ import eu.kanade.tachiyomi.ui.main.MainActivity
|
|
|
import eu.kanade.tachiyomi.widget.NpaLinearLayoutManager
|
|
|
import kotlinx.android.synthetic.main.fragment_download_queue.*
|
|
|
import nucleus.factory.RequiresPresenter
|
|
|
+import rx.Observable
|
|
|
import rx.Subscription
|
|
|
+import rx.android.schedulers.AndroidSchedulers
|
|
|
+import rx.schedulers.Schedulers
|
|
|
+import rx.subscriptions.CompositeSubscription
|
|
|
+import java.util.*
|
|
|
+import java.util.concurrent.TimeUnit
|
|
|
|
|
|
/**
|
|
|
* Fragment that shows the currently active downloads.
|
|
@@ -40,9 +46,14 @@ class DownloadFragment : BaseRxFragment<DownloadPresenter>() {
|
|
|
private var clearButton: MenuItem? = null
|
|
|
|
|
|
/**
|
|
|
- * Subscription to know if the download queue is running.
|
|
|
+ * Subscription list to be cleared during [onPause].
|
|
|
*/
|
|
|
- private var queueStatusSubscription: Subscription? = null
|
|
|
+ private val resumeSubscriptions by lazy { CompositeSubscription() }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Map of subscriptions for active downloads.
|
|
|
+ */
|
|
|
+ private val progressSubscriptions by lazy { HashMap<Download, Subscription>() }
|
|
|
|
|
|
/**
|
|
|
* Whether the download queue is running or not.
|
|
@@ -66,8 +77,7 @@ class DownloadFragment : BaseRxFragment<DownloadPresenter>() {
|
|
|
setHasOptionsMenu(true)
|
|
|
}
|
|
|
|
|
|
- override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?,
|
|
|
- savedState: Bundle?): View? {
|
|
|
+ override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?, savedState: Bundle?): View {
|
|
|
return inflater.inflate(R.layout.fragment_download_queue, container, false)
|
|
|
}
|
|
|
|
|
@@ -123,15 +133,91 @@ class DownloadFragment : BaseRxFragment<DownloadPresenter>() {
|
|
|
|
|
|
override fun onResume() {
|
|
|
super.onResume()
|
|
|
- queueStatusSubscription = presenter.downloadManager.runningSubject
|
|
|
+ presenter.downloadManager.runningSubject
|
|
|
+ .observeOn(AndroidSchedulers.mainThread())
|
|
|
.subscribe { onQueueStatusChange(it) }
|
|
|
+ .apply { resumeSubscriptions.add(this) }
|
|
|
+
|
|
|
+ presenter.getStatusObservable()
|
|
|
+ .observeOn(AndroidSchedulers.mainThread())
|
|
|
+ .subscribe { onStatusChange(it) }
|
|
|
+ .apply { resumeSubscriptions.add(this) }
|
|
|
+
|
|
|
+ presenter.getProgressObservable()
|
|
|
+ .observeOn(AndroidSchedulers.mainThread())
|
|
|
+ .subscribe { onUpdateDownloadedPages(it) }
|
|
|
+ .apply { resumeSubscriptions.add(this) }
|
|
|
}
|
|
|
|
|
|
override fun onPause() {
|
|
|
- queueStatusSubscription?.unsubscribe()
|
|
|
+ for (subscription in progressSubscriptions.values) {
|
|
|
+ subscription.unsubscribe()
|
|
|
+ }
|
|
|
+ progressSubscriptions.clear()
|
|
|
+ resumeSubscriptions.clear()
|
|
|
super.onPause()
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Called when the status of a download changes.
|
|
|
+ *
|
|
|
+ * @param download the download whose status has changed.
|
|
|
+ */
|
|
|
+ private fun onStatusChange(download: Download) {
|
|
|
+ when (download.status) {
|
|
|
+ Download.DOWNLOADING -> {
|
|
|
+ observeProgress(download)
|
|
|
+ // Initial update of the downloaded pages
|
|
|
+ onUpdateDownloadedPages(download)
|
|
|
+ }
|
|
|
+ Download.DOWNLOADED -> {
|
|
|
+ unsubscribeProgress(download)
|
|
|
+ onUpdateProgress(download)
|
|
|
+ onUpdateDownloadedPages(download)
|
|
|
+ }
|
|
|
+ Download.ERROR -> unsubscribeProgress(download)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Observe the progress of a download and notify the view.
|
|
|
+ *
|
|
|
+ * @param download the download to observe its progress.
|
|
|
+ */
|
|
|
+ private fun observeProgress(download: Download) {
|
|
|
+ val subscription = Observable.interval(50, TimeUnit.MILLISECONDS, Schedulers.newThread())
|
|
|
+ // Get the sum of percentages for all the pages.
|
|
|
+ .flatMap {
|
|
|
+ Observable.from(download.pages)
|
|
|
+ .map { it.progress }
|
|
|
+ .reduce { x, y -> x + y }
|
|
|
+ }
|
|
|
+ // Keep only the latest emission to avoid backpressure.
|
|
|
+ .onBackpressureLatest()
|
|
|
+ .observeOn(AndroidSchedulers.mainThread())
|
|
|
+ .subscribe { progress ->
|
|
|
+ // Update the view only if the progress has changed.
|
|
|
+ if (download.totalProgress != progress) {
|
|
|
+ download.totalProgress = progress
|
|
|
+ onUpdateProgress(download)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Avoid leaking subscriptions
|
|
|
+ progressSubscriptions.remove(download)?.unsubscribe()
|
|
|
+
|
|
|
+ progressSubscriptions.put(download, subscription)
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Unsubscribes the given download from the progress subscriptions.
|
|
|
+ *
|
|
|
+ * @param download the download to unsubscribe.
|
|
|
+ */
|
|
|
+ private fun unsubscribeProgress(download: Download) {
|
|
|
+ progressSubscriptions.remove(download)?.unsubscribe()
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Called when the queue's status has changed. Updates the visibility of the buttons.
|
|
|
*
|
|
@@ -157,16 +243,16 @@ class DownloadFragment : BaseRxFragment<DownloadPresenter>() {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Called from the presenter when the status of a download changes.
|
|
|
+ * Called when the progress of a download changes.
|
|
|
*
|
|
|
- * @param download the download whose status has changed.
|
|
|
+ * @param download the download whose progress has changed.
|
|
|
*/
|
|
|
fun onUpdateProgress(download: Download) {
|
|
|
getHolder(download)?.notifyProgress()
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Called from the presenter when a page of a download is downloaded.
|
|
|
+ * Called when a page of a download is downloaded.
|
|
|
*
|
|
|
* @param download the download whose page has been downloaded.
|
|
|
*/
|