Skip to content

1.x: observeOn now replenishes with constant rate#3795

Merged
akarnokd merged 1 commit intoReactiveX:1.xfrom
akarnokd:ObserveOnStableFetch
Apr 8, 2016
Merged

1.x: observeOn now replenishes with constant rate#3795
akarnokd merged 1 commit intoReactiveX:1.xfrom
akarnokd:ObserveOnStableFetch

Conversation

@akarnokd
Copy link
Member

This PR makes sure observeOn requests replenishments in a fixed and predictable quantity of 75% of the bufferSize, that is, if an emission counter reaches 0.75 * bufferSize, that amount is requested and the emission counter is reset to zero. This requires saving the emission count between drain runs. If the bufferSize is 1 or 2, the replenishment will trigger after every 1 or 2 items.

Note that there is only one sensitive operator-builder, AsyncOnSubscribe, which is mostly affected by the request pattern as it facilitates user code to respond with an Observable sequence of the requested amount.

In addition, since observeOn now supports setting the buffer size, it can act as a rebatching operator via the help of Schedulers.immediate().

@artem-zinnatullin
Copy link
Contributor

👍

this.on = NotificationLite.instance();
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
this.limit = calculatedSize - (calculatedSize >> 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about a less efficient (but more readable): (int) (calculatedSize * 0.75) ?

@stevegury
Copy link
Member

Other than the previous comment, 👍

@akarnokd akarnokd force-pushed the ObserveOnStableFetch branch from 23862c1 to e2331e3 Compare April 7, 2016 06:55
@akarnokd
Copy link
Member Author

akarnokd commented Apr 7, 2016

That formula doesn't work if calculatedSize == 1. I've updated the PR with a comment on the calculation.

@stevegury
Copy link
Member

👍

@akarnokd akarnokd merged commit 53c31cd into ReactiveX:1.x Apr 8, 2016
@akarnokd akarnokd deleted the ObserveOnStableFetch branch April 8, 2016 20:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants