Caused by: rx.exceptions.MissingBackpressureException
I have another problem. This time I am facing the error Caused by: rx.exceptions.MissingBackpressureException
during the execution of this code:
class UpdateHelper {
    val numberOfFileToUpdate: PublishSubject<Int>
    init {
        numberOfFileToUpdate = PublishSubject.create()
    }
    public fun startUpdate(): Observable<Int> {
        return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
    }
    private fun getProducts(): Observable<ArrayList<Product>> {
        return Observable.create {
            var products: ArrayList<Product> = ArrayList()
            var i = 0
            while (i++ < 100) {
                products.add(Product())
            }
            it.onNext(products)
            it.onCompleted()
        }
    }
    private fun saveRows(products: ArrayList<Product>): Observable<Int> {
        return Observable.create<Int> {
            var totalNumberOfRow = products.size
            while (totalNumberOfRow-- > 0) {
                it.onNext(products.size - totalNumberOfRow)
                Thread.sleep(100)
            }
            it.onCompleted()
        }
    }
}
The code is just test code for two processes. The first process gets a list of Product objects from the web; those products are then persisted to a local database within the app. This is the main idea.
The method getProducts does the work of getting the data; in this case I just create an ArrayList of 100 products. The method saveRows does the persisting.
The saveRows method emits just an Int that represents the saved row. I am doing this because in the UI I have a progress bar reporting the progress.
From another point in the app I call the method startUpdate, and after a few items are emitted I get the described exception:
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)
I understand why this exception can happen (https://github.com/ReactiveX/RxJava/wiki/Backpressure),
but I do not know what I am doing wrong or how to solve it.
Can anybody advise me on this?
Answer
The problem is that your Observable source emits faster than the consumer consumes. It takes 100 ms to save each product. You can add onBackpressureBuffer():
UpdateHelper().startUpdate()
    .onBackpressureBuffer() // Add this
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        Log.d(TAG, "next $it")
    }, {
        Log.d(TAG, it.message)
    }, {
    })
Also, you can try removing Thread.sleep(100).
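Another option, sketched here under assumptions rather than taken from the original code, is to build saveRows on a source that honors downstream requests. Observable.range is such a source in RxJava 1.x, so no buffering operator should be needed. The Product class and the Thread.sleep call are placeholders for the real model and database write.

```kotlin
import rx.Observable

// Placeholder for the question's Product model (assumption).
class Product

// Hypothetical variant of saveRows: Observable.range emits only as many
// items as downstream has requested, unlike a hand-written Observable.create.
fun saveRows(products: List<Product>): Observable<Int> =
    Observable.range(1, products.size)
        // Stand-in for persisting one row before reporting its index.
        .doOnNext { Thread.sleep(100) }

fun main() {
    val products = List(5) { Product() }
    // Collect the progress values synchronously, for demonstration only.
    val progress = saveRows(products).toList().toBlocking().single()
    println(progress) // [1, 2, 3, 4, 5]
}
```

Like the original, this emits 1 up to products.size, so the progress bar logic stays the same.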
flatMap uses OperatorMerge (merge(map(func))): in your case, map's onNext calls are sent faster than they have been requested.
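The difference between "requested" and "sent" can be made visible with a small sketch, assuming RxJava 1.x and its TestSubscriber, which accepts an initial request amount. The function names unboundedSource, requestAwareSource and deliveredCount are made up for this illustration.

```kotlin
import rx.Observable
import rx.observers.TestSubscriber

// Written like saveRows in the question: it calls onNext in a loop and
// never looks at how many items the subscriber asked for.
fun unboundedSource(): Observable<Int> =
    Observable.create(Observable.OnSubscribe<Int> { subscriber ->
        for (i in 1..100) subscriber.onNext(i)
        subscriber.onCompleted()
    })

// Built-in source that emits only what has been requested.
fun requestAwareSource(): Observable<Int> = Observable.range(1, 100)

// Subscribes with a fixed request amount and reports how many items arrived.
fun deliveredCount(source: Observable<Int>, requested: Long): Int {
    val subscriber = TestSubscriber<Int>(requested)
    source.subscribe(subscriber)
    return subscriber.onNextEvents.size
}

fun main() {
    println(deliveredCount(requestAwareSource(), 3)) // 3
    println(deliveredCount(unboundedSource(), 3))    // 100
}
```

TestSubscriber simply accepts the extra items, but operators with bounded internal queues, such as merge and observeOn, signal MissingBackpressureException when a source overruns them this way.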