|
@@ -7,12 +7,10 @@ import com.google.gson.reflect.TypeToken;
|
|
|
import com.google.gson.stream.JsonReader;
|
|
|
|
|
|
import java.io.File;
|
|
|
-import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.FileReader;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Type;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
|
|
|
import eu.kanade.mangafeed.data.models.Chapter;
|
|
@@ -33,16 +31,18 @@ import timber.log.Timber;
|
|
|
|
|
|
public class DownloadManager {
|
|
|
|
|
|
- private PublishSubject<DownloadChaptersEvent> downloadsSubject;
|
|
|
- private Subscription downloadSubscription;
|
|
|
- private Subscription threadNumberSubscription;
|
|
|
-
|
|
|
private Context context;
|
|
|
private SourceManager sourceManager;
|
|
|
private PreferencesHelper preferences;
|
|
|
private Gson gson;
|
|
|
|
|
|
+ private PublishSubject<Download> downloadsQueueSubject;
|
|
|
+ private BehaviorSubject<Integer> threadsNumber;
|
|
|
+ private Subscription downloadsSubscription;
|
|
|
+ private Subscription threadNumberSubscription;
|
|
|
+
|
|
|
private DownloadQueue queue;
|
|
|
+ private transient boolean isQueuePaused;
|
|
|
|
|
|
public static final String PAGE_LIST_FILE = "index.json";
|
|
|
|
|
@@ -54,78 +54,63 @@ public class DownloadManager {
|
|
|
|
|
|
queue = new DownloadQueue();
|
|
|
|
|
|
- initializeDownloadSubscription();
|
|
|
- }
|
|
|
-
|
|
|
- public PublishSubject<DownloadChaptersEvent> getDownloadsSubject() {
|
|
|
- return downloadsSubject;
|
|
|
+ initializeDownloadsSubscription();
|
|
|
}
|
|
|
|
|
|
- private void initializeDownloadSubscription() {
|
|
|
- if (downloadSubscription != null && !downloadSubscription.isUnsubscribed()) {
|
|
|
- downloadSubscription.unsubscribe();
|
|
|
- }
|
|
|
+ private void initializeDownloadsSubscription() {
|
|
|
+ if (downloadsSubscription != null && !downloadsSubscription.isUnsubscribed())
|
|
|
+ downloadsSubscription.unsubscribe();
|
|
|
|
|
|
if (threadNumberSubscription != null && !threadNumberSubscription.isUnsubscribed())
|
|
|
threadNumberSubscription.unsubscribe();
|
|
|
|
|
|
- downloadsSubject = PublishSubject.create();
|
|
|
- BehaviorSubject<Integer> threads = BehaviorSubject.create();
|
|
|
+ downloadsQueueSubject = PublishSubject.create();
|
|
|
+ threadsNumber = BehaviorSubject.create();
|
|
|
|
|
|
- threadNumberSubscription = preferences.getDownloadTheadsObs()
|
|
|
- .subscribe(threads::onNext);
|
|
|
+ threadNumberSubscription = preferences.getDownloadTheadsObservable()
|
|
|
+ .filter(n -> !isQueuePaused)
|
|
|
+ .doOnNext(n -> isQueuePaused = (n == 0))
|
|
|
+ .subscribe(threadsNumber::onNext);
|
|
|
|
|
|
- // Listen for download events, add them to queue and download
|
|
|
- downloadSubscription = downloadsSubject
|
|
|
- .subscribeOn(Schedulers.io())
|
|
|
- .flatMap(this::prepareDownloads)
|
|
|
- .lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threads))
|
|
|
+ downloadsSubscription = downloadsQueueSubject
|
|
|
+ .observeOn(Schedulers.newThread())
|
|
|
+ .lift(new DynamicConcurrentMergeOperator<>(this::downloadChapter, threadsNumber))
|
|
|
.onBackpressureBuffer()
|
|
|
.subscribe(page -> {},
|
|
|
e -> Timber.e(e.fillInStackTrace(), e.getMessage()));
|
|
|
}
|
|
|
|
|
|
- // Create a download object for every chapter and add it to the downloads queue
|
|
|
- private Observable<Download> prepareDownloads(DownloadChaptersEvent event) {
|
|
|
+ // Create a download object for every chapter in the event and add them to the downloads queue
|
|
|
+ public void onDownloadChaptersEvent(DownloadChaptersEvent event) {
|
|
|
final Manga manga = event.getManga();
|
|
|
final Source source = sourceManager.get(manga.source);
|
|
|
- List<Download> downloads = new ArrayList<>();
|
|
|
|
|
|
for (Chapter chapter : event.getChapters()) {
|
|
|
Download download = new Download(source, manga, chapter);
|
|
|
|
|
|
if (!isChapterDownloaded(download)) {
|
|
|
queue.add(download);
|
|
|
- downloads.add(download);
|
|
|
+ downloadsQueueSubject.onNext(download);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- return Observable.from(downloads);
|
|
|
}
|
|
|
|
|
|
// Check if a chapter is already downloaded
|
|
|
private boolean isChapterDownloaded(Download download) {
|
|
|
// If the chapter is already queued, don't add it again
|
|
|
for (Download queuedDownload : queue.get()) {
|
|
|
- if (download.chapter.id == queuedDownload.chapter.id)
|
|
|
+ if (download.chapter.id.equals(queuedDownload.chapter.id))
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
// Add the directory to the download object for future access
|
|
|
download.directory = getAbsoluteChapterDirectory(download);
|
|
|
|
|
|
- // If the directory doesn't exist, the chapter isn't downloaded. Create it in this case
|
|
|
+ // If the directory doesn't exist, the chapter isn't downloaded.
|
|
|
if (!download.directory.exists()) {
|
|
|
- // FIXME Sometimes it's failing to create the directory... My fault?
|
|
|
- try {
|
|
|
- DiskUtils.createDirectory(download.directory);
|
|
|
- } catch (IOException e) {
|
|
|
- Timber.e("Unable to create directory for chapter");
|
|
|
- }
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
// If the page list doesn't exist, the chapter isn't download (or maybe it's,
|
|
|
// but we consider it's not)
|
|
|
List<Page> savedPages = getSavedPageList(download);
|
|
@@ -142,6 +127,12 @@ public class DownloadManager {
|
|
|
|
|
|
// Download the entire chapter
|
|
|
private Observable<Page> downloadChapter(Download download) {
|
|
|
+ try {
|
|
|
+ DiskUtils.createDirectory(download.directory);
|
|
|
+ } catch (IOException e) {
|
|
|
+ Timber.e(e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
Observable<List<Page>> pageListObservable = download.pages == null ?
|
|
|
// Pull page list from network and add them to download object
|
|
|
download.source
|
|
@@ -152,7 +143,6 @@ public class DownloadManager {
|
|
|
Observable.just(download.pages);
|
|
|
|
|
|
return pageListObservable
|
|
|
- .subscribeOn(Schedulers.io())
|
|
|
.doOnNext(pages -> download.setStatus(Download.DOWNLOADING))
|
|
|
// Get all the URLs to the source images, fetch pages if necessary
|
|
|
.flatMap(pageList -> Observable.merge(
|
|
@@ -161,31 +151,29 @@ public class DownloadManager {
|
|
|
// Start downloading images, consider we can have downloaded images already
|
|
|
.concatMap(page -> getDownloadedImage(page, download.source, download.directory))
|
|
|
// Do after download completes
|
|
|
- .doOnCompleted(() -> onChapterDownloaded(download));
|
|
|
+ .doOnCompleted(() -> onDownloadCompleted(download));
|
|
|
}
|
|
|
|
|
|
// Get downloaded image if exists, otherwise download it with the method below
|
|
|
public Observable<Page> getDownloadedImage(final Page page, Source source, File chapterDir) {
|
|
|
- Observable<Page> obs = Observable.just(page);
|
|
|
+ Observable<Page> pageObservable = Observable.just(page);
|
|
|
if (page.getImageUrl() == null)
|
|
|
- return obs;
|
|
|
+ return pageObservable;
|
|
|
|
|
|
String imageFilename = getImageFilename(page);
|
|
|
File imagePath = new File(chapterDir, imageFilename);
|
|
|
|
|
|
if (!isImageDownloaded(imagePath)) {
|
|
|
page.setStatus(Page.DOWNLOAD_IMAGE);
|
|
|
- obs = downloadImage(page, source, chapterDir, imageFilename);
|
|
|
+ pageObservable = downloadImage(page, source, chapterDir, imageFilename);
|
|
|
}
|
|
|
|
|
|
- return obs.flatMap(p -> {
|
|
|
- page.setImagePath(imagePath.getAbsolutePath());
|
|
|
- page.setStatus(Page.READY);
|
|
|
- return Observable.just(page);
|
|
|
- }).onErrorResumeNext(e -> {
|
|
|
- page.setStatus(Page.ERROR);
|
|
|
- return Observable.just(page);
|
|
|
- });
|
|
|
+ return pageObservable
|
|
|
+ .doOnNext(p -> p.setImagePath(imagePath.getAbsolutePath()))
|
|
|
+ .doOnNext(p -> p.setStatus(Page.READY))
|
|
|
+ .doOnError(e -> page.setStatus(Page.ERROR))
|
|
|
+ // Allow to download the remaining images
|
|
|
+ .onErrorResumeNext(e -> Observable.just(page));
|
|
|
}
|
|
|
|
|
|
// Download the image
|
|
@@ -210,31 +198,46 @@ public class DownloadManager {
|
|
|
}
|
|
|
|
|
|
private boolean isImageDownloaded(File imagePath) {
|
|
|
- return imagePath.exists() && !imagePath.isDirectory();
|
|
|
+ return imagePath.exists();
|
|
|
}
|
|
|
|
|
|
- private void onChapterDownloaded(final Download download) {
|
|
|
- download.setStatus(Download.DOWNLOADED);
|
|
|
- download.totalProgress = download.pages.size() * 100;
|
|
|
- savePageList(download.source, download.manga, download.chapter, download.pages);
|
|
|
+ // Called when a download finishes. This doesn't mean the download was successful, so we check it
|
|
|
+ private void onDownloadCompleted(final Download download) {
|
|
|
+ checkDownloadIsSuccessful(download);
|
|
|
+ savePageList(download);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkDownloadIsSuccessful(final Download download) {
|
|
|
+ int expectedProgress = download.pages.size() * 100;
|
|
|
+ int actualProgress = 0;
|
|
|
+ int status = Download.DOWNLOADED;
|
|
|
+ // If any page has an error, the download result will be error
|
|
|
+ for (Page page : download.pages) {
|
|
|
+ actualProgress += page.getProgress();
|
|
|
+ if (page.getStatus() == Page.ERROR) status = Download.ERROR;
|
|
|
+ }
|
|
|
+ // If the download is successful, it's safer to use the expected progress
|
|
|
+ download.totalProgress = (status == Download.DOWNLOADED) ? expectedProgress : actualProgress;
|
|
|
+ download.setStatus(status);
|
|
|
}
|
|
|
|
|
|
// Return the page list from the chapter's directory if it exists, null otherwise
|
|
|
public List<Page> getSavedPageList(Source source, Manga manga, Chapter chapter) {
|
|
|
+ List<Page> pages = null;
|
|
|
File chapterDir = getAbsoluteChapterDirectory(source, manga, chapter);
|
|
|
File pagesFile = new File(chapterDir, PAGE_LIST_FILE);
|
|
|
|
|
|
try {
|
|
|
if (pagesFile.exists()) {
|
|
|
JsonReader reader = new JsonReader(new FileReader(pagesFile.getAbsolutePath()));
|
|
|
-
|
|
|
Type collectionType = new TypeToken<List<Page>>() {}.getType();
|
|
|
- return gson.fromJson(reader, collectionType);
|
|
|
+ pages = gson.fromJson(reader, collectionType);
|
|
|
+ reader.close();
|
|
|
}
|
|
|
- } catch (FileNotFoundException e) {
|
|
|
+ } catch (Exception e) {
|
|
|
Timber.e(e.fillInStackTrace(), e.getMessage());
|
|
|
}
|
|
|
- return null;
|
|
|
+ return pages;
|
|
|
}
|
|
|
|
|
|
// Shortcut for the method above
|
|
@@ -287,4 +290,13 @@ public class DownloadManager {
|
|
|
public DownloadQueue getQueue() {
|
|
|
return queue;
|
|
|
}
|
|
|
+
|
|
|
+ public void pauseDownloads() {
|
|
|
+ threadsNumber.onNext(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void resumeDownloads() {
|
|
|
+ isQueuePaused = false;
|
|
|
+ threadsNumber.onNext(preferences.getDownloadThreads());
|
|
|
+ }
|
|
|
}
|