Downloader.kt 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. package eu.kanade.tachiyomi.data.download
  2. import android.content.Context
  3. import android.webkit.MimeTypeMap
  4. import com.hippo.unifile.UniFile
  5. import com.jakewharton.rxrelay.BehaviorRelay
  6. import com.jakewharton.rxrelay.PublishRelay
  7. import eu.kanade.tachiyomi.data.database.models.Chapter
  8. import eu.kanade.tachiyomi.data.database.models.Manga
  9. import eu.kanade.tachiyomi.data.download.model.Download
  10. import eu.kanade.tachiyomi.data.download.model.DownloadQueue
  11. import eu.kanade.tachiyomi.source.SourceManager
  12. import eu.kanade.tachiyomi.source.model.Page
  13. import eu.kanade.tachiyomi.source.online.HttpSource
  14. import eu.kanade.tachiyomi.source.online.fetchAllImageUrlsFromPageList
  15. import eu.kanade.tachiyomi.util.*
  16. import kotlinx.coroutines.experimental.async
  17. import okhttp3.Response
  18. import rx.Observable
  19. import rx.android.schedulers.AndroidSchedulers
  20. import rx.schedulers.Schedulers
  21. import rx.subscriptions.CompositeSubscription
  22. import timber.log.Timber
  23. import uy.kohesive.injekt.injectLazy
  24. /**
  25. * This class is the one in charge of downloading chapters.
  26. *
  27. * Its [queue] contains the list of chapters to download. In order to download them, the downloader
  28. * subscriptions must be running and the list of chapters must be sent to them by [downloadsRelay].
  29. *
  30. * The queue manipulation must be done in one thread (currently the main thread) to avoid unexpected
  31. * behavior, but it's safe to read it from multiple threads.
  32. *
  33. * @param context the application context.
  34. * @param provider the downloads directory provider.
  35. * @param cache the downloads cache, used to add the downloads to the cache after their completion.
  36. */
  37. class Downloader(
  38. private val context: Context,
  39. private val provider: DownloadProvider,
  40. private val cache: DownloadCache
  41. ) {
  /**
   * Store used to persist the downloads across restarts.
   */
  private val store: DownloadStore = DownloadStore(context)

  /**
   * Queue holding the active downloads. It is constructed with [store].
   */
  val queue: DownloadQueue = DownloadQueue(store)

  /**
   * Source manager, injected lazily.
   */
  private val sourceManager: SourceManager by injectLazy()

  /**
   * Notifier reporting the downloader state and progress. Created on first access.
   */
  private val notifier: DownloadNotifier by lazy { DownloadNotifier(context) }

  /**
   * Subscriptions owned by the downloader.
   */
  private val subscriptions: CompositeSubscription = CompositeSubscription()

  /**
   * Relay through which lists of downloads are handed to the downloader.
   */
  private val downloadsRelay: PublishRelay<List<Download>> = PublishRelay.create<List<Download>>()

  /**
   * Relay exposing whether the downloader is running. Its initial value is `false`.
   */
  val runningRelay: BehaviorRelay<Boolean> = BehaviorRelay.create(false)

  /**
   * Whether the downloader is running.
   */
  @Volatile
  private var isRunning = false
  // Restore the downloads persisted in the store and append them to the queue.
  // The restore call is wrapped in async and awaited before the queue is touched.
  init {
      launchNow {
          val restored = async { store.restore() }.await()
          queue.addAll(restored)
      }
  }
  /**
   * Starts the downloader. Nothing happens when it is already running or the queue is empty.
   *
   * Every download that is not yet in the DOWNLOADED state is reset to QUEUE and sent to the
   * downloader through [downloadsRelay].
   *
   * @return true if there was at least one pending download to send, false otherwise.
   */
  fun start(): Boolean {
      if (isRunning || queue.isEmpty()) {
          return false
      }

      if (!subscriptions.hasSubscriptions()) {
          initializeSubscriptions()
      }

      val pending = queue.filter { it.status != Download.DOWNLOADED }

      // Only touch the status of downloads that are not already queued.
      pending
          .filter { it.status != Download.QUEUE }
          .forEach { it.status = Download.QUEUE }

      downloadsRelay.call(pending)
      return pending.isNotEmpty()
  }
  /**
   * Stops the downloader and flags every download that was in progress as ERROR.
   *
   * @param reason optional message; when present it is shown as a warning by the notifier,
   * otherwise the notification is updated according to the notifier's current flags.
   */
  fun stop(reason: String? = null) {
      destroySubscriptions()

      val interrupted = queue.filter { it.status == Download.DOWNLOADING }
      interrupted.forEach { it.status = Download.ERROR }

      when {
          reason != null -> {
              notifier.onWarning(reason)
          }
          notifier.paused -> {
              // The stop was caused by a pause: consume the flag and show the paused state.
              notifier.paused = false
              notifier.onDownloadPaused()
          }
          notifier.isSingleChapter && !notifier.errorThrown -> {
              notifier.isSingleChapter = false
          }
          else -> {
              notifier.dismiss()
          }
      }
  }
  /**
   * Pauses the downloader: the subscriptions are destroyed, every download that was in
   * progress goes back to the QUEUE state and the notifier is flagged as paused.
   */
  fun pause() {
      destroySubscriptions()

      val inProgress = queue.filter { it.status == Download.DOWNLOADING }
      inProgress.forEach { it.status = Download.QUEUE }

      notifier.paused = true
  }
  /**
   * Empties the download queue and dismisses the notification.
   *
   * @param isNotification when true, queued downloads are marked as NOT_DOWNLOADED before
   * being removed (needed for view updates).
   */
  fun clearQueue(isNotification: Boolean = false) {
      destroySubscriptions()

      if (isNotification) {
          // Needed to update the chapter view.
          val stillQueued = queue.filter { it.status == Download.QUEUE }
          stillQueued.forEach { it.status = Download.NOT_DOWNLOADED }
      }

      queue.clear()
      notifier.dismiss()
  }
  /**
   * Sets up the subscription that performs the downloads. Does nothing if the downloader
   * is already running.
   */
  private fun initializeSubscriptions() {
      if (isRunning) return

      isRunning = true
      runningRelay.call(true)

      subscriptions.clear()

      // Chapters are downloaded one after another on the IO scheduler; results are
      // delivered on the main thread.
      val downloads = downloadsRelay
          .concatMapIterable { it }
          .concatMap { downloadChapter(it).subscribeOn(Schedulers.io()) }
          .onBackpressureBuffer()
          .observeOn(AndroidSchedulers.mainThread())

      subscriptions += downloads.subscribe({ download ->
          completeDownload(download)
      }, { error ->
          // An error reaching this point stops the whole download service.
          DownloadService.stop(context)
          Timber.e(error)
          notifier.onError(error.message)
      })
  }
  /**
   * Tears down the downloader subscriptions and publishes the stopped state. Does nothing
   * if the downloader is not running.
   */
  private fun destroySubscriptions() {
      if (isRunning) {
          isRunning = false
          runningRelay.call(false)
          subscriptions.clear()
      }
  }
  /**
   * Creates a download object for every chapter that still needs one and adds them to the
   * downloads queue.
   *
   * A chapter is skipped when another chapter in the list has the same name, when its
   * directory already exists inside the manga directory, or when it is already enqueued.
   * Nothing is done if the source of the manga is not an [HttpSource].
   *
   * @param manga the manga of the chapters to download.
   * @param chapters the list of chapters to download.
   * @param autoStart whether to start the downloader after enqueuing the chapters.
   */
  fun queueChapters(manga: Manga, chapters: List<Chapter>, autoStart: Boolean) = launchUI {
      val source = sourceManager.get(manga.source) as? HttpSource ?: return@launchUI

      // Called in background thread, the operation can be slow with SAF.
      val chaptersWithoutDir = async {
          val mangaDir = provider.findMangaDir(manga, source)

          chapters
              // Avoid downloading chapters with the same name.
              .distinctBy { it.name }
              // Filter out those already downloaded.
              .filter { mangaDir?.findFile(provider.getChapterDirName(it)) == null }
              // Add chapters to queue from the start.
              .sortedByDescending { it.source_order }
      }

      // Runs in main thread (synchronization needed).
      val chaptersToQueue = chaptersWithoutDir.await()
          // Filter out those already enqueued.
          .filter { chapter -> queue.none { it.chapter.id == chapter.id } }
          // Create a download for each one.
          .map { Download(source, manga, it) }

      if (chaptersToQueue.isNotEmpty()) {
          queue.addAll(chaptersToQueue)

          // Initialize queue size.
          notifier.initialQueueSize = queue.size

          if (isRunning) {
              // Send the list of downloads to the downloader.
              downloadsRelay.call(chaptersToQueue)
          }

          // Start downloader if needed.
          if (autoStart) {
              // Qualified `this` so that `context` resolves to the Downloader's own property
              // and not to anything exposed by the receiver of the coroutine lambda.
              DownloadService.start(this@Downloader.context)
          }
      }
  }
  /**
   * Builds the observable that downloads a whole chapter into a temporary directory.
   *
   * The observable emits the given download exactly once. If anything in the chain fails,
   * the download is marked as ERROR, the notifier is informed and the download is still
   * emitted instead of propagating the error.
   *
   * @param download the chapter to be downloaded.
   */
  private fun downloadChapter(download: Download): Observable<Download> = Observable.defer {
      val chapterDirname = provider.getChapterDirName(download.chapter)
      val mangaDir = provider.getMangaDir(download.manga, download.source)
      val tmpDir = mangaDir.createDirectory("${chapterDirname}_tmp")

      val knownPages = download.pages
      val pageListObservable = if (knownPages == null) {
          // No page list yet: pull it from the network and store it in the download.
          download.source.fetchPageList(download.chapter)
              .doOnNext { fetchedPages ->
                  if (fetchedPages.isEmpty()) {
                      throw Exception("Page list is empty")
                  }
                  download.pages = fetchedPages
              }
      } else {
          // The page list already exists, reuse it.
          Observable.just(knownPages)
      }

      pageListObservable
          .doOnNext {
              // Remove every temporary (unfinished) file left in the directory.
              tmpDir.listFiles()
                  ?.filter { leftover -> leftover.name!!.endsWith(".tmp") }
                  ?.forEach { leftover -> leftover.delete() }

              download.downloadedImages = 0
              download.status = Download.DOWNLOADING
          }
          // Resolve the image URL of every page, fetching pages if necessary.
          .flatMap { pages -> download.source.fetchAllImageUrlsFromPageList(pages) }
          // Download the images one by one; some of them may already be on disk.
          .concatMap { page -> getOrDownloadImage(page, download, tmpDir) }
          // Report progress after each page.
          .doOnNext { notifier.onProgressChange(download) }
          .toList()
          .map { download }
          // Once every page was processed, verify the result.
          .doOnNext { ensureSuccessfulDownload(download, mangaDir, tmpDir, chapterDirname) }
          // Errors (for instance from the page list) are converted into a failed download.
          .onErrorReturn { error ->
              download.status = Download.ERROR
              notifier.onError(error.message, download.chapter.name)
              download
          }
  }
  /**
   * Builds the observable that provides the image of a page, taking it from the temporary
   * directory when it is already there and downloading it otherwise.
   *
   * A failure is not propagated: the page is flagged with [Page.ERROR] and emitted anyway,
   * so the remaining pages can still be downloaded.
   *
   * @param page the page to download.
   * @param download the download of the page.
   * @param tmpDir the temporary directory of the download.
   */
  private fun getOrDownloadImage(page: Page, download: Download, tmpDir: UniFile): Observable<Page> {
      // Without an image URL there is nothing to do.
      if (page.imageUrl == null) {
          return Observable.just(page)
      }

      val filename = "%03d".format(page.number)

      // Drop a leftover temporary file for this page, if any.
      tmpDir.findFile("$filename.tmp")?.delete()

      // Look for an image of this page already stored in the directory.
      val existingImage = tmpDir.listFiles()!!.find { it.name!!.startsWith("$filename.") }

      val imageObservable = if (existingImage == null) {
          downloadImage(page, download.source, tmpDir, filename)
      } else {
          Observable.just(existingImage)
      }

      return imageObservable
          // The image is available: update the page and the download counters.
          .doOnNext { image ->
              page.uri = image.uri
              page.progress = 100
              download.downloadedImages++
              page.status = Page.READY
          }
          .map { page }
          // Flag the page as failed and keep going with the rest.
          .onErrorReturn {
              page.progress = 0
              page.status = Page.ERROR
              page
          }
  }
  /**
   * Builds the observable that downloads the image of a page from the network.
   *
   * The response body is written to `<filename>.tmp` and the file is then renamed using the
   * detected image extension. If writing or renaming throws, the response is closed, the
   * temporary file is deleted and the exception is rethrown.
   *
   * @param page the page to download.
   * @param source the source of the page.
   * @param tmpDir the temporary directory of the download.
   * @param filename the filename of the image.
   */
  private fun downloadImage(page: Page, source: HttpSource, tmpDir: UniFile, filename: String): Observable<UniFile> {
      page.status = Page.DOWNLOAD_IMAGE
      page.progress = 0

      val imageRequest = source.fetchImage(page)
          .map { response ->
              val target = tmpDir.createFile("$filename.tmp")
              try {
                  response.body()!!.source().saveTo(target.openOutputStream())
                  val extension = getImageExtension(response, target)
                  target.renameTo("$filename.$extension")
              } catch (e: Exception) {
                  response.close()
                  target.delete()
                  throw e
              }
              target
          }

      // Retry 3 times, waiting 2, 4 and 8 seconds between attempts.
      val retryPolicy = RetryWithDelay(3, { (2 shl (it - 1)) * 1000 }, Schedulers.trampoline())

      return imageRequest.retryWhen(retryPolicy)
  }
  /**
   * Determines the file extension of a downloaded image.
   *
   * The mime type is taken from the content type of the response; if that is missing, the
   * content resolver is asked about the file uri, and as a last resort the file contents
   * are inspected. When no extension can be derived from the mime type, "jpg" is assumed.
   *
   * @param response the network response of the image.
   * @param file the file where the image is already downloaded.
   */
  private fun getImageExtension(response: Response, file: UniFile): String {
      // Content type announced by the server, if available.
      val headerMime = response.body()?.contentType()?.let { ct -> "${ct.type()}/${ct.subtype()}" }

      val mime = headerMime
          // Else guess from the uri.
          ?: context.contentResolver.getType(file.uri)
          // Else read magic numbers.
          ?: DiskUtil.findImageMime { file.openInputStream() }

      val extension = MimeTypeMap.getSingleton().getExtensionFromMimeType(mime)
      return extension ?: "jpg"
  }
  /**
   * Verifies the outcome of a download and updates its status accordingly.
   *
   * The download counts as DOWNLOADED when the number of non-temporary files in the
   * temporary directory equals the number of pages; otherwise it is marked as ERROR. On
   * success the temporary directory is renamed to its final name and added to the cache.
   *
   * @param download the download to check.
   * @param mangaDir the manga directory of the download.
   * @param tmpDir the directory where the download is currently stored.
   * @param dirname the real (non temporary) directory name of the download.
   */
  private fun ensureSuccessfulDownload(
      download: Download,
      mangaDir: UniFile,
      tmpDir: UniFile,
      dirname: String
  ) {
      // Files still carrying the .tmp suffix are unfinished and do not count.
      val finishedFiles = tmpDir.listFiles().orEmpty().filterNot { it.name!!.endsWith(".tmp") }
      val hasAllImages = finishedFiles.size == download.pages!!.size

      download.status = if (hasAllImages) Download.DOWNLOADED else Download.ERROR

      // Only rename the directory if it's downloaded.
      if (download.status == Download.DOWNLOADED) {
          tmpDir.renameTo(dirname)
          cache.addChapter(dirname, mangaDir, download.manga)
      }
  }
  /**
   * Finishes the processing of a download. This method is called in the main thread.
   *
   * Successful downloads are removed from the queue. Once no download is left to process,
   * the download service is stopped, after notifying completion when the notifier is in
   * single chapter mode and no error was thrown.
   */
  private fun completeDownload(download: Download) {
      val succeeded = download.status == Download.DOWNLOADED
      if (succeeded) {
          // Successful downloads do not stay in the queue.
          queue.remove(download)
      }

      if (!areAllDownloadsFinished()) return

      if (notifier.isSingleChapter && !notifier.errorThrown) {
          notifier.onDownloadCompleted(download, queue)
      }
      DownloadService.stop(context)
  }
  /**
   * Returns true when no queued download has a status lower than or equal to
   * [Download.DOWNLOADING], which should leave only the DOWNLOADED and ERROR states
   * (assuming the status constants are ordered that way; verify against [Download]).
   */
  private fun areAllDownloadsFinished(): Boolean =
      queue.none { download -> download.status <= Download.DOWNLOADING }
  387. }