diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index 12a9acc060..103f40ca60 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -68,7 +68,7 @@ static final class TimeoutTimedOtherSubscriber implements FlowableSubscriber< final FullArbiter arbiter; - final AtomicReference timer = new AtomicReference(); + Disposable timer; volatile long index; @@ -110,16 +110,11 @@ public void onNext(T t) { } void scheduleTimeout(final long idx) { - Disposable d = timer.get(); - if (d != null) { - d.dispose(); + if (timer != null) { + timer.dispose(); } - if (timer.compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new TimeoutTask(idx), timeout, unit); - - DisposableHelper.replace(timer, d); - } + timer = worker.schedule(new TimeoutTask(idx), timeout, unit); } void subscribeNext() { @@ -170,11 +165,10 @@ public void run() { if (idx == index) { done = true; s.cancel(); - DisposableHelper.dispose(timer); + worker.dispose(); subscribeNext(); - worker.dispose(); } } } @@ -188,7 +182,7 @@ static final class TimeoutTimedSubscriber implements FlowableSubscriber, D Subscription s; - final AtomicReference timer = new AtomicReference(); + Disposable timer; volatile long index; @@ -224,16 +218,11 @@ public void onNext(T t) { } void scheduleTimeout(final long idx) { - Disposable d = timer.get(); - if (d != null) { - d.dispose(); + if (timer != null) { + timer.dispose(); } - if (timer.compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(new TimeoutTask(idx), timeout, unit); - - DisposableHelper.replace(timer, d); - } + timer = worker.schedule(new TimeoutTask(idx), timeout, unit); } @Override