Caused by: rx.exceptions.MissingBackpressureException

I have another problem. This time I am getting the error Caused by: rx.exceptions.MissingBackpressureException while executing this code:

class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>

init {
    numberOfFileToUpdate = PublishSubject.create()
}

public fun startUpdate(): Observable<Int>{
    return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
}

private fun getProducts(): Observable<ArrayList<Product>> {
    return Observable.create {
        var products: ArrayList<Product> = ArrayList()
        var i = 0
        while (i++ < 100) {
            products.add(Product())
        }

        it.onNext(products)
        it.onCompleted()
    }
}


private fun saveRows(products: ArrayList<Product>): Observable<Int> {
    return Observable.create<Int> {
        var totalNumberOfRow = products.size

        while (totalNumberOfRow-- > 0){
            it.onNext(products.size - totalNumberOfRow)
            Thread.sleep(100)
        }
        it.onCompleted()
    }
}

}

This is just test code for two processes. The first gets a list of Product from the web; those products are then persisted to a local database within the app. That is the main idea.

The getProducts method does the work of getting the data; in this test I just create an ArrayList of 100 products. The saveRows method does the persisting.

The saveRows method just emits an Int that represents the number of rows saved so far. I do this because the UI has a progress bar that reports the progress.

From another point in the app I call the startUpdate method, and after a few items have been emitted I get the exception described above:

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)

I understand why this exception can happen (https://github.com/ReactiveX/RxJava/wiki/Backpressure), but I do not know what I am doing wrong or how to solve it.

Can anybody advise me on this?

Answer

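The problem is that your Observable source emits faster than the consumer consumes. saveRows is built with Observable.create and calls onNext in a loop, once every 100 ms, without checking how many items the downstream has requested. You can add onBackpressureBuffer(), which queues the items the consumer has not asked for yet: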

UpdateHelper().startUpdate()
    .onBackpressureBuffer() // Add this
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
      Log.d(TAG, "next $it")
    }, {
      Log.d(TAG, it.message)
    }, {
    })
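
As a variation on the snippet above, the buffer can be attached where the Observable is created, so callers do not have to remember it. This is only a minimal sketch, assuming RxJava 1.x; saveRowsBuffered and the empty Product class are hypothetical stand-ins for the code in the question, and the Thread.sleep is left out.

```kotlin
import rx.Observable

// Hypothetical stand-in for the Product class from the question.
class Product

// Emits 1..products.size like saveRows in the question, but buffers at the
// source so that a slower downstream does not trigger MissingBackpressureException.
fun saveRowsBuffered(products: List<Product>): Observable<Int> {
    return Observable.create<Int> { subscriber ->
        for (savedCount in 1..products.size) {
            // Stop early once the downstream has unsubscribed.
            if (subscriber.isUnsubscribed) return@create
            subscriber.onNext(savedCount)
        }
        subscriber.onCompleted()
    }.onBackpressureBuffer() // unbounded by default; a capacity can be passed instead
}
```

Calling saveRowsBuffered(products).toList().toBlocking().single() should then return the progress values 1 up to products.size. Keep in mind that the default buffer is unbounded, so it trades the exception for memory if the consumer never catches up.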

Also, you can try removing Thread.sleep(100).

flatMap is implemented with OperatorMerge, i.e. merge(map(func)). In your case, the onNext calls coming out of map are sent faster than they have been requested.
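
To illustrate the merge(map(func)) equivalence mentioned above, here is a small sketch, assuming RxJava 1.x. The helper names viaFlatMap and viaMerge are made up for this example, and the source is synchronous, so no backpressure problem shows up here.

```kotlin
import rx.Observable

// Flattens each emitted list using flatMap.
fun viaFlatMap(source: Observable<List<Int>>): List<Int> =
    source.flatMap { Observable.from(it) }.toList().toBlocking().single()

// Does the same thing in two explicit steps: map to inner Observables, then merge them.
fun viaMerge(source: Observable<List<Int>>): List<Int> =
    Observable.merge(source.map { Observable.from(it) }).toList().toBlocking().single()
```

Given Observable.just(listOf(1, 2), listOf(3)), both helpers should return the same flattened list. The merge step is where the inner Observable from saveRows gets subscribed, which is why its unrequested emissions surface as the exception there.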

source: stackoverflow.com