Browse Source

Replace RxJava in extension installer (#9556)

* Replace RxJava in extension installer

Replace common downloadsRelay with Map of individual StateFlows

* Drop RxRelay dependency

* Simplify updateAllExtensions

* Simplify addDownloadState/removeDownloadState

Use immutable Map functions instead of converting to MutableMap
Two-Ai 1 year ago
parent
commit
0ac38297f4

+ 5 - 4
app/src/main/java/eu/kanade/tachiyomi/extension/ExtensionManager.kt

@@ -14,10 +14,11 @@ import eu.kanade.tachiyomi.extension.util.ExtensionLoader
 import eu.kanade.tachiyomi.util.preference.plusAssign
 import eu.kanade.tachiyomi.util.system.toast
 import kotlinx.coroutines.async
+import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.asStateFlow
+import kotlinx.coroutines.flow.emptyFlow
 import logcat.LogPriority
-import rx.Observable
 import tachiyomi.core.util.lang.launchNow
 import tachiyomi.core.util.lang.withUIContext
 import tachiyomi.core.util.system.logcat
@@ -200,7 +201,7 @@ class ExtensionManager(
      *
      * @param extension The extension to be installed.
      */
-    fun installExtension(extension: Extension.Available): Observable<InstallStep> {
+    fun installExtension(extension: Extension.Available): Flow<InstallStep> {
         return installer.downloadAndInstall(api.getApkUrl(extension), extension)
     }
 
@@ -211,9 +212,9 @@ class ExtensionManager(
      *
      * @param extension The extension to be updated.
      */
-    fun updateExtension(extension: Extension.Installed): Observable<InstallStep> {
+    fun updateExtension(extension: Extension.Installed): Flow<InstallStep> {
         val availableExt = _availableExtensionsFlow.value.find { it.pkgName == extension.pkgName }
-            ?: return Observable.empty()
+            ?: return emptyFlow()
         return installExtension(availableExt)
     }
 

+ 54 - 38
app/src/main/java/eu/kanade/tachiyomi/extension/util/ExtensionInstaller.kt

@@ -10,20 +10,27 @@ import android.os.Environment
 import androidx.core.content.ContextCompat
 import androidx.core.content.getSystemService
 import androidx.core.net.toUri
-import com.jakewharton.rxrelay.PublishRelay
 import eu.kanade.domain.base.BasePreferences
 import eu.kanade.tachiyomi.extension.installer.Installer
 import eu.kanade.tachiyomi.extension.model.Extension
 import eu.kanade.tachiyomi.extension.model.InstallStep
 import eu.kanade.tachiyomi.util.storage.getUriCompat
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.distinctUntilChanged
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.mapNotNull
+import kotlinx.coroutines.flow.merge
+import kotlinx.coroutines.flow.onCompletion
+import kotlinx.coroutines.flow.transformWhile
 import logcat.LogPriority
-import rx.Observable
-import rx.android.schedulers.AndroidSchedulers
+import tachiyomi.core.util.lang.withUIContext
 import tachiyomi.core.util.system.logcat
 import uy.kohesive.injekt.Injekt
 import uy.kohesive.injekt.api.get
 import java.io.File
-import java.util.concurrent.TimeUnit
+import kotlin.time.Duration.Companion.seconds
 
 /**
  * The installer which installs, updates and uninstalls the extensions.
@@ -48,10 +55,7 @@ internal class ExtensionInstaller(private val context: Context) {
      */
     private val activeDownloads = hashMapOf<String, Long>()
 
-    /**
-     * Relay used to notify the installation step of every download.
-     */
-    private val downloadsRelay = PublishRelay.create<Pair<Long, InstallStep>>()
+    private val downloadsStateFlows = hashMapOf<Long, MutableStateFlow<InstallStep>>()
 
     private val extensionInstaller = Injekt.get<BasePreferences>().extensionInstaller()
 
@@ -62,7 +66,7 @@ internal class ExtensionInstaller(private val context: Context) {
      * @param url The url of the apk.
      * @param extension The extension to install.
      */
-    fun downloadAndInstall(url: String, extension: Extension) = Observable.defer {
+    fun downloadAndInstall(url: String, extension: Extension): Flow<InstallStep> {
         val pkgName = extension.pkgName
 
         val oldDownload = activeDownloads[pkgName]
@@ -83,48 +87,59 @@ internal class ExtensionInstaller(private val context: Context) {
         val id = downloadManager.enqueue(request)
         activeDownloads[pkgName] = id
 
-        downloadsRelay.filter { it.first == id }
-            .map { it.second }
-            // Poll download status
-            .mergeWith(pollStatus(id))
+        val downloadStateFlow = MutableStateFlow(InstallStep.Pending)
+        downloadsStateFlows[id] = downloadStateFlow
+
+        // Poll download status
+        val pollStatusFlow = downloadStatusFlow(id).mapNotNull { downloadStatus ->
+            // Map to our model
+            when (downloadStatus) {
+                DownloadManager.STATUS_PENDING -> InstallStep.Pending
+                DownloadManager.STATUS_RUNNING -> InstallStep.Downloading
+                else -> null
+            }
+        }
+
+        return merge(downloadStateFlow, pollStatusFlow).transformWhile {
+            emit(it)
             // Stop when the application is installed or errors
-            .takeUntil { it.isCompleted() }
+            !it.isCompleted()
+        }.onCompletion {
             // Always notify on main thread
-            .observeOn(AndroidSchedulers.mainThread())
-            // Always remove the download when unsubscribed
-            .doOnUnsubscribe { deleteDownload(pkgName) }
+            withUIContext {
+                // Always remove the download when unsubscribed
+                deleteDownload(pkgName)
+            }
+        }
     }
 
     /**
-     * Returns an observable that polls the given download id for its status every second, as the
+     * Returns a flow that polls the given download id for its status every second, as the
      * manager doesn't have any notification system. It'll stop once the download finishes.
      *
      * @param id The id of the download to poll.
      */
-    private fun pollStatus(id: Long): Observable<InstallStep> {
+    private fun downloadStatusFlow(id: Long): Flow<Int> = flow {
         val query = DownloadManager.Query().setFilterById(id)
-
-        return Observable.interval(0, 1, TimeUnit.SECONDS)
+        while (true) {
             // Get the current download status
-            .map {
-                downloadManager.query(query).use { cursor ->
-                    cursor.moveToFirst()
-                    cursor.getInt(cursor.getColumnIndexOrThrow(DownloadManager.COLUMN_STATUS))
-                }
+            val downloadStatus = downloadManager.query(query).use { cursor ->
+                if (!cursor.moveToFirst()) return@flow
+                cursor.getInt(cursor.getColumnIndexOrThrow(DownloadManager.COLUMN_STATUS))
             }
-            // Ignore duplicate results
-            .distinctUntilChanged()
+
+            emit(downloadStatus)
+
             // Stop polling when the download fails or finishes
-            .takeUntil { it == DownloadManager.STATUS_SUCCESSFUL || it == DownloadManager.STATUS_FAILED }
-            // Map to our model
-            .flatMap { status ->
-                when (status) {
-                    DownloadManager.STATUS_PENDING -> Observable.just(InstallStep.Pending)
-                    DownloadManager.STATUS_RUNNING -> Observable.just(InstallStep.Downloading)
-                    else -> Observable.empty()
-                }
+            if (downloadStatus == DownloadManager.STATUS_SUCCESSFUL || downloadStatus == DownloadManager.STATUS_FAILED) {
+                return@flow
             }
+
+            delay(1.seconds)
+        }
     }
+        // Ignore duplicate results
+        .distinctUntilChanged()
 
     /**
      * Starts an intent to install the extension at the given uri.
@@ -176,7 +191,7 @@ internal class ExtensionInstaller(private val context: Context) {
      * @param step New install step.
      */
     fun updateInstallStep(downloadId: Long, step: InstallStep) {
-        downloadsRelay.call(downloadId to step)
+        downloadsStateFlows[downloadId]?.let { it.value = step }
     }
 
     /**
@@ -188,6 +203,7 @@ internal class ExtensionInstaller(private val context: Context) {
         val downloadId = activeDownloads.remove(pkgName)
         if (downloadId != null) {
             downloadManager.remove(downloadId)
+            downloadsStateFlows.remove(downloadId)
         }
         if (activeDownloads.isEmpty()) {
             downloadReceiver.unregister()
@@ -240,7 +256,7 @@ internal class ExtensionInstaller(private val context: Context) {
             // Set next installation step
             if (uri == null) {
                 logcat(LogPriority.ERROR) { "Couldn't locate downloaded APK" }
-                downloadsRelay.call(id to InstallStep.Error)
+                updateInstallStep(id, InstallStep.Error)
                 return
             }
 

+ 20 - 33
app/src/main/java/eu/kanade/tachiyomi/ui/browse/extension/ExtensionsScreenModel.kt

@@ -14,16 +14,18 @@ import eu.kanade.tachiyomi.extension.model.InstallStep
 import eu.kanade.tachiyomi.source.online.HttpSource
 import eu.kanade.tachiyomi.util.system.LocaleHelper
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.collectLatest
 import kotlinx.coroutines.flow.combine
 import kotlinx.coroutines.flow.debounce
 import kotlinx.coroutines.flow.distinctUntilChanged
 import kotlinx.coroutines.flow.launchIn
 import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.onCompletion
 import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.flow.update
-import rx.Observable
 import tachiyomi.core.util.lang.launchIO
 import uy.kohesive.injekt.Injekt
 import uy.kohesive.injekt.api.get
@@ -130,28 +132,24 @@ class ExtensionsScreenModel(
 
     fun updateAllExtensions() {
         coroutineScope.launchIO {
-            with(state.value) {
-                if (isEmpty) return@launchIO
-                items.values
-                    .flatten()
-                    .mapNotNull {
-                        when {
-                            it.extension !is Extension.Installed -> null
-                            !it.extension.hasUpdate -> null
-                            else -> it.extension
-                        }
-                    }
-                    .forEach(::updateExtension)
-            }
+            state.value.items.values.flatten()
+                .map { it.extension }
+                .filterIsInstance<Extension.Installed>()
+                .filter { it.hasUpdate }
+                .forEach(::updateExtension)
         }
     }
 
     fun installExtension(extension: Extension.Available) {
-        extensionManager.installExtension(extension).subscribeToInstallUpdate(extension)
+        coroutineScope.launchIO {
+            extensionManager.installExtension(extension).collectToInstallUpdate(extension)
+        }
     }
 
     fun updateExtension(extension: Extension.Installed) {
-        extensionManager.updateExtension(extension).subscribeToInstallUpdate(extension)
+        coroutineScope.launchIO {
+            extensionManager.updateExtension(extension).collectToInstallUpdate(extension)
+        }
     }
 
     fun cancelInstallUpdateExtension(extension: Extension) {
@@ -159,29 +157,18 @@ class ExtensionsScreenModel(
     }
 
     private fun removeDownloadState(extension: Extension) {
-        _currentDownloads.update { _map ->
-            val map = _map.toMutableMap()
-            map.remove(extension.pkgName)
-            map
-        }
+        _currentDownloads.update { it - extension.pkgName }
     }
 
     private fun addDownloadState(extension: Extension, installStep: InstallStep) {
-        _currentDownloads.update { _map ->
-            val map = _map.toMutableMap()
-            map[extension.pkgName] = installStep
-            map
-        }
+        _currentDownloads.update { it + Pair(extension.pkgName, installStep) }
     }
 
-    private fun Observable<InstallStep>.subscribeToInstallUpdate(extension: Extension) {
+    private suspend fun Flow<InstallStep>.collectToInstallUpdate(extension: Extension) =
         this
-            .doOnUnsubscribe { removeDownloadState(extension) }
-            .subscribe(
-                { installStep -> addDownloadState(extension, installStep) },
-                { removeDownloadState(extension) },
-            )
-    }
+            .onEach { installStep -> addDownloadState(extension, installStep) }
+            .onCompletion { removeDownloadState(extension) }
+            .collect()
 
     fun uninstallExtension(pkgName: String) {
         extensionManager.uninstallExtension(pkgName)

+ 1 - 2
gradle/libs.versions.toml

@@ -16,7 +16,6 @@ google-services-gradle = "com.google.gms:google-services:4.3.15"
 
 rxandroid = "io.reactivex:rxandroid:1.2.1"
 rxjava = "io.reactivex:rxjava:1.3.8"
-rxrelay = "com.jakewharton.rxrelay:rxrelay:1.2.0"
 flowreactivenetwork = "ru.beryukhov:flowreactivenetwork:1.0.4"
 
 okhttp-core = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp_version" }
@@ -92,7 +91,7 @@ voyager-transitions = { module = "cafe.adriel.voyager:voyager-transitions", vers
 kotlinter = "org.jmailen.gradle:kotlinter-gradle:3.13.0"
 
 [bundles]
-reactivex = ["rxandroid", "rxjava", "rxrelay"]
+reactivex = ["rxandroid", "rxjava"]
 okhttp = ["okhttp-core", "okhttp-logging", "okhttp-dnsoverhttps"]
 js-engine = ["quickjs-android"]
 sqlite = ["sqlite-framework", "sqlite-ktx", "sqlite-android"]