|
@@ -4,7 +4,6 @@ import android.os.Bundle;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import javax.inject.Inject;
|
|
|
|
|
@@ -76,12 +75,17 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
unsubscribeProgress(download);
|
|
|
unsubscribePagesStatus(download);
|
|
|
view.updateProgress(download);
|
|
|
+ view.updateDownloadedPages(download);
|
|
|
+ break;
|
|
|
+ case Download.ERROR:
|
|
|
+ unsubscribeProgress(download);
|
|
|
+ unsubscribePagesStatus(download);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void observeProgress(Download download, DownloadQueueFragment view) {
|
|
|
- Subscription subscription = Observable.interval(75, TimeUnit.MILLISECONDS, Schedulers.newThread())
|
|
|
+ Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS, Schedulers.newThread())
|
|
|
.flatMap(tick -> Observable.from(download.pages)
|
|
|
.map(Page::getProgress)
|
|
|
.reduce((x, y) -> x + y))
|
|
@@ -93,26 +97,31 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
}
|
|
|
});
|
|
|
|
|
|
+ // Avoid leaking subscriptions
|
|
|
+ Subscription oldSubscription = progressSubscriptions.remove(download);
|
|
|
+ if (oldSubscription != null) oldSubscription.unsubscribe();
|
|
|
+
|
|
|
progressSubscriptions.put(download, subscription);
|
|
|
}
|
|
|
|
|
|
private void observePagesStatus(Download download, DownloadQueueFragment view) {
|
|
|
PublishSubject<Integer> pageStatusSubject = PublishSubject.create();
|
|
|
- for (Page page : download.pages)
|
|
|
- page.setStatusSubject(pageStatusSubject);
|
|
|
-
|
|
|
- final AtomicInteger downloadedPages = new AtomicInteger(0);
|
|
|
+ for (Page page : download.pages) {
|
|
|
+ if (page.getStatus() != Page.READY)
|
|
|
+ page.setStatusSubject(pageStatusSubject);
|
|
|
+ }
|
|
|
|
|
|
Subscription subscription = pageStatusSubject
|
|
|
- .startWith(Observable.from(download.pages)
|
|
|
- .filter(page -> page.getStatus() == Page.READY)
|
|
|
- .map(page -> Page.READY))
|
|
|
.filter(status -> status == Page.READY)
|
|
|
- .map(status -> downloadedPages.incrementAndGet())
|
|
|
- .subscribe(count -> {
|
|
|
- // TODO
|
|
|
+ .observeOn(AndroidSchedulers.mainThread())
|
|
|
+ .subscribe(status -> {
|
|
|
+ view.updateDownloadedPages(download);
|
|
|
});
|
|
|
|
|
|
+ // Avoid leaking subscriptions
|
|
|
+ Subscription oldSubscription = progressSubscriptions.remove(download);
|
|
|
+ if (oldSubscription != null) oldSubscription.unsubscribe();
|
|
|
+
|
|
|
pageStatusSubscriptions.put(download, subscription);
|
|
|
}
|
|
|
|
|
@@ -123,8 +132,10 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
}
|
|
|
|
|
|
private void unsubscribePagesStatus(Download download) {
|
|
|
- for (Page page : download.pages)
|
|
|
- page.setStatusSubject(null);
|
|
|
+ if (download.pages != null) {
|
|
|
+ for (Page page : download.pages)
|
|
|
+ page.setStatusSubject(null);
|
|
|
+ }
|
|
|
|
|
|
Subscription subscription = pageStatusSubscriptions.remove(download);
|
|
|
if (subscription != null)
|
|
@@ -136,7 +147,6 @@ public class DownloadQueuePresenter extends BasePresenter<DownloadQueueFragment>
|
|
|
for (Page page : download.pages)
|
|
|
page.setStatusSubject(null);
|
|
|
}
|
|
|
-
|
|
|
for (Subscription subscription : pageStatusSubscriptions.values()) {
|
|
|
subscription.unsubscribe();
|
|
|
}
|