|
@@ -8,6 +8,7 @@ import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
|
|
|
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
|
|
|
import eu.kanade.tachiyomi.util.lang.awaitSingle
|
|
|
import eu.kanade.tachiyomi.util.lang.launchIO
|
|
|
+import eu.kanade.tachiyomi.util.lang.withIOContext
|
|
|
import kotlinx.coroutines.CancellationException
|
|
|
import kotlinx.coroutines.CoroutineScope
|
|
|
import kotlinx.coroutines.Dispatchers
|
|
@@ -16,10 +17,7 @@ import kotlinx.coroutines.cancel
|
|
|
import kotlinx.coroutines.flow.filter
|
|
|
import kotlinx.coroutines.flow.flow
|
|
|
import kotlinx.coroutines.runInterruptible
|
|
|
-import rx.Observable
|
|
|
-import rx.schedulers.Schedulers
|
|
|
-import rx.subjects.PublishSubject
|
|
|
-import rx.subjects.SerializedSubject
|
|
|
+import kotlinx.coroutines.suspendCancellableCoroutine
|
|
|
import uy.kohesive.injekt.Injekt
|
|
|
import uy.kohesive.injekt.api.get
|
|
|
import java.util.concurrent.PriorityBlockingQueue
|
|
@@ -53,7 +51,7 @@ class HttpPageLoader(
|
|
|
}
|
|
|
.filter { it.status == Page.State.QUEUE }
|
|
|
.collect {
|
|
|
- loadPage(it)
|
|
|
+ _loadPage(it)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -103,11 +101,10 @@ class HttpPageLoader(
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns an observable that loads a page through the queue and listens to its result to
|
|
|
- * emit new states. It handles re-enqueueing pages if they were evicted from the cache.
|
|
|
+ * Loads a page through the queue. Handles re-enqueueing pages if they were evicted from the cache.
|
|
|
*/
|
|
|
- override fun getPage(page: ReaderPage): Observable<Page.State> {
|
|
|
- return Observable.defer {
|
|
|
+ override suspend fun loadPage(page: ReaderPage) {
|
|
|
+ withIOContext {
|
|
|
val imageUrl = page.imageUrl
|
|
|
|
|
|
// Check if the image has been deleted
|
|
@@ -120,26 +117,22 @@ class HttpPageLoader(
|
|
|
page.status = Page.State.QUEUE
|
|
|
}
|
|
|
|
|
|
- val statusSubject = SerializedSubject(PublishSubject.create<Page.State>())
|
|
|
- page.statusSubject = statusSubject
|
|
|
-
|
|
|
val queuedPages = mutableListOf<PriorityPage>()
|
|
|
if (page.status == Page.State.QUEUE) {
|
|
|
queuedPages += PriorityPage(page, 1).also { queue.offer(it) }
|
|
|
}
|
|
|
queuedPages += preloadNextPages(page, preloadSize)
|
|
|
|
|
|
- statusSubject.startWith(page.status)
|
|
|
- .doOnUnsubscribe {
|
|
|
+ suspendCancellableCoroutine<Nothing> { continuation ->
|
|
|
+ continuation.invokeOnCancellation {
|
|
|
queuedPages.forEach {
|
|
|
if (it.page.status == Page.State.QUEUE) {
|
|
|
queue.remove(it)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
- .subscribeOn(Schedulers.io())
|
|
|
- .unsubscribeOn(Schedulers.io())
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -197,7 +190,7 @@ class HttpPageLoader(
|
|
|
*
|
|
|
* @param page the page whose source image has to be downloaded.
|
|
|
*/
|
|
|
- private suspend fun loadPage(page: ReaderPage) {
|
|
|
+ private suspend fun _loadPage(page: ReaderPage) {
|
|
|
try {
|
|
|
if (page.imageUrl.isNullOrEmpty()) {
|
|
|
page.status = Page.State.LOAD_PAGE
|