|
@@ -16,7 +16,6 @@ import rx.Observable;
|
|
|
import rx.Subscription;
|
|
|
import rx.android.schedulers.AndroidSchedulers;
|
|
|
import rx.schedulers.Schedulers;
|
|
|
-import rx.subjects.PublishSubject;
|
|
|
import timber.log.Timber;
|
|
|
|
|
|
public class DownloadPresenter extends BasePresenter<DownloadFragment> {
|
|
@@ -25,8 +24,8 @@ public class DownloadPresenter extends BasePresenter<DownloadFragment> {
|
|
|
|
|
|
private DownloadQueue downloadQueue;
|
|
|
private Subscription statusSubscription;
|
|
|
+ private Subscription pageProgressSubscription;
|
|
|
private HashMap<Download, Subscription> progressSubscriptions;
|
|
|
- private HashMap<Download, Subscription> pageStatusSubscriptions;
|
|
|
|
|
|
public final static int GET_DOWNLOAD_QUEUE = 1;
|
|
|
|
|
@@ -36,7 +35,6 @@ public class DownloadPresenter extends BasePresenter<DownloadFragment> {
|
|
|
|
|
|
downloadQueue = downloadManager.getQueue();
|
|
|
progressSubscriptions = new HashMap<>();
|
|
|
- pageStatusSubscriptions = new HashMap<>();
|
|
|
|
|
|
restartableLatestCache(GET_DOWNLOAD_QUEUE,
|
|
|
() -> Observable.just(downloadQueue.get()),
|
|
@@ -57,6 +55,10 @@ public class DownloadPresenter extends BasePresenter<DownloadFragment> {
|
|
|
.subscribe(download -> {
|
|
|
processStatus(download, view);
|
|
|
}));
|
|
|
+
|
|
|
+ add(pageProgressSubscription = downloadQueue.getProgressObservable()
|
|
|
+ .observeOn(AndroidSchedulers.mainThread())
|
|
|
+ .subscribe(view::updateDownloadedPages));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -69,17 +71,16 @@ public class DownloadPresenter extends BasePresenter<DownloadFragment> {
|
|
|
switch (download.getStatus()) {
|
|
|
case Download.DOWNLOADING:
|
|
|
observeProgress(download, view);
|
|
|
- observePagesStatus(download, view);
|
|
|
+ // Initial update of the downloaded pages
|
|
|
+ view.updateDownloadedPages(download);
|
|
|
break;
|
|
|
case Download.DOWNLOADED:
|
|
|
unsubscribeProgress(download);
|
|
|
- unsubscribePagesStatus(download);
|
|
|
view.updateProgress(download);
|
|
|
view.updateDownloadedPages(download);
|
|
|
break;
|
|
|
case Download.ERROR:
|
|
|
unsubscribeProgress(download);
|
|
|
- unsubscribePagesStatus(download);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -104,62 +105,19 @@ public class DownloadPresenter extends BasePresenter<DownloadFragment> {
|
|
|
progressSubscriptions.put(download, subscription);
|
|
|
}
|
|
|
|
|
|
- private void observePagesStatus(Download download, DownloadFragment view) {
|
|
|
- // Initial update of the downloaded pages
|
|
|
- view.updateDownloadedPages(download);
|
|
|
-
|
|
|
- PublishSubject<Integer> pageStatusSubject = PublishSubject.create();
|
|
|
- for (Page page : download.pages) {
|
|
|
- if (page.getStatus() != Page.READY)
|
|
|
- page.setStatusSubject(pageStatusSubject);
|
|
|
- }
|
|
|
-
|
|
|
- Subscription subscription = pageStatusSubject
|
|
|
- .filter(status -> status == Page.READY)
|
|
|
- .observeOn(AndroidSchedulers.mainThread())
|
|
|
- .subscribe(status -> {
|
|
|
- view.updateDownloadedPages(download);
|
|
|
- });
|
|
|
-
|
|
|
- // Avoid leaking subscriptions
|
|
|
- Subscription oldSubscription = pageStatusSubscriptions.remove(download);
|
|
|
- if (oldSubscription != null) oldSubscription.unsubscribe();
|
|
|
-
|
|
|
- pageStatusSubscriptions.put(download, subscription);
|
|
|
- }
|
|
|
-
|
|
|
private void unsubscribeProgress(Download download) {
|
|
|
Subscription subscription = progressSubscriptions.remove(download);
|
|
|
if (subscription != null)
|
|
|
subscription.unsubscribe();
|
|
|
}
|
|
|
|
|
|
- private void unsubscribePagesStatus(Download download) {
|
|
|
- if (download.pages != null) {
|
|
|
- for (Page page : download.pages)
|
|
|
- page.setStatusSubject(null);
|
|
|
- }
|
|
|
-
|
|
|
- Subscription subscription = pageStatusSubscriptions.remove(download);
|
|
|
- if (subscription != null)
|
|
|
- subscription.unsubscribe();
|
|
|
- }
|
|
|
-
|
|
|
private void destroySubscriptions() {
|
|
|
- for (Download download : pageStatusSubscriptions.keySet()) {
|
|
|
- for (Page page : download.pages)
|
|
|
- page.setStatusSubject(null);
|
|
|
- }
|
|
|
- for (Subscription subscription : pageStatusSubscriptions.values()) {
|
|
|
- subscription.unsubscribe();
|
|
|
- }
|
|
|
- pageStatusSubscriptions.clear();
|
|
|
-
|
|
|
for (Subscription subscription : progressSubscriptions.values()) {
|
|
|
subscription.unsubscribe();
|
|
|
}
|
|
|
progressSubscriptions.clear();
|
|
|
|
|
|
+ remove(pageProgressSubscription);
|
|
|
remove(statusSubscription);
|
|
|
}
|
|
|
|