Parcourir la source

Remove custom presenter class

len il y a 8 ans
Parent
commit
96a39f5c54

+ 5 - 4
app/src/main/java/eu/kanade/tachiyomi/ui/backup/BackupPresenter.kt

@@ -4,6 +4,7 @@ import android.os.Bundle
 import eu.kanade.tachiyomi.data.backup.BackupManager
 import eu.kanade.tachiyomi.data.database.DatabaseHelper
 import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
+import eu.kanade.tachiyomi.util.isNullOrUnsubscribed
 import rx.Observable
 import rx.Subscription
 import rx.android.schedulers.AndroidSchedulers
@@ -48,13 +49,13 @@ class BackupPresenter : BasePresenter<BackupFragment>() {
      * @param file the path where the file will be saved.
      */
     fun createBackup(file: File) {
-        if (isUnsubscribed(backupSubscription)) {
+        if (backupSubscription.isNullOrUnsubscribed()) {
             backupSubscription = getBackupObservable(file)
                     .subscribeOn(Schedulers.io())
                     .observeOn(AndroidSchedulers.mainThread())
                     .subscribeFirst(
                             { view, result -> view.onBackupCompleted(file) },
-                            { view, error -> view.onBackupError(error) })
+                            BackupFragment::onBackupError)
         }
     }
 
@@ -64,13 +65,13 @@ class BackupPresenter : BasePresenter<BackupFragment>() {
      * @param stream the input stream of the backup file.
      */
     fun restoreBackup(stream: InputStream) {
-        if (isUnsubscribed(restoreSubscription)) {
+        if (restoreSubscription.isNullOrUnsubscribed()) {
             restoreSubscription = getRestoreObservable(stream)
                     .subscribeOn(Schedulers.io())
                     .observeOn(AndroidSchedulers.mainThread())
                     .subscribeFirst(
                             { view, result -> view.onRestoreCompleted() },
-                            { view, error -> view.onRestoreError(error) })
+                            BackupFragment::onRestoreError)
         }
     }
 

+ 1 - 0
app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/BasePresenter.kt

@@ -1,6 +1,7 @@
 package eu.kanade.tachiyomi.ui.base.presenter
 
 import android.content.Context
+import nucleus.presenter.RxPresenter
 import nucleus.view.ViewWithPresenter
 import rx.Observable
 

+ 0 - 492
app/src/main/java/eu/kanade/tachiyomi/ui/base/presenter/RxPresenter.java

@@ -1,492 +0,0 @@
-package eu.kanade.tachiyomi.ui.base.presenter;
-
-import android.os.Bundle;
-import android.support.annotation.CallSuper;
-import android.support.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import nucleus.presenter.Presenter;
-import nucleus.presenter.delivery.DeliverFirst;
-import nucleus.presenter.delivery.DeliverLatestCache;
-import nucleus.presenter.delivery.DeliverReplay;
-import nucleus.presenter.delivery.Delivery;
-import rx.Observable;
-import rx.Subscription;
-import rx.functions.Action1;
-import rx.functions.Action2;
-import rx.functions.Func0;
-import rx.internal.util.SubscriptionList;
-import rx.subjects.BehaviorSubject;
-
-/**
- * This is an extension of {@link Presenter} which provides RxJava functionality.
- *
- * @param <View> a type of view.
- */
-public class RxPresenter<View> extends Presenter<View> {
-
-    private static final String REQUESTED_KEY = RxPresenter.class.getName() + "#requested";
-
-    private final BehaviorSubject<View> views = BehaviorSubject.create();
-    private final SubscriptionList subscriptions = new SubscriptionList();
-
-    private final HashMap<Integer, Func0<Subscription>> restartables = new HashMap<>();
-    private final HashMap<Integer, Subscription> restartableSubscriptions = new HashMap<>();
-    private final ArrayList<Integer> requested = new ArrayList<>();
-
-    /**
-     * Returns an {@link rx.Observable} that emits the current attached view or null.
-     * See {@link BehaviorSubject} for more information.
-     *
-     * @return an observable that emits the current attached view or null.
-     */
-    public Observable<View> view() {
-        return views;
-    }
-
-    /**
-     * Registers a subscription to automatically unsubscribe it during onDestroy.
-     * See {@link SubscriptionList#add(Subscription) for details.}
-     *
-     * @param subscription a subscription to add.
-     */
-    public void add(Subscription subscription) {
-        subscriptions.add(subscription);
-    }
-
-    /**
-     * Removes and unsubscribes a subscription that has been registered with {@link #add} previously.
-     * See {@link SubscriptionList#remove(Subscription)} for details.
-     *
-     * @param subscription a subscription to remove.
-     */
-    public void remove(Subscription subscription) {
-        subscriptions.remove(subscription);
-    }
-
-    /**
-     * A restartable is any RxJava observable that can be started (subscribed) and
-     * should be automatically restarted (re-subscribed) after a process restart if
-     * it was still subscribed at the moment of saving presenter's state.
-     *
-     * Registers a factory. Re-subscribes the restartable after the process restart.
-     *
-     * @param restartableId id of the restartable
-     * @param factory       factory of the restartable
-     */
-    public void restartable(int restartableId, Func0<Subscription> factory) {
-        restartables.put(restartableId, factory);
-        if (requested.contains(restartableId))
-            start(restartableId);
-    }
-
-    /**
-     * Starts the given restartable.
-     *
-     * @param restartableId id of the restartable
-     */
-    public void start(int restartableId) {
-        stop(restartableId);
-        requested.add(restartableId);
-        restartableSubscriptions.put(restartableId, restartables.get(restartableId).call());
-    }
-
-    /**
-     * Unsubscribes a restartable
-     *
-     * @param restartableId id of a restartable.
-     */
-    public void stop(int restartableId) {
-        requested.remove((Integer) restartableId);
-        Subscription subscription = restartableSubscriptions.get(restartableId);
-        if (subscription != null)
-            subscription.unsubscribe();
-    }
-
-    /**
-     * Checks if a restartable is unsubscribed.
-     *
-     * @param restartableId id of the restartable.
-     * @return true if the subscription is null or unsubscribed, false otherwise.
-     */
-    public boolean isUnsubscribed(int restartableId) {
-        return isUnsubscribed(restartableSubscriptions.get(restartableId));
-    }
-
-    /**
-     * Checks if a subscription is unsubscribed.
-     *
-     * @param subscription the subscription to check.
-     * @return true if the subscription is null or unsubscribed, false otherwise.
-     */
-    public boolean isUnsubscribed(@Nullable Subscription subscription) {
-        return subscription == null || subscription.isUnsubscribed();
-    }
-
-    /**
-     * This is a shortcut that can be used instead of combining together
-     * {@link #restartable(int, Func0)},
-     * {@link #deliverFirst()},
-     * {@link #split(Action2, Action2)}.
-     *
-     * @param restartableId     an id of the restartable.
-     * @param observableFactory a factory that should return an Observable when the restartable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     * @param onError           a callback that will be called if the source observable emits onError.
-     * @param <T>               the type of the observable.
-     */
-    public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory,
-        final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
-
-        restartable(restartableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {
-                return observableFactory.call()
-                    .compose(RxPresenter.this.<T>deliverFirst())
-                    .subscribe(split(onNext, onError));
-            }
-        });
-    }
-
-    /**
-     * This is a shortcut for calling {@link #restartableFirst(int, Func0, Action2, Action2)} with the last parameter = null.
-     */
-    public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
-        restartableFirst(restartableId, observableFactory, onNext, null);
-    }
-
-    /**
-     * This is a shortcut that can be used instead of combining together
-     * {@link #restartable(int, Func0)},
-     * {@link #deliverLatestCache()},
-     * {@link #split(Action2, Action2)}.
-     *
-     * @param restartableId     an id of the restartable.
-     * @param observableFactory a factory that should return an Observable when the restartable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     * @param onError           a callback that will be called if the source observable emits onError.
-     * @param <T>               the type of the observable.
-     */
-    public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory,
-        final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
-
-        restartable(restartableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {
-                return observableFactory.call()
-                    .compose(RxPresenter.this.<T>deliverLatestCache())
-                    .subscribe(split(onNext, onError));
-            }
-        });
-    }
-
-    /**
-     * This is a shortcut for calling {@link #restartableLatestCache(int, Func0, Action2, Action2)} with the last parameter = null.
-     */
-    public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
-        restartableLatestCache(restartableId, observableFactory, onNext, null);
-    }
-
-    /**
-     * This is a shortcut that can be used instead of combining together
-     * {@link #restartable(int, Func0)},
-     * {@link #deliverReplay()},
-     * {@link #split(Action2, Action2)}.
-     *
-     * @param restartableId     an id of the restartable.
-     * @param observableFactory a factory that should return an Observable when the restartable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     * @param onError           a callback that will be called if the source observable emits onError.
-     * @param <T>               the type of the observable.
-     */
-    public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory,
-        final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
-
-        restartable(restartableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {
-                return observableFactory.call()
-                    .compose(RxPresenter.this.<T>deliverReplay())
-                    .subscribe(split(onNext, onError));
-            }
-        });
-    }
-
-    /**
-     * This is a shortcut for calling {@link #restartableReplay(int, Func0, Action2, Action2)} with the last parameter = null.
-     */
-    public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
-        restartableReplay(restartableId, observableFactory, onNext, null);
-    }
-
-    /**
-     * A startable behaves the same as a restartable but it does not resubscribe on process restart
-     *
-     * @param startableId       an id of the restartable.
-     * @param observableFactory a factory that should return an Observable when the startable should run.
-     */
-    public <T> void startable(int startableId, final Func0<Observable<T>> observableFactory) {
-        restartables.put(startableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {return observableFactory.call().subscribe();}
-        });
-    }
-
-    /**
-     * A startable behaves the same as a restartable but it does not resubscribe on process restart
-     *
-     * @param startableId       an id of the restartable.
-     * @param observableFactory a factory that should return an Observable when the startable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     * @param onError           a callback that will be called if the source observable emits onError.
-     */
-    public <T> void startable(int startableId, final Func0<Observable<T>> observableFactory,
-        final Action1<T> onNext, final Action1<Throwable> onError) {
-
-        restartables.put(startableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {return observableFactory.call().subscribe(onNext, onError);}
-        });
-    }
-
-    /**
-     * A startable behaves the same as a restartable but it does not resubscribe on process restart
-     *
-     * @param startableId       an id of the restartable.
-     * @param observableFactory a factory that should return an Observable when the startable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     */
-    public <T> void startable(int startableId, final Func0<Observable<T>> observableFactory, final Action1<T> onNext) {
-        restartables.put(startableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {return observableFactory.call().subscribe(onNext);}
-        });
-    }
-    
-    /**
-     * This is a shortcut that can be used instead of combining together
-     * {@link #startable(int, Func0)},
-     * {@link #deliverFirst()},
-     * {@link #split(Action2, Action2)}.
-     *
-     * @param startableId       an id of the startable.
-     * @param observableFactory a factory that should return an Observable when the startable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     * @param onError           a callback that will be called if the source observable emits onError.
-     * @param <T>               the type of the observable.
-     */
-    public <T> void startableFirst(int startableId, final Func0<Observable<T>> observableFactory,
-        final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
-
-        restartables.put(startableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {
-                return observableFactory.call()
-                        .compose(RxPresenter.this.<T>deliverFirst())
-                        .subscribe(split(onNext, onError));
-            }
-        });
-    }
-
-    /**
-     * This is a shortcut for calling {@link #startableFirst(int, Func0, Action2, Action2)} with the last parameter = null.
-     */
-    public <T> void startableFirst(int startableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
-        startableFirst(startableId, observableFactory, onNext, null);
-    }
-
-    /**
-     * This is a shortcut that can be used instead of combining together
-     * {@link #startable(int, Func0)},
-     * {@link #deliverLatestCache()},
-     * {@link #split(Action2, Action2)}.
-     *
-     * @param startableId       an id of the startable.
-     * @param observableFactory a factory that should return an Observable when the startable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     * @param onError           a callback that will be called if the source observable emits onError.
-     * @param <T>               the type of the observable.
-     */
-    public <T> void startableLatestCache(int startableId, final Func0<Observable<T>> observableFactory,
-        final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
-
-        restartables.put(startableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {
-                return observableFactory.call()
-                        .compose(RxPresenter.this.<T>deliverLatestCache())
-                        .subscribe(split(onNext, onError));
-            }
-        });
-    }
-
-    /**
-     * This is a shortcut for calling {@link #startableLatestCache(int, Func0, Action2, Action2)} with the last parameter = null.
-     */
-    public <T> void startableLatestCache(int startableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
-        startableLatestCache(startableId, observableFactory, onNext, null);
-    }
-
-    /**
-     * This is a shortcut that can be used instead of combining together
-     * {@link #startable(int, Func0)},
-     * {@link #deliverReplay()},
-     * {@link #split(Action2, Action2)}.
-     *
-     * @param startableId       an id of the startable.
-     * @param observableFactory a factory that should return an Observable when the startable should run.
-     * @param onNext            a callback that will be called when received data should be delivered to view.
-     * @param onError           a callback that will be called if the source observable emits onError.
-     * @param <T>               the type of the observable.
-     */
-    public <T> void startableReplay(int startableId, final Func0<Observable<T>> observableFactory,
-        final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
-
-        restartables.put(startableId, new Func0<Subscription>() {
-            @Override
-            public Subscription call() {
-                return observableFactory.call()
-                        .compose(RxPresenter.this.<T>deliverReplay())
-                        .subscribe(split(onNext, onError));
-            }
-        });
-    }
-
-    /**
-     * This is a shortcut for calling {@link #startableReplay(int, Func0, Action2, Action2)} with the last parameter = null.
-     */
-    public <T> void startableReplay(int startableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
-        startableReplay(startableId, observableFactory, onNext, null);
-    }
-
-    /**
-     * Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by
-     * the source {@link rx.Observable}.
-     *
-     * {@link #deliverLatestCache} keeps the latest onNext value and emits it each time a new view gets attached.
-     * If a new onNext value appears while a view is attached, it will be delivered immediately.
-     *
-     * @param <T> the type of source observable emissions
-     */
-    public <T> DeliverLatestCache<View, T> deliverLatestCache() {
-        return new DeliverLatestCache<>(views);
-    }
-
-    /**
-     * Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by
-     * the source {@link rx.Observable}.
-     *
-     * {@link #deliverFirst} delivers only the first onNext value that has been emitted by the source observable.
-     *
-     * @param <T> the type of source observable emissions
-     */
-    public <T> DeliverFirst<View, T> deliverFirst() {
-        return new DeliverFirst<>(views);
-    }
-
-    /**
-     * Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by
-     * the source {@link rx.Observable}.
-     *
-     * {@link #deliverReplay} keeps all onNext values and emits them each time a new view gets attached.
-     * If a new onNext value appears while a view is attached, it will be delivered immediately.
-     *
-     * @param <T> the type of source observable emissions
-     */
-    public <T> DeliverReplay<View, T> deliverReplay() {
-        return new DeliverReplay<>(views);
-    }
-
-    /**
-     * Returns a method that can be used for manual restartable chain build. It returns an Action1 that splits
-     * a received {@link Delivery} into two {@link Action2} onNext and onError calls.
-     *
-     * @param onNext  a method that will be called if the delivery contains an emitted onNext value.
-     * @param onError a method that will be called if the delivery contains an onError throwable.
-     * @param <T>     a type on onNext value.
-     * @return an Action1 that splits a received {@link Delivery} into two {@link Action2} onNext and onError calls.
-     */
-    public <T> Action1<Delivery<View, T>> split(final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
-        return new Action1<Delivery<View, T>>() {
-            @Override
-            public void call(Delivery<View, T> delivery) {
-                delivery.split(onNext, onError);
-            }
-        };
-    }
-
-    /**
-     * This is a shortcut for calling {@link #split(Action2, Action2)} when the second parameter is null.
-     */
-    public <T> Action1<Delivery<View, T>> split(Action2<View, T> onNext) {
-        return split(onNext, null);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @CallSuper
-    @Override
-    protected void onCreate(Bundle savedState) {
-        if (savedState != null)
-            requested.addAll(savedState.getIntegerArrayList(REQUESTED_KEY));
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @CallSuper
-    @Override
-    protected void onDestroy() {
-        views.onCompleted();
-        subscriptions.unsubscribe();
-        for (Map.Entry<Integer, Subscription> entry : restartableSubscriptions.entrySet())
-            entry.getValue().unsubscribe();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @CallSuper
-    @Override
-    protected void onSave(Bundle state) {
-        for (int i = requested.size() - 1; i >= 0; i--) {
-            int restartableId = requested.get(i);
-            Subscription subscription = restartableSubscriptions.get(restartableId);
-            if (subscription != null && subscription.isUnsubscribed())
-                requested.remove(i);
-        }
-        state.putIntegerArrayList(REQUESTED_KEY, requested);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @CallSuper
-    @Override
-    protected void onTakeView(View view) {
-        views.onNext(view);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @CallSuper
-    @Override
-    protected void onDropView() {
-        views.onNext(null);
-    }
-
-    /**
-     * Please, use restartableXX and deliverXX methods for pushing data from RxPresenter into View.
-     */
-    @Deprecated
-    @Nullable
-    @Override
-    public View getView() {
-        return super.getView();
-    }
-}

+ 3 - 2
app/src/main/java/eu/kanade/tachiyomi/ui/manga/MangaPresenter.kt

@@ -8,6 +8,7 @@ import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
 import eu.kanade.tachiyomi.ui.manga.info.ChapterCountEvent
 import eu.kanade.tachiyomi.ui.manga.info.MangaFavoriteEvent
 import eu.kanade.tachiyomi.util.SharedData
+import eu.kanade.tachiyomi.util.isNullOrUnsubscribed
 import rx.Observable
 import rx.Subscription
 import uy.kohesive.injekt.injectLazy
@@ -44,10 +45,10 @@ class MangaPresenter : BasePresenter<MangaActivity>() {
     }
 
     fun setMangaEvent(event: MangaEvent) {
-        if (isUnsubscribed(mangaSubscription)) {
+        if (mangaSubscription.isNullOrUnsubscribed()) {
             manga = event.manga
             mangaSubscription = Observable.just(manga)
-                    .subscribeLatestCache({ view, manga -> view.onSetManga(manga) })
+                    .subscribeLatestCache(MangaActivity::onSetManga)
         }
     }
 

+ 44 - 65
app/src/main/java/eu/kanade/tachiyomi/ui/manga/chapter/ChaptersPresenter.kt

@@ -1,6 +1,7 @@
 package eu.kanade.tachiyomi.ui.manga.chapter
 
 import android.os.Bundle
+import com.jakewharton.rxrelay.PublishRelay
 import eu.kanade.tachiyomi.data.database.DatabaseHelper
 import eu.kanade.tachiyomi.data.database.models.Chapter
 import eu.kanade.tachiyomi.data.database.models.Manga
@@ -15,11 +16,12 @@ import eu.kanade.tachiyomi.ui.manga.MangaEvent
 import eu.kanade.tachiyomi.ui.manga.info.ChapterCountEvent
 import eu.kanade.tachiyomi.ui.manga.info.MangaFavoriteEvent
 import eu.kanade.tachiyomi.util.SharedData
+import eu.kanade.tachiyomi.util.isNullOrUnsubscribed
 import eu.kanade.tachiyomi.util.syncChaptersWithSource
 import rx.Observable
+import rx.Subscription
 import rx.android.schedulers.AndroidSchedulers
 import rx.schedulers.Schedulers
-import rx.subjects.PublishSubject
 import timber.log.Timber
 import uy.kohesive.injekt.injectLazy
 
@@ -69,8 +71,8 @@ class ChaptersPresenter : BasePresenter<ChaptersFragment>() {
     /**
      * Subject of list of chapters to allow updating the view without going to DB.
      */
-    val chaptersSubject: PublishSubject<List<ChapterModel>>
-            by lazy { PublishSubject.create<List<ChapterModel>>() }
+    val chaptersRelay: PublishRelay<List<ChapterModel>>
+            by lazy { PublishRelay.create<List<ChapterModel>>() }
 
     /**
      * Whether the chapter list has been requested to the source.
@@ -78,56 +80,33 @@ class ChaptersPresenter : BasePresenter<ChaptersFragment>() {
     var hasRequested = false
         private set
 
-    companion object {
-        /**
-         * Id of the restartable which sends a filtered and ordered list of chapters to the view.
-         */
-        private const val GET_CHAPTERS = 1
-
-        /**
-         * Id of the restartable which requests an updated list of chapters to the source.
-         */
-        private const val FETCH_CHAPTERS = 2
-
-        /**
-         * Id of the restartable which listens for download status changes.
-         */
-        private const val CHAPTER_STATUS_CHANGES = 3
-    }
+    /**
+     * Subscription to retrieve the new list of chapters from the source.
+     */
+    private var fetchChaptersSubscription: Subscription? = null
+
+    /**
+     * Subscription to observe download status changes.
+     */
+    private var observeDownloadsSubscription: Subscription? = null
 
     override fun onCreate(savedState: Bundle?) {
         super.onCreate(savedState)
 
-        startableLatestCache(GET_CHAPTERS,
-                // On each subject emission, apply filters and sort then update the view.
-                { chaptersSubject
-                        .flatMap { applyChapterFilters(it) }
-                        .observeOn(AndroidSchedulers.mainThread())
-                }, ChaptersFragment::onNextChapters)
-
-        startableFirst(FETCH_CHAPTERS,
-                { getRemoteChaptersObservable() },
-                { view, result -> view.onFetchChaptersDone() },
-                ChaptersFragment::onFetchChaptersError)
-
-        startableLatestCache(CHAPTER_STATUS_CHANGES,
-                { getChapterStatusObservable() },
-                ChaptersFragment::onChapterStatusChange,
-                { view, error -> Timber.e(error) })
-
         // Find the active manga from the shared data or return.
         manga = SharedData.get(MangaEvent::class.java)?.manga ?: return
+        source = sourceManager.get(manga.source)!!
         Observable.just(manga)
                 .subscribeLatestCache(ChaptersFragment::onNextManga)
 
-        // Find the source for this manga.
-        source = sourceManager.get(manga.source)!!
-
-        // Prepare the publish subject.
-        start(GET_CHAPTERS)
+        // Prepare the relay.
+        chaptersRelay.flatMap { applyChapterFilters(it) }
+                .observeOn(AndroidSchedulers.mainThread())
+                .subscribeLatestCache(ChaptersFragment::onNextChapters,
+                        { view, error -> Timber.e(error) })
 
         // Add the subscription that retrieves the chapters from the database, keeps subscribed to
-        // changes, and sends the list of chapters to the publish subject.
+        // changes, and sends the list of chapters to the relay.
         add(db.getChapters(manga).asRxObservable()
                 .map { chapters ->
                     // Convert every chapter to a model.
@@ -141,12 +120,22 @@ class ChaptersPresenter : BasePresenter<ChaptersFragment>() {
                     this.chapters = chapters
 
                     // Listen for download status changes
-                    start(CHAPTER_STATUS_CHANGES)
+                    observeDownloads()
 
                     // Emit the number of chapters to the info tab.
                     SharedData.get(ChapterCountEvent::class.java)?.emit(chapters.size)
                 }
-                .subscribe { chaptersSubject.onNext(it) })
+                .subscribe { chaptersRelay.call(it) })
+    }
+
+    private fun observeDownloads() {
+        observeDownloadsSubscription?.let { remove(it) }
+        observeDownloadsSubscription = downloadManager.queue.getStatusObservable()
+                .observeOn(AndroidSchedulers.mainThread())
+                .filter { download -> download.manga.id == manga.id }
+                .doOnNext { onDownloadStatusChange(it) }
+                .subscribeLatestCache(ChaptersFragment::onChapterStatusChange,
+                        { view, error -> Timber.e(error) })
     }
 
     /**
@@ -186,34 +175,24 @@ class ChaptersPresenter : BasePresenter<ChaptersFragment>() {
      */
     fun fetchChaptersFromSource() {
         hasRequested = true
-        start(FETCH_CHAPTERS)
+
+        if (!fetchChaptersSubscription.isNullOrUnsubscribed()) return
+        fetchChaptersSubscription = Observable.defer { source.fetchChapterList(manga) }
+                .subscribeOn(Schedulers.io())
+                .map { syncChaptersWithSource(db, it, manga, source) }
+                .observeOn(AndroidSchedulers.mainThread())
+                .subscribeFirst({ view, chapters ->
+                    view.onFetchChaptersDone()
+                }, ChaptersFragment::onFetchChaptersError)
     }
 
     /**
      * Updates the UI after applying the filters.
      */
     private fun refreshChapters() {
-        chaptersSubject.onNext(chapters)
+        chaptersRelay.call(chapters)
     }
 
-    /**
-     * Returns an observable that updates the chapter list with the latest from the source.
-     */
-    fun getRemoteChaptersObservable(): Observable<Pair<List<Chapter>, List<Chapter>>> =
-            Observable.defer { source.fetchChapterList(manga) }
-                    .subscribeOn(Schedulers.io())
-                    .map { syncChaptersWithSource(db, it, manga, source) }
-                    .observeOn(AndroidSchedulers.mainThread())
-
-    /**
-     * Returns an observable that listens to download queue status changes.
-     */
-    fun getChapterStatusObservable(): Observable<Download> =
-            downloadManager.queue.getStatusObservable()
-                    .observeOn(AndroidSchedulers.mainThread())
-                    .filter { download -> download.manga.id == manga.id }
-                    .doOnNext { onDownloadStatusChange(it) }
-
     /**
      * Applies the view filters to the list of chapters obtained from the database.
      * @param chapters the list of chapters from the database
@@ -224,7 +203,7 @@ class ChaptersPresenter : BasePresenter<ChaptersFragment>() {
         if (onlyUnread()) {
             observable = observable.filter { !it.read }
         }
-        if (onlyRead()) {
+        else if (onlyRead()) {
             observable = observable.filter { it.read }
         }
         if (onlyDownloaded()) {

+ 27 - 37
app/src/main/java/eu/kanade/tachiyomi/ui/manga/info/MangaInfoPresenter.kt

@@ -9,7 +9,9 @@ import eu.kanade.tachiyomi.data.source.SourceManager
 import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
 import eu.kanade.tachiyomi.ui.manga.MangaEvent
 import eu.kanade.tachiyomi.util.SharedData
+import eu.kanade.tachiyomi.util.isNullOrUnsubscribed
 import rx.Observable
+import rx.Subscription
 import rx.android.schedulers.AndroidSchedulers
 import rx.schedulers.Schedulers
 import uy.kohesive.injekt.injectLazy
@@ -49,32 +51,21 @@ class MangaInfoPresenter : BasePresenter<MangaInfoFragment>() {
     val coverCache: CoverCache by injectLazy()
 
     /**
-     * The id of the restartable.
+     * Subscription to send the manga to the view.
      */
-    private val GET_MANGA = 1
+    private var viewMangaSubcription: Subscription? = null
 
     /**
-     * The id of the restartable.
+     * Subscription to update the manga from the source.
      */
-    private val FETCH_MANGA_INFO = 2
+    private var fetchMangaSubscription: Subscription? = null
 
     override fun onCreate(savedState: Bundle?) {
         super.onCreate(savedState)
 
-        // Notify the view a manga is available or has changed.
-        startableLatestCache(GET_MANGA,
-                { Observable.just(manga) },
-                { view, manga -> view.onNextManga(manga, source) })
-
-        // Fetch manga info from source.
-        startableFirst(FETCH_MANGA_INFO,
-                { fetchMangaObs() },
-                { view, manga -> view.onFetchMangaDone() },
-                { view, error -> view.onFetchMangaError() })
-
         manga = SharedData.get(MangaEvent::class.java)?.manga ?: return
         source = sourceManager.get(manga.source)!!
-        refreshManga()
+        sendMangaToView()
 
         // Update chapter count
         SharedData.get(ChapterCountEvent::class.java)?.observable
@@ -88,30 +79,34 @@ class MangaInfoPresenter : BasePresenter<MangaInfoFragment>() {
     }
 
     /**
-     * Fetch manga information from source.
+     * Sends the active manga to the view.
      */
-    fun fetchMangaFromSource() {
-        if (isUnsubscribed(FETCH_MANGA_INFO)) {
-            start(FETCH_MANGA_INFO)
-        }
+    fun sendMangaToView() {
+        viewMangaSubcription?.let { remove(it) }
+        viewMangaSubcription = Observable.just(manga)
+                .subscribeLatestCache({ view, manga -> view.onNextManga(manga, source) })
     }
 
     /**
      * Fetch manga information from source.
-     *
-     * @return manga information.
      */
-    private fun fetchMangaObs(): Observable<Manga> {
-        return Observable.defer { source.fetchMangaDetails(manga) }
-                .flatMap { networkManga ->
+    fun fetchMangaFromSource() {
+        if (!fetchMangaSubscription.isNullOrUnsubscribed()) return
+        fetchMangaSubscription = Observable.defer { source.fetchMangaDetails(manga) }
+                .map { networkManga ->
                     manga.copyFrom(networkManga)
                     manga.initialized = true
                     db.insertManga(manga).executeAsBlocking()
-                    Observable.just<Manga>(manga)
+                    manga
                 }
                 .subscribeOn(Schedulers.io())
                 .observeOn(AndroidSchedulers.mainThread())
-                .doOnNext { refreshManga() }
+                .doOnNext { sendMangaToView() }
+                .subscribeFirst({ view, manga ->
+                    view.onFetchMangaDone()
+                }, { view, error ->
+                    view.onFetchMangaError()
+                })
     }
 
     /**
@@ -123,19 +118,14 @@ class MangaInfoPresenter : BasePresenter<MangaInfoFragment>() {
             coverCache.deleteFromCache(manga.thumbnail_url)
         }
         db.insertManga(manga).executeAsBlocking()
-        refreshManga()
+        sendMangaToView()
     }
 
-    private fun setFavorite(favorite:Boolean){
-        if (manga.favorite == favorite)
+    private fun setFavorite(favorite: Boolean) {
+        if (manga.favorite == favorite) {
             return
+        }
         toggleFavorite()
     }
 
-    /**
-     * Refresh MangaInfo view.
-     */
-    private fun refreshManga() {
-        start(GET_MANGA)
-    }
 }