RxPresenter.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package nucleus.presenter;
  2. import android.os.Bundle;
  3. import android.support.annotation.CallSuper;
  4. import android.support.annotation.Nullable;
  5. import java.util.ArrayList;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import nucleus.presenter.delivery.DeliverFirst;
  9. import nucleus.presenter.delivery.DeliverLatestCache;
  10. import nucleus.presenter.delivery.DeliverReplay;
  11. import nucleus.presenter.delivery.Delivery;
  12. import rx.Observable;
  13. import rx.Subscription;
  14. import rx.functions.Action1;
  15. import rx.functions.Action2;
  16. import rx.functions.Func0;
  17. import rx.internal.util.SubscriptionList;
  18. import rx.subjects.BehaviorSubject;
  19. /**
  20. * This is an extension of {@link Presenter} which provides RxJava functionality.
  21. *
  22. * @param <View> a type of view.
  23. */
  24. public class RxPresenter<View> extends Presenter<View> {
  25. private static final String REQUESTED_KEY = RxPresenter.class.getName() + "#requested";
  26. private final BehaviorSubject<View> views = BehaviorSubject.create();
  27. private final SubscriptionList subscriptions = new SubscriptionList();
  28. private final HashMap<Integer, Func0<Subscription>> restartables = new HashMap<>();
  29. private final HashMap<Integer, Subscription> restartableSubscriptions = new HashMap<>();
  30. private final ArrayList<Integer> requested = new ArrayList<>();
  31. /**
  32. * Returns an {@link rx.Observable} that emits the current attached view or null.
  33. * See {@link BehaviorSubject} for more information.
  34. *
  35. * @return an observable that emits the current attached view or null.
  36. */
  37. public Observable<View> view() {
  38. return views;
  39. }
  40. /**
  41. * Registers a subscription to automatically unsubscribe it during onDestroy.
  42. * See {@link SubscriptionList#add(Subscription) for details.}
  43. *
  44. * @param subscription a subscription to add.
  45. */
  46. public void add(Subscription subscription) {
  47. subscriptions.add(subscription);
  48. }
  49. /**
  50. * Removes and unsubscribes a subscription that has been registered with {@link #add} previously.
  51. * See {@link SubscriptionList#remove(Subscription)} for details.
  52. *
  53. * @param subscription a subscription to remove.
  54. */
  55. public void remove(Subscription subscription) {
  56. subscriptions.remove(subscription);
  57. }
  58. /**
  59. * A restartable is any RxJava observable that can be started (subscribed) and
  60. * should be automatically restarted (re-subscribed) after a process restart if
  61. * it was still subscribed at the moment of saving presenter's state.
  62. *
  63. * Registers a factory. Re-subscribes the restartable after the process restart.
  64. *
  65. * @param restartableId id of the restartable
  66. * @param factory factory of the restartable
  67. */
  68. public void restartable(int restartableId, Func0<Subscription> factory) {
  69. restartables.put(restartableId, factory);
  70. if (requested.contains(restartableId))
  71. start(restartableId);
  72. }
  73. /**
  74. * Starts the given restartable.
  75. *
  76. * @param restartableId id of the restartable
  77. */
  78. public void start(int restartableId) {
  79. stop(restartableId);
  80. requested.add(restartableId);
  81. restartableSubscriptions.put(restartableId, restartables.get(restartableId).call());
  82. }
  83. /**
  84. * Unsubscribes a restartable
  85. *
  86. * @param restartableId id of a restartable.
  87. */
  88. public void stop(int restartableId) {
  89. requested.remove((Integer) restartableId);
  90. Subscription subscription = restartableSubscriptions.get(restartableId);
  91. if (subscription != null)
  92. subscription.unsubscribe();
  93. }
  94. /**
  95. * Checks if a restartable is unsubscribed.
  96. *
  97. * @param restartableId id of the restartable.
  98. * @return true if the subscription is null or unsubscribed, false otherwise.
  99. */
  100. public boolean isUnsubscribed(int restartableId) {
  101. Subscription subscription = restartableSubscriptions.get(restartableId);
  102. return subscription == null || subscription.isUnsubscribed();
  103. }
  104. /**
  105. * This is a shortcut that can be used instead of combining together
  106. * {@link #restartable(int, Func0)},
  107. * {@link #deliverFirst()},
  108. * {@link #split(Action2, Action2)}.
  109. *
  110. * @param restartableId an id of the restartable.
  111. * @param observableFactory a factory that should return an Observable when the restartable should run.
  112. * @param onNext a callback that will be called when received data should be delivered to view.
  113. * @param onError a callback that will be called if the source observable emits onError.
  114. * @param <T> the type of the observable.
  115. */
  116. public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory,
  117. final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
  118. restartable(restartableId, new Func0<Subscription>() {
  119. @Override
  120. public Subscription call() {
  121. return observableFactory.call()
  122. .compose(RxPresenter.this.<T>deliverFirst())
  123. .subscribe(split(onNext, onError));
  124. }
  125. });
  126. }
  127. /**
  128. * This is a shortcut for calling {@link #restartableFirst(int, Func0, Action2, Action2)} with the last parameter = null.
  129. */
  130. public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
  131. restartableFirst(restartableId, observableFactory, onNext, null);
  132. }
  133. /**
  134. * This is a shortcut that can be used instead of combining together
  135. * {@link #restartable(int, Func0)},
  136. * {@link #deliverLatestCache()},
  137. * {@link #split(Action2, Action2)}.
  138. *
  139. * @param restartableId an id of the restartable.
  140. * @param observableFactory a factory that should return an Observable when the restartable should run.
  141. * @param onNext a callback that will be called when received data should be delivered to view.
  142. * @param onError a callback that will be called if the source observable emits onError.
  143. * @param <T> the type of the observable.
  144. */
  145. public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory,
  146. final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
  147. restartable(restartableId, new Func0<Subscription>() {
  148. @Override
  149. public Subscription call() {
  150. return observableFactory.call()
  151. .compose(RxPresenter.this.<T>deliverLatestCache())
  152. .subscribe(split(onNext, onError));
  153. }
  154. });
  155. }
  156. /**
  157. * This is a shortcut for calling {@link #restartableLatestCache(int, Func0, Action2, Action2)} with the last parameter = null.
  158. */
  159. public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
  160. restartableLatestCache(restartableId, observableFactory, onNext, null);
  161. }
  162. /**
  163. * This is a shortcut that can be used instead of combining together
  164. * {@link #restartable(int, Func0)},
  165. * {@link #deliverReplay()},
  166. * {@link #split(Action2, Action2)}.
  167. *
  168. * @param restartableId an id of the restartable.
  169. * @param observableFactory a factory that should return an Observable when the restartable should run.
  170. * @param onNext a callback that will be called when received data should be delivered to view.
  171. * @param onError a callback that will be called if the source observable emits onError.
  172. * @param <T> the type of the observable.
  173. */
  174. public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory,
  175. final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
  176. restartable(restartableId, new Func0<Subscription>() {
  177. @Override
  178. public Subscription call() {
  179. return observableFactory.call()
  180. .compose(RxPresenter.this.<T>deliverReplay())
  181. .subscribe(split(onNext, onError));
  182. }
  183. });
  184. }
  185. /**
  186. * This is a shortcut for calling {@link #restartableReplay(int, Func0, Action2, Action2)} with the last parameter = null.
  187. */
  188. public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext) {
  189. restartableReplay(restartableId, observableFactory, onNext, null);
  190. }
  191. /**
  192. * Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by
  193. * the source {@link rx.Observable}.
  194. *
  195. * {@link #deliverLatestCache} keeps the latest onNext value and emits it each time a new view gets attached.
  196. * If a new onNext value appears while a view is attached, it will be delivered immediately.
  197. *
  198. * @param <T> the type of source observable emissions
  199. */
  200. public <T> DeliverLatestCache<View, T> deliverLatestCache() {
  201. return new DeliverLatestCache<>(views);
  202. }
  203. /**
  204. * Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by
  205. * the source {@link rx.Observable}.
  206. *
  207. * {@link #deliverFirst} delivers only the first onNext value that has been emitted by the source observable.
  208. *
  209. * @param <T> the type of source observable emissions
  210. */
  211. public <T> DeliverFirst<View, T> deliverFirst() {
  212. return new DeliverFirst<>(views);
  213. }
  214. /**
  215. * Returns an {@link rx.Observable.Transformer} that couples views with data that has been emitted by
  216. * the source {@link rx.Observable}.
  217. *
  218. * {@link #deliverReplay} keeps all onNext values and emits them each time a new view gets attached.
  219. * If a new onNext value appears while a view is attached, it will be delivered immediately.
  220. *
  221. * @param <T> the type of source observable emissions
  222. */
  223. public <T> DeliverReplay<View, T> deliverReplay() {
  224. return new DeliverReplay<>(views);
  225. }
  226. /**
  227. * Returns a method that can be used for manual restartable chain build. It returns an Action1 that splits
  228. * a received {@link Delivery} into two {@link Action2} onNext and onError calls.
  229. *
  230. * @param onNext a method that will be called if the delivery contains an emitted onNext value.
  231. * @param onError a method that will be called if the delivery contains an onError throwable.
  232. * @param <T> a type on onNext value.
  233. * @return an Action1 that splits a received {@link Delivery} into two {@link Action2} onNext and onError calls.
  234. */
  235. public <T> Action1<Delivery<View, T>> split(final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) {
  236. return new Action1<Delivery<View, T>>() {
  237. @Override
  238. public void call(Delivery<View, T> delivery) {
  239. delivery.split(onNext, onError);
  240. }
  241. };
  242. }
  243. /**
  244. * This is a shortcut for calling {@link #split(Action2, Action2)} when the second parameter is null.
  245. */
  246. public <T> Action1<Delivery<View, T>> split(Action2<View, T> onNext) {
  247. return split(onNext, null);
  248. }
  249. /**
  250. * {@inheritDoc}
  251. */
  252. @CallSuper
  253. @Override
  254. protected void onCreate(Bundle savedState) {
  255. if (savedState != null)
  256. requested.addAll(savedState.getIntegerArrayList(REQUESTED_KEY));
  257. }
  258. /**
  259. * {@inheritDoc}
  260. */
  261. @CallSuper
  262. @Override
  263. protected void onDestroy() {
  264. views.onCompleted();
  265. subscriptions.unsubscribe();
  266. for (Map.Entry<Integer, Subscription> entry : restartableSubscriptions.entrySet())
  267. entry.getValue().unsubscribe();
  268. }
  269. /**
  270. * {@inheritDoc}
  271. */
  272. @CallSuper
  273. @Override
  274. protected void onSave(Bundle state) {
  275. for (int i = requested.size() - 1; i >= 0; i--) {
  276. int restartableId = requested.get(i);
  277. Subscription subscription = restartableSubscriptions.get(restartableId);
  278. if (subscription != null && subscription.isUnsubscribed())
  279. requested.remove(i);
  280. }
  281. state.putIntegerArrayList(REQUESTED_KEY, requested);
  282. }
  283. /**
  284. * {@inheritDoc}
  285. */
  286. @CallSuper
  287. @Override
  288. protected void onTakeView(View view) {
  289. views.onNext(view);
  290. }
  291. /**
  292. * {@inheritDoc}
  293. */
  294. @CallSuper
  295. @Override
  296. protected void onDropView() {
  297. views.onNext(null);
  298. }
  299. /**
  300. * Please, use restartableXX and deliverXX methods for pushing data from RxPresenter into View.
  301. */
  302. @Deprecated
  303. @Nullable
  304. @Override
  305. public View getView() {
  306. return super.getView();
  307. }
  308. }