Skip to content

Commit 2093897

Browse files
authored
Merge branch '2.x' into 2.x
2 parents 25f8eb8 + 96e5a60 commit 2093897

29 files changed

Lines changed: 826 additions & 69 deletions

CHANGES.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,45 @@
22

33
The cnagelog of version 1.x can be found at https://github.com/ReactiveX/RxJava/blob/1.x/CHANGES.md
44

5+
### Version 2.1.1 - June 21, 2017 ([Maven](http://search.maven.org/#artifactdetails%7Cio.reactivex.rxjava2%7Crxjava%7C2.1.1%7C))
6+
7+
#### Notable changes
8+
9+
The emitter API (such as `FlowableEmitter`, `SingleEmitter`, etc.) now features a new method, `tryOnError` that tries to emit the `Throwable` if the sequence is not cancelled/disposed. Unlike the regular `onError`, if the downstream is no longer willing to accept events, the method returns false and doesn't signal an `UndeliverableException`.
10+
11+
#### API enhancements
12+
13+
- [Pull 5344](https://github.com/ReactiveX/RxJava/pull/5344): Add `tryOnError` to `create/XEmitter` API.
14+
- [Pull 5386](https://github.com/ReactiveX/RxJava/pull/5386): Add `subscribeOn` overload to avoid same-pool deadlock with create.
15+
16+
#### Documentation changes
17+
18+
- [Pull 5343](https://github.com/ReactiveX/RxJava/pull/5343): Fix Javadoc for `Maybe.toSingle`.
19+
- [Pull 5347](https://github.com/ReactiveX/RxJava/pull/5347): Fix Javadoc for `FunctionX`
20+
- [Pull 5351](https://github.com/ReactiveX/RxJava/pull/5351): Update some marbles of `Observable`
21+
- [Commit b4aeb6e3](https://github.com/ReactiveX/RxJava/commit/0b0355e3bc09326c8005fd26d09e7c1eb4aeb6e3): Replace `Action1` with `Consumer` in docs.
22+
- [Pull 5383](https://github.com/ReactiveX/RxJava/pull/5383): Fixed Javadoc for `SingleFlatMapIterableObservable`.
23+
- [Pull 5403](https://github.com/ReactiveX/RxJava/pull/5403): Fix the copy-paste error in the Javadoc of `Single.doAfterTeminate` mentioning `doAfterSuccess`.
24+
- [Pull 5405](https://github.com/ReactiveX/RxJava/pull/5405): `DefaultObserver` javadoc fix: use subscribe, remove disposable code.
25+
- [Pull 5407](https://github.com/ReactiveX/RxJava/pull/5407): `DefaultSubscriber` javadoc sample fix.
26+
- [Pull 5406](https://github.com/ReactiveX/RxJava/pull/5406): Fix javadoc for `Observable.reduce()` and `Observable.reduceWith()`.
27+
- [Pull 5409](https://github.com/ReactiveX/RxJava/pull/5409): Corrected `Single.delay` documentation.
28+
29+
#### Bugfixes
30+
31+
- [Pull 5367](https://github.com/ReactiveX/RxJava/pull/5367): Make sure `interval+trampoline` can be stopped.
32+
- [Pull 5378](https://github.com/ReactiveX/RxJava/pull/5378): Make `SingleMap` not allow map function return null.
33+
- [Pull 5379](https://github.com/ReactiveX/RxJava/pull/5379): Add missing null checks on values returned by user functions.
34+
- [Pull 5415](https://github.com/ReactiveX/RxJava/pull/5415): Fix `doOnNext` failure not triggering `doOnError` when fused.
35+
- [Pull 5419](https://github.com/ReactiveX/RxJava/pull/5419): Fix periodic scheduling with negative period causing IAE.
36+
- [Pull 5427](https://github.com/ReactiveX/RxJava/pull/5427): Fix `buffer(time, maxSize)` duplicating buffers on time-size race.
37+
38+
#### Other
39+
40+
- [Pull 5324](https://github.com/ReactiveX/RxJava/pull/5324): Mark `VolatileSizeArrayList` as `RandomAccess` list
41+
- [Pull 5354](https://github.com/ReactiveX/RxJava/pull/5354): Fix typo in error message in `BaseTestConsumer.assertValueSequence`.
42+
- [Pull 5391](https://github.com/ReactiveX/RxJava/pull/5391): Changed minimum value of `rx2.buffer-size` to 1.
43+
544
### Version 2.1.0 - April 29, 2017 ([Maven](http://search.maven.org/#artifactdetails%7Cio.reactivex.rxjava2%7Crxjava%7C2.1.0%7C))
645

746
#### Summary

src/main/java/io/reactivex/Observable.java

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
import io.reactivex.schedulers.*;
3535

3636
/**
37-
* The Observable class that is designed similar to the Reactive-Streams Pattern, minus the backpressure,
38-
* and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
37+
* The Observable class is the non-backpressured, optionally multi-valued base reactive class that
38+
* offers factory methods, intermediate operators and the ability to consume synchronous
39+
* and/or asynchronous reactive dataflows.
3940
* <p>
40-
* Reactive-Streams operates with {@code ObservableSource}s which {@code Observable} extends. Many operators
41-
* therefore accept general {@code ObservableSource}s directly and allow direct interoperation with other
42-
* Reactive-Streams implementations.
41+
* Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
42+
* for such non-backpressured flows, which {@code Observable} itself implements as well.
4343
* <p>
4444
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
4545
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
@@ -49,11 +49,50 @@
4949
* <p>
5050
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
5151
* <p>
52-
* For more information see the <a href="http://reactivex.io/documentation/ObservableSource.html">ReactiveX
53-
* documentation</a>.
54-
*
52+
* The design of this class was derived from the
53+
* <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive-Streams design and specification</a>
54+
* by removing any backpressure-related infrastructure and implementation detail, replacing the
55+
* {@code org.reactivestreams.Subscription} with {@link Disposable} as the primary means to cancel
56+
* a flow.
57+
* <p>
58+
* The {@code Observable} follows the protocol
59+
* <pre><code>
60+
* onSubscribe onNext* (onError | onComplete)?
61+
* </code></pre>
62+
* where
63+
* the stream can be disposed through the {@code Disposable} instance provided to consumers through
64+
* {@code Observer.onSubscribe}.
65+
* <p>
66+
* Unlike the {@code Observable} of version 1.x, {@link #subscribe(Observer)} does not allow external cancellation
67+
* of a subscription and the {@code Observer} instance is expected to expose such capability.
68+
* <p>Example:
69+
* <pre><code>
70+
* Disposable d = Observable.just("Hello world!")
71+
* .delay(1, TimeUnit.SECONDS)
72+
* .subscribeWith(new DisposableObserver&lt;String>() {
73+
* &#64;Override public void onStart() {
74+
* System.out.println("Start!");
75+
* }
76+
* &#64;Override public void onNext(Integer t) {
77+
* System.out.println(t);
78+
* }
79+
* &#64;Override public void onError(Throwable t) {
80+
* t.printStackTrace();
81+
* }
82+
* &#64;Override public void onComplete() {
83+
* System.out.println("Done!");
84+
* }
85+
* });
86+
*
87+
* Thread.sleep(500);
88+
* // the sequence now can be cancelled via dispose()
89+
* d.dispose();
90+
* </code></pre>
91+
*
5592
* @param <T>
5693
* the type of the items emitted by the Observable
94+
* @see Flowable
95+
* @see io.reactivex.observers.DisposableObserver
5796
*/
5897
public abstract class Observable<T> implements ObservableSource<T> {
5998

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -460,12 +460,12 @@ public void onNext(T t) {
460460
if (b.size() < maxSize) {
461461
return;
462462
}
463-
}
464463

465-
if (restartTimerOnMaxSize) {
466464
buffer = null;
467465
producerIndex++;
466+
}
468467

468+
if (restartTimerOnMaxSize) {
469469
timer.dispose();
470470
}
471471

@@ -480,17 +480,12 @@ public void onNext(T t) {
480480
return;
481481
}
482482

483+
synchronized (this) {
484+
buffer = b;
485+
consumerIndex++;
486+
}
483487
if (restartTimerOnMaxSize) {
484-
synchronized (this) {
485-
buffer = b;
486-
consumerIndex++;
487-
}
488-
489488
timer = w.schedulePeriodically(this, timespan, timespan, unit);
490-
} else {
491-
synchronized (this) {
492-
buffer = b;
493-
}
494489
}
495490
}
496491

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.functions.*;
2222
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2323
import io.reactivex.internal.subscribers.*;
24+
import io.reactivex.internal.util.ExceptionHelper;
2425
import io.reactivex.plugins.RxJavaPlugins;
2526

2627
public final class FlowableDoOnEach<T> extends AbstractFlowableWithUpstream<T, T> {
@@ -149,11 +150,33 @@ public int requestFusion(int mode) {
149150
@Nullable
150151
@Override
151152
public T poll() throws Exception {
152-
T v = qs.poll();
153+
T v;
154+
155+
try {
156+
v = qs.poll();
157+
} catch (Throwable ex) {
158+
Exceptions.throwIfFatal(ex);
159+
try {
160+
onError.accept(ex);
161+
} catch (Throwable exc) {
162+
throw new CompositeException(ex, exc);
163+
}
164+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
165+
}
153166

154167
if (v != null) {
155168
try {
156-
onNext.accept(v);
169+
try {
170+
onNext.accept(v);
171+
} catch (Throwable ex) {
172+
Exceptions.throwIfFatal(ex);
173+
try {
174+
onError.accept(ex);
175+
} catch (Throwable exc) {
176+
throw new CompositeException(ex, exc);
177+
}
178+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
179+
}
157180
} finally {
158181
onAfterTerminate.run();
159182
}
@@ -282,11 +305,33 @@ public int requestFusion(int mode) {
282305
@Nullable
283306
@Override
284307
public T poll() throws Exception {
285-
T v = qs.poll();
308+
T v;
309+
310+
try {
311+
v = qs.poll();
312+
} catch (Throwable ex) {
313+
Exceptions.throwIfFatal(ex);
314+
try {
315+
onError.accept(ex);
316+
} catch (Throwable exc) {
317+
throw new CompositeException(ex, exc);
318+
}
319+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
320+
}
286321

287322
if (v != null) {
288323
try {
289-
onNext.accept(v);
324+
try {
325+
onNext.accept(v);
326+
} catch (Throwable ex) {
327+
Exceptions.throwIfFatal(ex);
328+
try {
329+
onError.accept(ex);
330+
} catch (Throwable exc) {
331+
throw new CompositeException(ex, exc);
332+
}
333+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
334+
}
290335
} finally {
291336
onAfterTerminate.run();
292337
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ static final class FlatMapCompletableMainSubscriber<T> extends BasicIntQueueSubs
7272

7373
Subscription s;
7474

75+
volatile boolean cancelled;
76+
7577
FlatMapCompletableMainSubscriber(Subscriber<? super T> observer,
7678
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
7779
int maxConcurrency) {
@@ -117,7 +119,7 @@ public void onNext(T value) {
117119

118120
InnerConsumer inner = new InnerConsumer();
119121

120-
if (set.add(inner)) {
122+
if (!cancelled && set.add(inner)) {
121123
cs.subscribe(inner);
122124
}
123125
}
@@ -164,6 +166,7 @@ public void onComplete() {
164166

165167
@Override
166168
public void cancel() {
169+
cancelled = true;
167170
s.cancel();
168171
set.dispose();
169172
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ static final class FlatMapCompletableMainSubscriber<T> extends AtomicInteger
7979

8080
Subscription s;
8181

82+
volatile boolean disposed;
83+
8284
FlatMapCompletableMainSubscriber(CompletableObserver observer,
8385
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors,
8486
int maxConcurrency) {
@@ -124,7 +126,7 @@ public void onNext(T value) {
124126

125127
InnerObserver inner = new InnerObserver();
126128

127-
if (set.add(inner)) {
129+
if (!disposed && set.add(inner)) {
128130
cs.subscribe(inner);
129131
}
130132
}
@@ -171,6 +173,7 @@ public void onComplete() {
171173

172174
@Override
173175
public void dispose() {
176+
disposed = true;
174177
s.cancel();
175178
set.dispose();
176179
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void onNext(T t) {
128128

129129
InnerObserver inner = new InnerObserver();
130130

131-
if (set.add(inner)) {
131+
if (!cancelled && set.add(inner)) {
132132
ms.subscribe(inner);
133133
}
134134
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public void onNext(T t) {
128128

129129
InnerObserver inner = new InnerObserver();
130130

131-
if (set.add(inner)) {
131+
if (!cancelled && set.add(inner)) {
132132
ms.subscribe(inner);
133133
}
134134
}

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -458,12 +458,11 @@ public void onNext(T t) {
458458
if (b.size() < maxSize) {
459459
return;
460460
}
461-
}
462-
463-
if (restartTimerOnMaxSize) {
464461
buffer = null;
465462
producerIndex++;
463+
}
466464

465+
if (restartTimerOnMaxSize) {
467466
timer.dispose();
468467
}
469468

@@ -478,17 +477,12 @@ public void onNext(T t) {
478477
return;
479478
}
480479

480+
synchronized (this) {
481+
buffer = b;
482+
consumerIndex++;
483+
}
481484
if (restartTimerOnMaxSize) {
482-
synchronized (this) {
483-
buffer = b;
484-
consumerIndex++;
485-
}
486-
487485
timer = w.schedulePeriodically(this, timespan, timespan, unit);
488-
} else {
489-
synchronized (this) {
490-
buffer = b;
491-
}
492486
}
493487
}
494488

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ static final class FlatMapCompletableMainObserver<T> extends BasicIntQueueDispos
6464

6565
Disposable d;
6666

67+
volatile boolean disposed;
68+
6769
FlatMapCompletableMainObserver(Observer<? super T> observer, Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
6870
this.actual = observer;
6971
this.mapper = mapper;
@@ -99,7 +101,7 @@ public void onNext(T value) {
99101

100102
InnerObserver inner = new InnerObserver();
101103

102-
if (set.add(inner)) {
104+
if (!disposed && set.add(inner)) {
103105
cs.subscribe(inner);
104106
}
105107
}
@@ -138,6 +140,7 @@ public void onComplete() {
138140

139141
@Override
140142
public void dispose() {
143+
disposed = true;
141144
d.dispose();
142145
set.dispose();
143146
}

0 commit comments

Comments
 (0)