|
@@ -4,6 +4,7 @@ import android.os.Bundle;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import javax.inject.Inject;
|
|
|
|
|
@@ -16,6 +17,7 @@ import rx.Observable;
|
|
|
import rx.Subscription;
|
|
|
import rx.android.schedulers.AndroidSchedulers;
|
|
|
import rx.schedulers.Schedulers;
|
|
|
+import rx.subjects.PublishSubject;
|
|
|
import timber.log.Timber;
|
|
|
|
|
|
public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment> {
|
|
@@ -25,6 +27,7 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
private DownloadQueue downloadQueue;
|
|
|
private Subscription statusSubscription;
|
|
|
private HashMap<Download, Subscription> progressSubscriptions;
|
|
|
+ private HashMap<Download, Subscription> pageStatusSubscriptions;
|
|
|
|
|
|
public final static int GET_DOWNLOAD_QUEUE = 1;
|
|
|
|
|
@@ -34,6 +37,7 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
|
|
|
downloadQueue = downloadManager.getQueue();
|
|
|
progressSubscriptions = new HashMap<>();
|
|
|
+ pageStatusSubscriptions = new HashMap<>();
|
|
|
|
|
|
restartableLatestCache(GET_DOWNLOAD_QUEUE,
|
|
|
() -> Observable.just(downloadQueue.get()),
|
|
@@ -48,12 +52,12 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
protected void onTakeView(DownloadQueueFragment view) {
|
|
|
super.onTakeView(view);
|
|
|
|
|
|
- statusSubscription = downloadQueue.getStatusObservable()
|
|
|
+ add(statusSubscription = downloadQueue.getStatusObservable()
|
|
|
.subscribeOn(Schedulers.io())
|
|
|
.observeOn(AndroidSchedulers.mainThread())
|
|
|
.subscribe(download -> {
|
|
|
processStatus(download, view);
|
|
|
- });
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -66,9 +70,11 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
switch (download.getStatus()) {
|
|
|
case Download.DOWNLOADING:
|
|
|
observeProgress(download, view);
|
|
|
+ observePagesStatus(download, view);
|
|
|
break;
|
|
|
case Download.DOWNLOADED:
|
|
|
unsubscribeProgress(download);
|
|
|
+ unsubscribePagesStatus(download);
|
|
|
download.totalProgress = download.pages.size() * 100;
|
|
|
view.updateProgress(download);
|
|
|
break;
|
|
@@ -89,13 +95,52 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
progressSubscriptions.put(download, subscription);
|
|
|
}
|
|
|
|
|
|
+ private void observePagesStatus(Download download, DownloadQueueFragment view) {
|
|
|
+ PublishSubject<Integer> pageStatusSubject = PublishSubject.create();
|
|
|
+ for (Page page : download.pages)
|
|
|
+ page.setStatusSubject(pageStatusSubject);
|
|
|
+
|
|
|
+ final AtomicInteger downloadedPages = new AtomicInteger(0);
|
|
|
+
|
|
|
+ Subscription subscription = pageStatusSubject
|
|
|
+ .startWith(Observable.from(download.pages)
|
|
|
+ .filter(page -> page.getStatus() == Page.READY)
|
|
|
+ .map(page -> Page.READY))
|
|
|
+ .filter(status -> status == Page.READY)
|
|
|
+ .map(status -> downloadedPages.incrementAndGet())
|
|
|
+ .subscribe(count -> {
|
|
|
+ // TODO
|
|
|
+ });
|
|
|
+
|
|
|
+ pageStatusSubscriptions.put(download, subscription);
|
|
|
+ }
|
|
|
+
|
|
|
private void unsubscribeProgress(Download download) {
|
|
|
Subscription subscription = progressSubscriptions.remove(download);
|
|
|
if (subscription != null)
|
|
|
subscription.unsubscribe();
|
|
|
}
|
|
|
|
|
|
+ private void unsubscribePagesStatus(Download download) {
|
|
|
+ for (Page page : download.pages)
|
|
|
+ page.setStatusSubject(null);
|
|
|
+
|
|
|
+ Subscription subscription = pageStatusSubscriptions.remove(download);
|
|
|
+ if (subscription != null)
|
|
|
+ subscription.unsubscribe();
|
|
|
+ }
|
|
|
+
|
|
|
private void destroySubscriptions() {
|
|
|
+ for (Download download : pageStatusSubscriptions.keySet()) {
|
|
|
+ for (Page page : download.pages)
|
|
|
+ page.setStatusSubject(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Subscription subscription : pageStatusSubscriptions.values()) {
|
|
|
+ subscription.unsubscribe();
|
|
|
+ }
|
|
|
+ pageStatusSubscriptions.clear();
|
|
|
+
|
|
|
for (Subscription subscription : progressSubscriptions.values()) {
|
|
|
subscription.unsubscribe();
|
|
|
}
|