|
@@ -28,6 +28,7 @@ import eu.kanade.mangafeed.util.DiskUtils;
|
|
|
import eu.kanade.mangafeed.util.DynamicConcurrentMergeOperator;
|
|
|
import rx.Observable;
|
|
|
import rx.Subscription;
|
|
|
+import rx.android.schedulers.AndroidSchedulers;
|
|
|
import rx.schedulers.Schedulers;
|
|
|
import rx.subjects.BehaviorSubject;
|
|
|
import rx.subjects.PublishSubject;
|
|
@@ -46,7 +47,8 @@ public class DownloadManager {
|
|
|
private Subscription threadsNumberSubscription;
|
|
|
|
|
|
private DownloadQueue queue;
|
|
|
- private transient boolean isQueuePaused;
|
|
|
+ private volatile boolean isQueuePaused;
|
|
|
+ private volatile boolean isRunning;
|
|
|
|
|
|
public static final String PAGE_LIST_FILE = "index.json";
|
|
|
|
|
@@ -54,9 +56,12 @@ public class DownloadManager {
|
|
|
this.context = context;
|
|
|
this.sourceManager = sourceManager;
|
|
|
this.preferences = preferences;
|
|
|
- this.gson = new Gson();
|
|
|
|
|
|
+ gson = new Gson();
|
|
|
queue = new DownloadQueue();
|
|
|
+
|
|
|
+ downloadsQueueSubject = PublishSubject.create();
|
|
|
+ threadsNumber = BehaviorSubject.create();
|
|
|
}
|
|
|
|
|
|
public void initializeSubscriptions() {
|
|
@@ -66,9 +71,6 @@ public class DownloadManager {
|
|
|
if (threadsNumberSubscription != null && !threadsNumberSubscription.isUnsubscribed())
|
|
|
threadsNumberSubscription.unsubscribe();
|
|
|
|
|
|
- downloadsQueueSubject = PublishSubject.create();
|
|
|
- threadsNumber = BehaviorSubject.create();
|
|
|
-
|
|
|
threadsNumberSubscription = preferences.getDownloadTheadsObservable()
|
|
|
.filter(n -> !isQueuePaused)
|
|
|
.doOnNext(n -> isQueuePaused = (n == 0))
|
|
@@ -78,11 +80,19 @@ public class DownloadManager {
|
|
|
.observeOn(Schedulers.newThread())
|
|
|
.lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threadsNumber))
|
|
|
.onBackpressureBuffer()
|
|
|
- .subscribe(page -> {},
|
|
|
- e -> Timber.e(e.fillInStackTrace(), e.getMessage()));
|
|
|
+ .observeOn(AndroidSchedulers.mainThread())
|
|
|
+ .subscribe(finished -> {
|
|
|
+ if (finished) {
|
|
|
+ DownloadService.stop(context);
|
|
|
+ }
|
|
|
+ }, e -> Timber.e(e.fillInStackTrace(), e.getMessage()));
|
|
|
+
|
|
|
+ isRunning = true;
|
|
|
}
|
|
|
|
|
|
public void destroySubscriptions() {
|
|
|
+ isRunning = false;
|
|
|
+
|
|
|
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed()) {
|
|
|
downloadsSubscription.unsubscribe();
|
|
|
downloadsSubscription = null;
|
|
@@ -104,6 +114,7 @@ public class DownloadManager {
|
|
|
|
|
|
if (!isChapterDownloaded(download)) {
|
|
|
queue.add(download);
|
|
|
+ if (isRunning) downloadsQueueSubject.onNext(download);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -139,7 +150,7 @@ public class DownloadManager {
|
|
|
}
|
|
|
|
|
|
// Download the entire chapter
|
|
|
- private Observable<Page> downloadChapter(Download download) {
|
|
|
+ private Observable<Boolean> downloadChapter(Download download) {
|
|
|
try {
|
|
|
DiskUtils.createDirectory(download.directory);
|
|
|
} catch (IOException e) {
|
|
@@ -164,7 +175,9 @@ public class DownloadManager {
|
|
|
// Start downloading images, consider we can have downloaded images already
|
|
|
.concatMap(page -> getDownloadedImage(page, download.source, download.directory))
|
|
|
// Do after download completes
|
|
|
- .doOnCompleted(() -> onDownloadCompleted(download));
|
|
|
+ .doOnCompleted(() -> onDownloadCompleted(download))
|
|
|
+ .toList()
|
|
|
+ .flatMap(pages -> Observable.just(areAllDownloadsFinished()));
|
|
|
}
|
|
|
|
|
|
// Get downloaded image if exists, otherwise download it with the method below
|
|
@@ -229,9 +242,6 @@ public class DownloadManager {
|
|
|
private void onDownloadCompleted(final Download download) {
|
|
|
checkDownloadIsSuccessful(download);
|
|
|
savePageList(download);
|
|
|
- if (areAllDownloadsFinished()) {
|
|
|
- DownloadService.stop(context);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
private void checkDownloadIsSuccessful(final Download download) {
|
|
@@ -336,20 +346,27 @@ public class DownloadManager {
|
|
|
threadsNumber.onNext(0);
|
|
|
}
|
|
|
|
|
|
- public void startDownloads() {
|
|
|
+ public boolean startDownloads() {
|
|
|
+ boolean hasPendingDownloads = false;
|
|
|
if (downloadsSubscription == null || threadsNumberSubscription == null)
|
|
|
initializeSubscriptions();
|
|
|
|
|
|
for (Download download : queue.get()) {
|
|
|
if (download.getStatus() != Download.DOWNLOADED) {
|
|
|
download.setStatus(Download.QUEUE);
|
|
|
+ if (!hasPendingDownloads) hasPendingDownloads = true;
|
|
|
downloadsQueueSubject.onNext(download);
|
|
|
}
|
|
|
}
|
|
|
+ return hasPendingDownloads;
|
|
|
}
|
|
|
|
|
|
public void stopDownloads() {
|
|
|
destroySubscriptions();
|
|
|
}
|
|
|
|
|
|
+ public boolean isRunning() {
|
|
|
+ return isRunning;
|
|
|
+ }
|
|
|
+
|
|
|
}
|