diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index 3aa1269f43..a0d65953df 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -177,9 +177,9 @@ public void request(long n) { @Override public void cancel() { - DisposableHelper.dispose(timer); - s.cancel(); + + DisposableHelper.dispose(timer); } @Override @@ -333,9 +333,9 @@ public void request(long n) { @Override public void cancel() { - w.dispose(); clear(); s.cancel(); + w.dispose(); } void clear() { @@ -497,17 +497,15 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - w.dispose(); synchronized (this) { buffer = null; } actual.onError(t); + w.dispose(); } @Override public void onComplete() { - w.dispose(); - U b; synchronized (this) { b = buffer; @@ -519,6 +517,8 @@ public void onComplete() { if (enter()) { QueueDrainHelper.drainMaxLoop(queue, actual, false, this, this); } + + w.dispose(); } @Override @@ -543,11 +543,11 @@ public void cancel() { @Override public void dispose() { - w.dispose(); synchronized (this) { buffer = null; } s.cancel(); + w.dispose(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java index 10bd0263c9..7730144753 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java @@ -108,8 +108,8 @@ public void onError(Throwable t) { return; } done = true; - DisposableHelper.dispose(timer); actual.onError(t); + worker.dispose(); } @Override @@ -127,8 +127,8 @@ public void onComplete() { de.emit(); } DisposableHelper.dispose(timer); - worker.dispose(); actual.onComplete(); + worker.dispose(); } } @@ -141,9 +141,8 @@ public void request(long n) { @Override public void cancel() { - DisposableHelper.dispose(timer); - worker.dispose(); s.cancel(); + worker.dispose(); } void emit(long idx, T t, DebounceEmitter emitter) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java index 2230a8bda7..d46aaad2dd 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java @@ -121,8 +121,8 @@ public void request(long n) { @Override public void cancel() { - w.dispose(); s.cancel(); + w.dispose(); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java index 18fba29f40..43e4dd2e9e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java @@ -123,8 +123,8 @@ public void onError(Throwable t) { return; } done = true; - DisposableHelper.dispose(timer); actual.onError(t); + worker.dispose(); } @Override @@ -133,9 +133,8 @@ public void onComplete() { return; } done = true; - DisposableHelper.dispose(timer); - worker.dispose(); actual.onComplete(); + worker.dispose(); } @Override @@ -147,9 +146,8 @@ public void request(long n) { @Override public void cancel() { - DisposableHelper.dispose(timer); - worker.dispose(); s.cancel(); + worker.dispose(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index 78afe74da0..9619e245a1 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -154,9 +154,8 @@ public void onError(Throwable t) { return; } done = true; - worker.dispose(); - DisposableHelper.dispose(timer); arbiter.onError(t, s); + worker.dispose(); } @Override @@ -165,16 +164,14 @@ public void onComplete() { return; } done = true; - worker.dispose(); - DisposableHelper.dispose(timer); arbiter.onComplete(s); + worker.dispose(); } @Override public void dispose() { - worker.dispose(); - DisposableHelper.dispose(timer); s.cancel(); + worker.dispose(); } @Override @@ -256,9 +253,9 @@ public void onError(Throwable t) { return; } done = true; - dispose(); actual.onError(t); + worker.dispose(); } @Override @@ -267,16 +264,15 @@ public void onComplete() { return; } done = true; - dispose(); actual.onComplete(); + worker.dispose(); } @Override public void dispose() { - worker.dispose(); - DisposableHelper.dispose(timer); s.cancel(); + worker.dispose(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index 9fbb043127..204c09e1d3 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -159,8 +159,8 @@ public void onError(Throwable t) { drainLoop(); } - dispose(); actual.onError(t); + dispose(); } @Override @@ -170,8 +170,8 @@ public void onComplete() { drainLoop(); } - dispose(); actual.onComplete(); + dispose(); } @Override @@ -396,8 +396,8 @@ public void onNext(T t) { } else { window = null; s.cancel(); - dispose(); actual.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); + dispose(); return; } } else { @@ -424,8 +424,8 @@ public void onError(Throwable t) { drainLoop(); } - dispose(); actual.onError(t); + dispose(); } @Override @@ -435,8 +435,8 @@ public void onComplete() { drainLoop(); } - dispose(); actual.onComplete(); + dispose(); } @Override @@ -479,13 +479,13 @@ void drainLoop() { if (d && (empty || isHolder)) { window = null; q.clear(); - dispose(); Throwable err = error; if (err != null) { w.onError(err); } else { w.onComplete(); } + dispose(); return; } @@ -509,8 +509,8 @@ void drainLoop() { window = null; queue.clear(); s.cancel(); - dispose(); a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); + dispose(); return; } } @@ -550,8 +550,8 @@ void drainLoop() { } else { window = null; s.cancel(); - dispose(); actual.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); + dispose(); return; } } else { @@ -683,8 +683,8 @@ public void onError(Throwable t) { drainLoop(); } - dispose(); actual.onError(t); + dispose(); } @Override @@ -694,8 +694,8 @@ public void onComplete() { drainLoop(); } - dispose(); actual.onComplete(); + dispose(); } @Override @@ -747,7 +747,6 @@ void drainLoop() { if (d && (empty || sw)) { q.clear(); - dispose(); Throwable e = error; if (e != null) { for (UnicastProcessor w : ws) { @@ -759,6 +758,7 @@ void drainLoop() { } } ws.clear(); + dispose(); return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java index bd9f3cd4be..10f1a2aa34 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java @@ -143,16 +143,15 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - DisposableHelper.dispose(timer); synchronized (this) { buffer = null; } actual.onError(t); + DisposableHelper.dispose(timer); } @Override public void onComplete() { - DisposableHelper.dispose(timer); U b; synchronized (this) { b = buffer; @@ -165,6 +164,7 @@ public void onComplete() { QueueDrainHelper.drainLoop(queue, actual, false, this, this); } } + DisposableHelper.dispose(timer); } @Override @@ -186,8 +186,8 @@ public void run() { next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - dispose(); actual.onError(e); + dispose(); return; } @@ -249,9 +249,9 @@ public void onSubscribe(Disposable s) { b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - w.dispose(); s.dispose(); EmptyDisposable.error(e, actual); + w.dispose(); return; } @@ -286,9 +286,9 @@ public void onNext(T t) { @Override public void onError(Throwable t) { done = true; - w.dispose(); clear(); actual.onError(t); + w.dispose(); } @Override @@ -312,9 +312,9 @@ public void onComplete() { public void dispose() { if (!cancelled) { cancelled = true; - w.dispose(); clear(); s.dispose(); + w.dispose(); } } @@ -340,8 +340,8 @@ public void run() { b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - dispose(); actual.onError(e); + dispose(); return; } @@ -414,9 +414,9 @@ public void onSubscribe(Disposable s) { b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - w.dispose(); s.dispose(); EmptyDisposable.error(e, actual); + w.dispose(); return; } @@ -457,8 +457,8 @@ public void onNext(T t) { b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - dispose(); actual.onError(e); + dispose(); return; } @@ -478,11 +478,11 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - w.dispose(); synchronized (this) { buffer = null; } actual.onError(t); + w.dispose(); } @Override @@ -512,11 +512,11 @@ public void accept(Observer a, U v) { public void dispose() { if (!cancelled) { cancelled = true; + s.dispose(); w.dispose(); synchronized (this) { buffer = null; } - s.dispose(); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java index 0a3a9722ed..7cdd59e3bb 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java @@ -101,8 +101,8 @@ public void onError(Throwable t) { return; } done = true; - DisposableHelper.dispose(timer); actual.onError(t); + worker.dispose(); } @Override @@ -119,22 +119,20 @@ public void onComplete() { if (de != null) { de.run(); } - DisposableHelper.dispose(timer); - worker.dispose(); actual.onComplete(); + worker.dispose(); } } @Override public void dispose() { - DisposableHelper.dispose(timer); - worker.dispose(); s.dispose(); + worker.dispose(); } @Override public boolean isDisposed() { - return timer.get() == DisposableHelper.DISPOSED; + return worker.isDisposed(); } void emit(long idx, T t, DebounceEmitter emitter) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java index 6cd14d5805..4470883b96 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java @@ -116,8 +116,8 @@ public void run() { @Override public void dispose() { - w.dispose(); s.dispose(); + w.dispose(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java index 08443bbd2f..b54b671a86 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java @@ -184,6 +184,7 @@ void drainNormal() { s.dispose(); q.clear(); a.onError(ex); + worker.dispose(); return; } boolean empty = v == null; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java index a9c2729582..f2512180da 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java @@ -102,8 +102,8 @@ public void onError(Throwable t) { RxJavaPlugins.onError(t); } else { done = true; - DisposableHelper.dispose(this); actual.onError(t); + worker.dispose(); } } @@ -111,22 +111,20 @@ public void onError(Throwable t) { public void onComplete() { if (!done) { done = true; - DisposableHelper.dispose(this); - worker.dispose(); actual.onComplete(); + worker.dispose(); } } @Override public void dispose() { - DisposableHelper.dispose(this); - worker.dispose(); s.dispose(); + worker.dispose(); } @Override public boolean isDisposed() { - return DisposableHelper.isDisposed(get()); + return worker.isDisposed(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java index 9997bf2a2c..854b40a295 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java @@ -154,9 +154,8 @@ public void onError(Throwable t) { return; } done = true; - worker.dispose(); - DisposableHelper.dispose(this); arbiter.onError(t, s); + worker.dispose(); } @Override @@ -165,21 +164,19 @@ public void onComplete() { return; } done = true; - worker.dispose(); - DisposableHelper.dispose(this); arbiter.onComplete(s); + worker.dispose(); } @Override public void dispose() { - worker.dispose(); - DisposableHelper.dispose(this); s.dispose(); + worker.dispose(); } @Override public boolean isDisposed() { - return DisposableHelper.isDisposed(get()); + return worker.isDisposed(); } } @@ -241,8 +238,8 @@ void scheduleTimeout(final long idx) { public void run() { if (idx == index) { done = true; - DisposableHelper.dispose(TimeoutTimedObserver.this); s.dispose(); + DisposableHelper.dispose(TimeoutTimedObserver.this); actual.onError(new TimeoutException()); @@ -262,9 +259,9 @@ public void onError(Throwable t) { return; } done = true; - dispose(); actual.onError(t); + dispose(); } @Override @@ -273,21 +270,20 @@ public void onComplete() { return; } done = true; - dispose(); actual.onComplete(); + dispose(); } @Override public void dispose() { - worker.dispose(); - DisposableHelper.dispose(this); s.dispose(); + worker.dispose(); } @Override public boolean isDisposed() { - return DisposableHelper.isDisposed(get()); + return worker.isDisposed(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java index ec17e41ddb..fb4e219b5d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java @@ -367,8 +367,8 @@ public void onError(Throwable t) { drainLoop(); } - disposeTimer(); actual.onError(t); + disposeTimer(); } @Override @@ -378,8 +378,8 @@ public void onComplete() { drainLoop(); } - disposeTimer(); actual.onComplete(); + disposeTimer(); } @Override @@ -588,8 +588,8 @@ public void onError(Throwable t) { drainLoop(); } - disposeWorker(); actual.onError(t); + disposeWorker(); } @Override @@ -599,8 +599,8 @@ public void onComplete() { drainLoop(); } - disposeWorker(); actual.onComplete(); + disposeWorker(); } @Override @@ -652,7 +652,6 @@ void drainLoop() { if (d && (empty || sw)) { q.clear(); - disposeWorker(); Throwable e = error; if (e != null) { for (UnicastSubject w : ws) { @@ -663,6 +662,7 @@ void drainLoop() { w.onComplete(); } } + disposeWorker(); ws.clear(); return; } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java index d8190ec30a..826e3c7d9f 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java @@ -160,8 +160,8 @@ public void dispose() { // complete the actionQueue when worker is unsubscribed to make // room for the next worker in the workerQueue. if (unsubscribed.compareAndSet(false, true)) { - actualWorker.dispose(); actionProcessor.onComplete(); + actualWorker.dispose(); } }