2.x: make sure interval+trampoline can be stopped#5367
2.x: make sure interval+trampoline can be stopped#5367akarnokd merged 1 commit intoReactiveX:2.xfrom
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #5367 +/- ##
============================================
- Coverage 96.16% 96.14% -0.02%
- Complexity 5780 5782 +2
============================================
Files 630 630
Lines 41162 41189 +27
Branches 5721 5726 +5
============================================
+ Hits 39582 39603 +21
+ Misses 625 621 -4
- Partials 955 965 +10
Continue to review full report at Codecov.
|
| Scheduler sch = scheduler; | ||
|
|
||
| is.setResource(d); | ||
| if (sch instanceof TrampolineScheduler) { |
There was a problem hiding this comment.
Can't this fix be made in TrampolineScheduler itself?
There was a problem hiding this comment.
No, because the schedulePeriodic has to return a Disposable in order to allow it to be stopped but the first task stays in the trampoline loop and stays there never returning from the call. The disposability has to happen before that. This is a similar case to synchronous cancellation with streams that have to send a Disposable/Subscription first to allow cancellation before going into some form of loop that would otherwise not return. The classic Rx.NET and early RxJava suffered from this on a grand scale. This issue only affects interval and trampoline scheduler together.
|
Yeah I see, so alternative solution would be to accept Disposable as
parameter instead of returning it in scheduleX() methods right?
I'm on mobile, but I believe Scheduler is still abstract class as in 1.x
and such overloads could probably be added without breaking compatibility?
…On Thu, May 25, 2017, 21:42 David Karnok ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In
src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java
<#5367 (comment)>:
>
- is.setResource(d);
+ if (sch instanceof TrampolineScheduler) {
No, because the schedulePeriodic has to return a Disposable in order to
allow it to be stopped but the first task stays in the trampoline loop and
stays there never returning from the call. The disposability has to happen
before that. This is a similar case to synchronous cancellation with
streams that have to send a Disposable/Subscription first to allow
cancellation before going into some form of loop that would otherwise not
return. The classic Rx.NET and early RxJava suffered from this on a grand
scale. This issue only affects interval and trampoline scheduler together.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#5367 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AA7B3Byz-cRI2svm3vh7EApa-vqk-X8zks5r9cuGgaJpZM4NmHEm>
.
|
No, it would require a consumer callback to be called, similar to how
Doesn't work. In order to not break existing custom schedulers, the new non-abstract methods have to call the old ones which return |
|
No, it would require a consumer callback to be called, similar to
how autoConnect or connect has one to get the Disposable out before it gets
submitted to an executor.
Wouldn't adding inner Disposable into outer one do the same thing?
Doesn't work. In order to not break existing custom schedulers, the new
non-abstract methods have to call the old ones which
return Disposable instead of providing it some way before.
Which seems possible..
Though okay, it sounds like a huge change and if there are no other
possible problematic cases as Intervals, this PR LGTM
…On Thu, May 25, 2017, 22:06 David Karnok ***@***.***> wrote:
Yeah I see, so alternative solution would be to accept Disposable as
parameter instead of returning it in scheduleX() methods right?
No, it would require a consumer callback to be called, similar to how
autoConnect or connect has one to get the Disposable out before it gets
submitted to an executor.
I'm on mobile, but I believe Scheduler is still abstract class as in 1.x
and such overloads could probably be added without breaking compatibility?
Doesn't work. In order to not break existing custom schedulers, the new
non-abstract methods have to call the old ones which return Disposable
instead of providing it some way before.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#5367 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AA7B3OyImoGbCjK_dtQ-1wQ8eaCjCetwks5r9dEmgaJpZM4NmHEm>
.
|
This PR fixes the case where a
trampolinescheduler is used with theintervalorintervalRangeoperator, the periodic emissions can't be cancelled properly. The synchronous and blocking nature of the periodic schedule is that theDisposableis never connected with the downstream and theintervalstays in a drain-sleep loop per emission indefinitely.This PR changes the operators to use the
Workerof a trampoline scheduler that is available before the drain-sleep loop thus a downstream cancellation can stop the loop. Any other "async" scheduler will still use the direct periodic scheduling facility.In addition, the trampoline loop has been fixed to check for the worker
disposedstate in the inner loop.