Posted in Software Engineering

What is Backpressure?

Backpressure is the mechanism for dealing with a producer that emits items faster than its consumer can handle them. If an Observable produces 1_000_000 items per second, how does a subscriber that can handle only 100 items per second keep up? The Observable class does not support backpressure. Operators such as observeOn hold pending items in an unbounded buffer and keep pushing them to the subscriber, and that is where you get an OutOfMemoryError.
By applying backpressure to a stream, you can handle items as they are needed. Unneeded items can be discarded, or the consumer can tell the producer when to create and push new items.
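Here are the numbers for the example above. Without backpressure, the backlog grows every second by the difference between the two rates. The sketch below is plain Java, not RxJava. `PushVsPull` and `PullSource` are hypothetical names. `PullSource` is a toy source that creates items only when the consumer asks for them, which is the core idea behind backpressure.

```java
import java.util.ArrayList;
import java.util.List;

final class PushVsPull {
    /** Items waiting in an unbounded buffer after `seconds` when nothing slows the producer down. */
    static long unboundedBacklog(long producedPerSecond, long consumedPerSecond, long seconds) {
        return Math.max(0, producedPerSecond - consumedPerSecond) * seconds;
    }

    /** Hypothetical pull-based source: it produces items only when they are requested. */
    static final class PullSource {
        private int next = 0;

        List<Integer> request(int n) {
            List<Integer> batch = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                batch.add(next++); // created on demand, so nothing piles up
            }
            return batch;
        }
    }
}
```

After ten seconds, `unboundedBacklog(1_000_000, 100, 10)` gives 9,999,000 items waiting in memory. `PullSource` never holds more than what was asked for.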

What is the problem?

bigTable.selectAll() // <=== or a hot observable like mouse movement
        .map(/** mapping **/)
        .observeOn(yourScheduler())
        .subscribe(data -> { doSomethingHere(data); });

The problem with the code above is that it fetches all the rows from the database and pushes them downstream. Everything ends up buffered in memory, so memory usage is high. Do we want all of the data? Yes! But do we need all of it at once? No.

I see this kind of code in lots of projects, and I'm pretty sure most of us have written something like it while knowing something was wrong. We query a database assuming there won't be much data, then there is, and the app performs poorly in production. If we're lucky we get an OutOfMemoryError, but most of the time the app just becomes slow and sluggish.
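The way out is to fetch page by page, so memory use is bounded by the page size instead of the table size. Here is that idea as a plain-Java sketch, independent of RxJava. `PagedTable` and `fetchPage` are hypothetical stand-ins for a real table and are backed by an in-memory list. Stopping at the first empty page is an assumed end-of-data rule.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class PagedTable<T> implements Iterable<List<T>> {
    private final List<T> rows;  // stands in for the database table
    private final int pageSize;

    PagedTable(List<T> rows, int pageSize) {
        this.rows = rows;
        this.pageSize = pageSize;
    }

    /** Same arithmetic as LIMIT pageSize OFFSET page * pageSize. */
    List<T> fetchPage(int page) {
        int from = Math.min(rows.size(), page * pageSize);
        int to = Math.min(rows.size(), from + pageSize);
        return new ArrayList<>(rows.subList(from, to)); // copy, like a fresh query result
    }

    @Override
    public Iterator<List<T>> iterator() {
        return new Iterator<List<T>>() {
            private int page = 0;
            private List<T> fetched; // page fetched but not yet returned, or null

            @Override
            public boolean hasNext() {
                if (fetched == null) {
                    fetched = fetchPage(page); // fetch lazily, one page at a time
                }
                return !fetched.isEmpty();     // an empty page means no rows are left
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> current = fetched;
                fetched = null;
                page++;
                return current;
            }
        };
    }
}
```

With 25 rows and a page size of 10, iteration yields three pages of 10, 10 and 5 rows. It then stops because page 3 comes back empty.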

What is the solution?

Backpressure to the rescue! Back in RxJava 1, the Observable class was responsible for backpressure. Since RxJava 2 there has been a separate class for backpressured streams: Flowable.

How to create a Flowable?

There are multiple ways to create a backpressured stream:

  1. Convert an Observable to a Flowable with the toFlowable() method:
Observable.range(1, 1_000_000).toFlowable(BackpressureStrategy.DROP)

With the DROP strategy the downstream doesn't receive all one million items. Items emitted while the downstream is still busy with earlier ones, and so has no outstanding request, are dropped.

Example output (which items get dropped varies from run to run):
1
2
3
...
100
101
drops some of the items here
100,051
100,052
100,053
...
drops again
523,020
523,021
...
and so on

Note that if you subscribe without switching schedulers, you get all one million items. Everything runs synchronously, so the producer is blocked until the subscriber has handled each item.

BackpressureStrategy

There are five backpressure strategies:

  • DROP: discards items the downstream hasn't requested
  • BUFFER: buffers all items until the downstream requests them; the buffer is unbounded, so watch out for OOMs
  • LATEST: keeps only the most recent item, overwriting any older undelivered one
  • ERROR: signals a MissingBackpressureException when the downstream can't keep up
  • MISSING: no strategy; items are passed on without buffering or dropping, so a MissingBackpressureException is likely to be thrown sooner or later somewhere downstream
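The differences are easier to see with a toy model. This is plain Java, not RxJava's implementation. It models a single burst: the producer emits items 1..n while the downstream has requested only `demand` items and is busy, and afterwards the downstream drains whatever was kept. `StrategyModel` and `deliveredAfterBurst` are hypothetical names. MISSING is left out because its outcome depends on the downstream operator's own buffer. A real Flowable interleaves requests and emissions continuously, so this only captures what each strategy does with items that arrive while there is no outstanding demand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class StrategyModel {
    enum Strategy { DROP, BUFFER, LATEST, ERROR }

    /**
     * Emits 1..n to a downstream that has requested `demand` items and then stops
     * requesting until the burst is over. Returns what the downstream eventually receives.
     */
    static List<Integer> deliveredAfterBurst(Strategy strategy, int n, int demand) {
        List<Integer> delivered = new ArrayList<>();
        Deque<Integer> kept = new ArrayDeque<>(); // items held back for later delivery
        long requested = demand;
        for (int item = 1; item <= n; item++) {
            if (requested > 0) { // the downstream asked for it: deliver right away
                delivered.add(item);
                requested--;
                continue;
            }
            switch (strategy) {
                case DROP:   // discard the unrequested item
                    break;
                case BUFFER: // keep everything (unbounded)
                    kept.addLast(item);
                    break;
                case LATEST: // keep only the newest item
                    kept.clear();
                    kept.addLast(item);
                    break;
                case ERROR:  // modelled MissingBackpressureException
                    throw new IllegalStateException("missing backpressure at item " + item);
            }
        }
        delivered.addAll(kept); // the downstream catches up and drains what was kept
        return delivered;
    }
}
```

With n = 10 and demand = 3, DROP yields [1, 2, 3] and BUFFER yields all ten items. LATEST yields [1, 2, 3, 10], and ERROR fails at item 4.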

2. Use the Flowable.create() factory method:

Flowable.create() takes the same BackpressureStrategy argument, so it doesn't give us much more than toFlowable() here. Let's skip this one.

3. Use the Flowable.generate() factory method:

This is what we were looking for. The generate() method has several overloads, and this is the one that fits our needs:

Flowable.generate(
        () -> 0,                   // initial state
        (state, emitter) -> {      // called with the current state
            emitter.onNext(state);
            return state + 1;      // next state
        });

This code generates an infinite stream of non-negative integers: 0, 1, 2, 3, 4, 5, 6, …

The first parameter is a Callable that returns the initial state. The second is a BiFunction that is called once for every item the downstream requests. It receives the current state and an Emitter, may call onNext at most once, and returns the next state. So let's apply it to our database code:
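A plain-Java model of the state-plus-generator idea shows why nothing is produced ahead of demand. It is a sketch, not RxJava's code. Each unit of demand calls the generator once with the current state, and the returned value becomes the next state. `GeneratedSource` is a hypothetical class. It uses `Supplier` and `BiFunction`/`Consumer` from `java.util.function` in place of RxJava's `Callable` and `Emitter`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class GeneratedSource<T, S> {
    private final BiFunction<S, Consumer<T>, S> generator;
    private S state;

    GeneratedSource(Supplier<S> initialState, BiFunction<S, Consumer<T>, S> generator) {
        this.state = initialState.get(); // plays the role of generate()'s first argument
        this.generator = generator;
    }

    /** Calls the generator once per requested item; nothing runs until someone asks. */
    List<T> request(int n) {
        List<T> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            state = generator.apply(state, out::add); // emit into `out`, keep the next state
        }
        return out;
    }
}
```

Using the same generator as the post, `request(3)` returns [0, 1, 2] and a following `request(2)` returns [3, 4]. The state only advances when an item is asked for.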

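Flowable<List<Data>> select(int page, int pageSize) {
    return Flowable.generate(
            () -> page,                          // initial page
            (currentPage, emitter) -> {
                List<Data> rows = table.select("SELECT * FROM myTable LIMIT " + pageSize
                        + " OFFSET " + (currentPage * pageSize));
                if (rows.isEmpty()) {
                    emitter.onComplete();        // no rows left: end the stream
                } else {
                    emitter.onNext(rows);        // one page per request
                }
                return currentPage + 1;          // next page
            });
}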

Why is there no string templating in recent Java releases (Java 9, 10, 11)?! WTH, Java.
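In the meantime, `String.format` does the job. This is a minimal sketch. `PageQuery` and `pageQuery` are hypothetical names. The table name is assumed to be a trusted constant rather than user input, because it is pasted into the SQL as-is.

```java
import java.util.Locale;

final class PageQuery {
    /** Builds the LIMIT/OFFSET query for a 0-based page. */
    static String pageQuery(String table, int page, int pageSize) {
        long offset = (long) page * pageSize; // widen first so large page numbers don't overflow int
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale
        return String.format(Locale.ROOT, "SELECT * FROM %s LIMIT %d OFFSET %d", table, pageSize, offset);
    }
}
```

For example, `pageQuery("myTable", 2, 10)` builds `SELECT * FROM myTable LIMIT 10 OFFSET 20`.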

Now we can call it like this:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.DefaultSubscriber;

myTable.select(0, 10)                              // pages are 0-based: OFFSET = page * pageSize
        .map(rows -> rows)                         // replace with your own mapping
        .flatMap(rows -> Flowable.just(rows), 1)   // 1 = at most one concurrent inner stream
        // observeOn uses a default buffer size of 128, so we override it
        .observeOn(Schedulers.single(), false, 1)
        .subscribe(new DefaultSubscriber<List<Data>>() {
            @Override
            protected void onStart() {
                // don't call super.onStart(): the default implementation requests Long.MAX_VALUE
                request(1);
            }

            @Override
            public void onNext(List<Data> data) {
                doSomethingHere(data);
                request(1); // if you want more data
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
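The request-one-at-a-time pattern isn't specific to RxJava. For comparison, the JDK's own `java.util.concurrent.Flow` API (Java 9+) works the same way. This is a swapped-in technique, not RxJava. The subscriber asks for one item in `onSubscribe` and one more after each `onNext`. A `SubmissionPublisher` with a buffer capacity of 1 blocks `submit()` until that demand arrives. `OneAtATime`, `OneByOneSubscriber` and `run` are hypothetical names for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class OneAtATime {
    /** Keeps exactly one item of outstanding demand, like request(1) in the RxJava subscriber. */
    static final class OneByOneSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // start with one item instead of an unbounded request
        }

        @Override public void onNext(Integer item) {
            received.add(item);      // stand-in for doSomethingHere(item)
            subscription.request(1); // only now ask for the next one
        }

        @Override public void onError(Throwable t) { t.printStackTrace(); done.countDown(); }

        @Override public void onComplete() { done.countDown(); }
    }

    static List<Integer> run(int count) {
        OneByOneSubscriber subscriber = new OneByOneSubscriber();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Buffer capacity 1: submit() blocks while the subscriber's buffer is full.
            try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 1)) {
                publisher.subscribe(subscriber);
                for (int i = 0; i < count; i++) {
                    publisher.submit(i);
                }
            } // close() signals onComplete once the buffered items have been delivered
            if (!subscriber.done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onComplete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return subscriber.received;
    }
}
```

`run(100)` returns the items 0 to 99 in order. The publisher never gets more than one item ahead of the subscriber.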

That's all there is to it. 😃
