|
@@ -15,6 +15,8 @@ import java.io.IOException;
|
|
|
import java.lang.reflect.Type;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
|
|
import eu.kanade.tachiyomi.data.database.models.Chapter;
|
|
|
import eu.kanade.tachiyomi.data.database.models.Manga;
|
|
@@ -48,6 +50,8 @@ public class DownloadManager {
|
|
|
private DownloadQueue queue;
|
|
|
private volatile boolean isRunning;
|
|
|
|
|
|
+ private ExecutorService threadPool;
|
|
|
+
|
|
|
public static final String PAGE_LIST_FILE = "index.json";
|
|
|
|
|
|
public DownloadManager(Context context, SourceManager sourceManager, PreferencesHelper preferences) {
|
|
@@ -66,10 +70,11 @@ public class DownloadManager {
|
|
|
if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed())
|
|
|
downloadsSubscription.unsubscribe();
|
|
|
|
|
|
+ threadPool = Executors.newFixedThreadPool(preferences.downloadThreads());
|
|
|
+
|
|
|
downloadsSubscription = downloadsQueueSubject
|
|
|
- .concatMap(downloads -> Observable.from(downloads)
|
|
|
- .flatMap(this::downloadChapter, preferences.downloadThreads()))
|
|
|
- .onBackpressureBuffer()
|
|
|
+ .flatMap(Observable::from)
|
|
|
+ .flatMap(c -> downloadChapter(c).subscribeOn(Schedulers.from(threadPool)))
|
|
|
.observeOn(AndroidSchedulers.mainThread())
|
|
|
.map(download -> areAllDownloadsFinished())
|
|
|
.subscribe(finished -> {
|
|
@@ -94,6 +99,10 @@ public class DownloadManager {
|
|
|
downloadsSubscription.unsubscribe();
|
|
|
downloadsSubscription = null;
|
|
|
}
|
|
|
+
|
|
|
+ if (threadPool != null && !threadPool.isShutdown()) {
|
|
|
+ threadPool.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Create a download object for every chapter in the event and add them to the downloads queue
|
|
@@ -181,8 +190,7 @@ public class DownloadManager {
|
|
|
// Or if the page list already exists, start from the file
|
|
|
Observable.just(download.pages);
|
|
|
|
|
|
- return pageListObservable
|
|
|
- .subscribeOn(Schedulers.io())
|
|
|
+ return Observable.defer(() -> pageListObservable
|
|
|
.doOnNext(pages -> {
|
|
|
download.downloadedImages = 0;
|
|
|
download.setStatus(Download.DOWNLOADING);
|
|
@@ -199,7 +207,7 @@ public class DownloadManager {
|
|
|
.onErrorResumeNext(error -> {
|
|
|
download.setStatus(Download.ERROR);
|
|
|
return Observable.just(download);
|
|
|
- });
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
// Get the image from the filesystem if it exists or download from network
|