|
@@ -5,24 +5,49 @@ import okhttp3.OkHttpClient
|
|
|
import okhttp3.Request
|
|
|
import okhttp3.Response
|
|
|
import rx.Observable
|
|
|
-import rx.subscriptions.Subscriptions
|
|
|
-import java.io.IOException
|
|
|
+import rx.Producer
|
|
|
+import rx.Subscription
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean
|
|
|
|
|
|
fun Call.asObservable(): Observable<Response> {
|
|
|
return Observable.create { subscriber ->
|
|
|
- subscriber.add(Subscriptions.create { cancel() })
|
|
|
+ // Since Call is a one-shot type, clone it for each new subscriber.
|
|
|
+ val call = if (!isExecuted) this else {
|
|
|
+ // TODO use clone method in OkHttp 3.5
|
|
|
+ val field = javaClass.getDeclaredField("client").apply { isAccessible = true }
|
|
|
+ val client = field.get(this) as OkHttpClient
|
|
|
+ client.newCall(request())
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wrap the call in a helper which handles both unsubscription and backpressure.
|
|
|
+ val requestArbiter = object : AtomicBoolean(), Producer, Subscription {
|
|
|
+ override fun request(n: Long) {
|
|
|
+ if (n == 0L || !compareAndSet(false, true)) return
|
|
|
+
|
|
|
+ try {
|
|
|
+ val response = call.execute()
|
|
|
+ if (!subscriber.isUnsubscribed) {
|
|
|
+ subscriber.onNext(response)
|
|
|
+ subscriber.onCompleted()
|
|
|
+ }
|
|
|
+ } catch (error: Exception) {
|
|
|
+ if (!subscriber.isUnsubscribed) {
|
|
|
+ subscriber.onError(error)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- val response = execute()
|
|
|
- if (!subscriber.isUnsubscribed) {
|
|
|
- subscriber.onNext(response)
|
|
|
- subscriber.onCompleted()
|
|
|
+ override fun unsubscribe() {
|
|
|
+ call.cancel()
|
|
|
}
|
|
|
- } catch (error: IOException) {
|
|
|
- if (!subscriber.isUnsubscribed) {
|
|
|
- subscriber.onError(error)
|
|
|
+
|
|
|
+ override fun isUnsubscribed(): Boolean {
|
|
|
+ return call.isCanceled
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ subscriber.add(requestArbiter)
|
|
|
+ subscriber.setProducer(requestArbiter)
|
|
|
}
|
|
|
}
|
|
|
|