DynamicConcurrentMergeOperator.java 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package eu.kanade.mangafeed.util;
  2. import java.util.Queue;
  3. import java.util.concurrent.ConcurrentLinkedQueue;
  4. import java.util.concurrent.CopyOnWriteArrayList;
  5. import java.util.concurrent.atomic.AtomicBoolean;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. import rx.Observable;
  8. import rx.Observable.Operator;
  9. import rx.Subscriber;
  10. import rx.Subscription;
  11. import rx.functions.Func1;
  12. import rx.subscriptions.CompositeSubscription;
  13. import rx.subscriptions.Subscriptions;
  14. public class DynamicConcurrentMergeOperator<T, R> implements Operator<R, T> {
  15. final Func1<? super T, ? extends Observable<? extends R>> mapper;
  16. final Observable<Integer> workerCount;
  17. public DynamicConcurrentMergeOperator(
  18. Func1<? super T, ? extends Observable<? extends R>> mapper,
  19. Observable<Integer> workerCount) {
  20. this.mapper = mapper;
  21. this.workerCount = workerCount;
  22. }
  23. @Override
  24. public Subscriber<? super T> call(Subscriber<? super R> t) {
  25. DynamicConcurrentMerge<T, R> parent = new DynamicConcurrentMerge<>(t, mapper);
  26. t.add(parent);
  27. parent.init(workerCount);
  28. return parent;
  29. }
  30. static final class DynamicConcurrentMerge<T, R> extends Subscriber<T> {
  31. final Subscriber<? super R> actual;
  32. final Func1<? super T, ? extends Observable<? extends R>> mapper;
  33. final Queue<T> queue;
  34. final CopyOnWriteArrayList<DynamicWorker<T, R>> workers;
  35. final CompositeSubscription composite;
  36. final AtomicInteger wipActive;
  37. final AtomicBoolean once;
  38. long id;
  39. public DynamicConcurrentMerge(Subscriber<? super R> actual,
  40. Func1<? super T, ? extends Observable<? extends R>> mapper) {
  41. this.actual = actual;
  42. this.mapper = mapper;
  43. this.queue = new ConcurrentLinkedQueue<>();
  44. this.workers = new CopyOnWriteArrayList<>();
  45. this.composite = new CompositeSubscription();
  46. this.wipActive = new AtomicInteger(1);
  47. this.once = new AtomicBoolean();
  48. this.add(composite);
  49. this.request(0);
  50. }
  51. public void init(Observable<Integer> workerCount) {
  52. Subscription wc = workerCount.subscribe(n -> {
  53. int n0 = workers.size();
  54. if (n0 < n) {
  55. for (int i = n0; i < n; i++) {
  56. DynamicWorker<T, R> dw = new DynamicWorker<>(++id, this);
  57. workers.add(dw);
  58. request(1);
  59. dw.tryNext();
  60. }
  61. } else if (n0 > n) {
  62. for (int i = 0; i < n; i++) {
  63. workers.get(i).start();
  64. }
  65. for (int i = n0 - 1; i >= n; i--) {
  66. workers.get(i).stop();
  67. }
  68. }
  69. if (!once.get() && once.compareAndSet(false, true)) {
  70. request(n);
  71. }
  72. }, this::onError);
  73. composite.add(wc);
  74. }
  75. void requestMore(long n) {
  76. request(n);
  77. }
  78. @Override
  79. public void onNext(T t) {
  80. queue.offer(t);
  81. wipActive.getAndIncrement();
  82. for (DynamicWorker<T, R> w : workers) {
  83. w.tryNext();
  84. }
  85. }
  86. @Override
  87. public void onError(Throwable e) {
  88. composite.unsubscribe();
  89. actual.onError(e);
  90. }
  91. @Override
  92. public void onCompleted() {
  93. if (wipActive.decrementAndGet() == 0) {
  94. actual.onCompleted();
  95. }
  96. }
  97. }
  98. static final class DynamicWorker<T, R> {
  99. final long id;
  100. final AtomicBoolean running;
  101. final DynamicConcurrentMerge<T, R> parent;
  102. final AtomicBoolean stop;
  103. public DynamicWorker(long id, DynamicConcurrentMerge<T, R> parent) {
  104. this.id = id;
  105. this.parent = parent;
  106. this.stop = new AtomicBoolean();
  107. this.running = new AtomicBoolean();
  108. }
  109. public void tryNext() {
  110. if (!running.get() && running.compareAndSet(false, true)) {
  111. T t;
  112. if (stop.get()) {
  113. parent.workers.remove(this);
  114. return;
  115. }
  116. t = parent.queue.poll();
  117. if (t == null) {
  118. running.set(false);
  119. return;
  120. }
  121. Observable out = parent.mapper.call(t);
  122. Subscriber<R> s = new Subscriber<R>() {
  123. @Override
  124. public void onNext(R t) {
  125. parent.actual.onNext(t);
  126. }
  127. @Override
  128. public void onError(Throwable e) {
  129. parent.onError(e);
  130. }
  131. @Override
  132. public void onCompleted() {
  133. parent.onCompleted();
  134. if (parent.wipActive.get() != 0) {
  135. running.set(false);
  136. parent.requestMore(1);
  137. tryNext();
  138. }
  139. }
  140. };
  141. parent.composite.add(s);
  142. s.add(Subscriptions.create(() -> parent.composite.remove(s)));
  143. // Unchecked assignment to avoid weird Android Studio errors
  144. out.subscribe(s);
  145. }
  146. }
  147. public void start() {
  148. stop.set(false);
  149. tryNext();
  150. }
  151. public void stop() {
  152. stop.set(true);
  153. if (running.compareAndSet(false, true)) {
  154. parent.workers.remove(this);
  155. }
  156. }
  157. }
  158. }