diff --git a/src/main/java/io/reactivex/internal/observers/ResumeSingleObserver.java b/src/main/java/io/reactivex/internal/observers/ResumeSingleObserver.java new file mode 100644 index 0000000000..599d83d666 --- /dev/null +++ b/src/main/java/io/reactivex/internal/observers/ResumeSingleObserver.java @@ -0,0 +1,53 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.observers; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.SingleObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * A SingleObserver implementation used for subscribing to the actual SingleSource + * and replace the current Disposable in a parent AtomicReference. + * + * @param the value type + */ +public final class ResumeSingleObserver implements SingleObserver { + + final AtomicReference parent; + + final SingleObserver actual; + + public ResumeSingleObserver(AtomicReference parent, SingleObserver actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(parent, d); + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } +} \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDisposeOn.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDisposeOn.java index 89aa2b094e..3625a230db 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableDisposeOn.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDisposeOn.java @@ -14,7 +14,9 @@ package io.reactivex.internal.operators.completable; import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class CompletableDisposeOn extends Completable { @@ -29,34 +31,65 @@ public CompletableDisposeOn(CompletableSource source, Scheduler scheduler) { @Override protected void subscribeActual(final CompletableObserver s) { - source.subscribe(new CompletableObserver() { + source.subscribe(new CompletableObserverImplementation(s, scheduler)); + } + + static final class CompletableObserverImplementation implements CompletableObserver, Disposable, Runnable { + final CompletableObserver s; + + final Scheduler scheduler; + + Disposable d; + + volatile boolean disposed; - @Override - public void onComplete() { - s.onComplete(); + CompletableObserverImplementation(CompletableObserver s, Scheduler scheduler) { + this.s = s; + this.scheduler = scheduler; + } + + @Override + public void onComplete() { + if (disposed) { + return; } + s.onComplete(); + } - @Override - public void onError(Throwable e) { - s.onError(e); + @Override + public void onError(Throwable e) { + if (disposed) { + RxJavaPlugins.onError(e); + return; } + s.onError(e); + } + + @Override + public void onSubscribe(final Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; - @Override - public void onSubscribe(final Disposable d) { - s.onSubscribe(Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - d.dispose(); - } - }); - } - })); + s.onSubscribe(this); } + } + + @Override + public void dispose() { + disposed = true; + scheduler.scheduleDirect(this); + } + + @Override + public boolean isDisposed() { + return disposed; + } - }); + @Override + public void run() { + d.dispose(); + d = DisposableHelper.DISPOSED; + } } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java index ba06409fca..4c86176b5e 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDoOnEvent.java @@ -52,7 +52,7 @@ public void onError(Throwable e) { onEvent.accept(e); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - e = new CompositeException(ex, e); + e = new CompositeException(e, ex); } s.onError(e); diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java index 883d1fcdfd..c55ad1fd24 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java @@ -37,6 +37,7 @@ public CompletableResumeNext(CompletableSource source, protected void subscribeActual(final CompletableObserver s) { final SequentialDisposable sd = new SequentialDisposable(); + s.onSubscribe(sd); source.subscribe(new CompletableObserver() { @Override diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java index b77183be22..2f66075f7f 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableSubscribeOn.java @@ -32,15 +32,10 @@ public CompletableSubscribeOn(CompletableSource source, Scheduler scheduler) { @Override protected void subscribeActual(final CompletableObserver s) { - final SubscribeOnObserver parent = new SubscribeOnObserver(s); + final SubscribeOnObserver parent = new SubscribeOnObserver(s, source); s.onSubscribe(parent); - Disposable f = scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - source.subscribe(parent); - } - }); + Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); @@ -48,7 +43,7 @@ public void run() { static final class SubscribeOnObserver extends AtomicReference - implements CompletableObserver, Disposable { + implements CompletableObserver, Disposable, Runnable { private static final long serialVersionUID = 7000911171163930287L; @@ -56,11 +51,19 @@ static final class SubscribeOnObserver final SequentialDisposable task; - SubscribeOnObserver(CompletableObserver actual) { + final CompletableSource source; + + SubscribeOnObserver(CompletableObserver actual, CompletableSource source) { this.actual = actual; + this.source = source; this.task = new SequentialDisposable(); } + @Override + public void run() { + source.subscribe(this); + } + @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); @@ -84,7 +87,7 @@ public void dispose() { @Override public boolean isDisposed() { - return DisposableHelper.isDisposed(this); + return DisposableHelper.isDisposed(get()); } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java index dff076cc59..c1bc6d1f2d 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java @@ -14,13 +14,13 @@ package io.reactivex.internal.operators.completable; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -41,103 +41,156 @@ public CompletableUsing(Callable resourceSupplier, } - @Override - protected void subscribeActual(final CompletableObserver s) { - final R resource; // NOPMD + protected void subscribeActual(CompletableObserver observer) { + R resource; try { resource = resourceSupplier.call(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - EmptyDisposable.error(e, s); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); return; } - CompletableSource cs; + CompletableSource source; try { - cs = ObjectHelper.requireNonNull(completableFunction.apply(resource), "The completableFunction returned a null Completable"); - } catch (Throwable e) { - try { - disposer.accept(resource); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - e = new CompositeException(e, ex); + source = ObjectHelper.requireNonNull(completableFunction.apply(resource), "The completableFunction returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + if (eager) { + try { + disposer.accept(resource); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + EmptyDisposable.error(new CompositeException(ex, exc), observer); + return; + } } - EmptyDisposable.error(e, s); + EmptyDisposable.error(ex, observer); + + if (!eager) { + try { + disposer.accept(resource); + } catch (Throwable exc) { + Exceptions.throwIfFatal(exc); + RxJavaPlugins.onError(exc); + } + } return; } - final AtomicBoolean once = new AtomicBoolean(); + source.subscribe(new UsingObserver(observer, resource, disposer, eager)); + } - cs.subscribe(new CompletableObserver() { - Disposable d; - void disposeThis() { - d.dispose(); - if (once.compareAndSet(false, true)) { - try { - disposer.accept(resource); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); - } + static final class UsingObserver + extends AtomicReference + implements CompletableObserver, Disposable { + + + private static final long serialVersionUID = -674404550052917487L; + + final CompletableObserver actual; + + final Consumer disposer; + + final boolean eager; + + Disposable d; + + UsingObserver(CompletableObserver actual, R resource, Consumer disposer, boolean eager) { + super(resource); + this.actual = actual; + this.disposer = disposer; + this.eager = eager; + } + + @Override + public void dispose() { + d.dispose(); + d = DisposableHelper.DISPOSED; + disposeResourceAfter(); + } + + @SuppressWarnings("unchecked") + void disposeResourceAfter() { + Object resource = getAndSet(this); + if (resource != this) { + try { + disposer.accept((R)resource); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); } } + } - @Override - public void onComplete() { - if (eager) { - if (once.compareAndSet(false, true)) { - try { - disposer.accept(resource); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.onError(ex); - return; - } - } - } + @Override + public boolean isDisposed() { + return d.isDisposed(); + } - s.onComplete(); + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; - if (!eager) { - disposeThis(); - } + actual.onSubscribe(this); } + } - @Override - public void onError(Throwable e) { - if (eager) { - if (once.compareAndSet(false, true)) { - try { - disposer.accept(resource); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - e = new CompositeException(ex, e); - } + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable e) { + d = DisposableHelper.DISPOSED; + if (eager) { + Object resource = getAndSet(this); + if (resource != this) { + try { + disposer.accept((R)resource); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + e = new CompositeException(e, ex); } + } else { + return; } + } - s.onError(e); + actual.onError(e); - if (!eager) { - disposeThis(); - } + if (!eager) { + disposeResourceAfter(); } + } - @Override - public void onSubscribe(Disposable d) { - this.d = d; - s.onSubscribe(Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - disposeThis(); + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + d = DisposableHelper.DISPOSED; + if (eager) { + Object resource = getAndSet(this); + if (resource != this) { + try { + disposer.accept((R)resource); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(ex); + return; } - })); + } else { + return; + } } - }); + + actual.onComplete(); + + if (!eager) { + disposeResourceAfter(); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromRunnable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromRunnable.java index f6009cdf9e..dfd95abfea 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromRunnable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromRunnable.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.maybe; +import java.util.concurrent.Callable; + import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.Exceptions; @@ -23,7 +25,7 @@ * * @param the value type */ -public final class MaybeFromRunnable extends Maybe { +public final class MaybeFromRunnable extends Maybe implements Callable { final Runnable runnable; @@ -55,4 +57,10 @@ protected void subscribeActual(MaybeObserver observer) { } } } + + @Override + public T call() throws Exception { + runnable.run(); + return null; + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java b/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java index d6233944f7..13ab0f4011 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java @@ -59,20 +59,21 @@ protected void subscribeActual(final SingleObserver s) { count = sources.length; } - final AtomicBoolean once = new AtomicBoolean(); final CompositeDisposable set = new CompositeDisposable(); + + AmbSingleObserver shared = new AmbSingleObserver(s, set); s.onSubscribe(set); for (int i = 0; i < count; i++) { SingleSource s1 = sources[i]; - if (once.get()) { + if (shared.get()) { return; } if (s1 == null) { set.dispose(); Throwable e = new NullPointerException("One of the sources is null"); - if (once.compareAndSet(false, true)) { + if (shared.compareAndSet(false, true)) { s.onError(e); } else { RxJavaPlugins.onError(e); @@ -80,32 +81,44 @@ protected void subscribeActual(final SingleObserver s) { return; } - s1.subscribe(new SingleObserver() { + s1.subscribe(shared); + } + } - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } + static final class AmbSingleObserver extends AtomicBoolean implements SingleObserver { - @Override - public void onSuccess(T value) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onSuccess(value); - } - } + private static final long serialVersionUID = -1944085461036028108L; - @Override - public void onError(Throwable e) { - if (once.compareAndSet(false, true)) { - set.dispose(); - s.onError(e); - } else { - RxJavaPlugins.onError(e); - } - } + final CompositeDisposable set; - }); + final SingleObserver s; + + AmbSingleObserver(SingleObserver s, CompositeDisposable set) { + this.s = s; + this.set = set; + } + + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } + + @Override + public void onSuccess(T value) { + if (compareAndSet(false, true)) { + set.dispose(); + s.onSuccess(value); + } + } + + @Override + public void onError(Throwable e) { + if (compareAndSet(false, true)) { + set.dispose(); + s.onError(e); + } else { + RxJavaPlugins.onError(e); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java index 538580c510..8726b03d67 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java @@ -18,6 +18,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.observers.ResumeSingleObserver; public final class SingleDelayWithCompletable extends Single { @@ -66,7 +67,7 @@ public void onError(Throwable e) { @Override public void onComplete() { - source.subscribe(new DelayWithMainObserver(this, actual)); + source.subscribe(new ResumeSingleObserver(this, actual)); } @Override @@ -79,31 +80,4 @@ public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } - - static final class DelayWithMainObserver implements SingleObserver { - - final AtomicReference parent; - - final SingleObserver actual; - - DelayWithMainObserver(AtomicReference parent, SingleObserver actual) { - this.parent = parent; - this.actual = actual; - } - - @Override - public void onSubscribe(Disposable d) { - DisposableHelper.replace(parent, d); - } - - @Override - public void onSuccess(T value) { - actual.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - actual.onError(e); - } - } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithObservable.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithObservable.java index 65d714cad0..68573259ed 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithObservable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithObservable.java @@ -18,7 +18,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.operators.single.SingleDelayWithCompletable.DelayWithMainObserver; +import io.reactivex.internal.observers.ResumeSingleObserver; import io.reactivex.plugins.RxJavaPlugins; public final class SingleDelayWithObservable extends Single { @@ -85,7 +85,7 @@ public void onComplete() { return; } done = true; - source.subscribe(new DelayWithMainObserver(this, actual)); + source.subscribe(new ResumeSingleObserver(this, actual)); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithPublisher.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithPublisher.java index bfc9169762..cb35b7c547 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithPublisher.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithPublisher.java @@ -20,7 +20,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.operators.single.SingleDelayWithCompletable.DelayWithMainObserver; +import io.reactivex.internal.observers.ResumeSingleObserver; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -93,7 +93,7 @@ public void onComplete() { return; } done = true; - source.subscribe(new DelayWithMainObserver(this, actual)); + source.subscribe(new ResumeSingleObserver(this, actual)); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithSingle.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithSingle.java index 966c176093..cf6fa4f021 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithSingle.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelayWithSingle.java @@ -18,7 +18,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.operators.single.SingleDelayWithCompletable.DelayWithMainObserver; +import io.reactivex.internal.observers.ResumeSingleObserver; public final class SingleDelayWithSingle extends Single { @@ -62,7 +62,7 @@ public void onSubscribe(Disposable d) { @Override public void onSuccess(U value) { - source.subscribe(new DelayWithMainObserver(this, actual)); + source.subscribe(new ResumeSingleObserver(this, actual)); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java index bc1a8daa3b..8bbb183e6c 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnError.java @@ -49,7 +49,7 @@ public void onError(Throwable e) { onError.accept(e); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - e = new CompositeException(ex, e); + e = new CompositeException(e, ex); } s.onError(e); } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java index 163a4a34a2..3434f882cd 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoOnEvent.java @@ -59,7 +59,7 @@ public void onError(Throwable e) { onEvent.accept(null, e); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - e = new CompositeException(ex, e); + e = new CompositeException(e, ex); } s.onError(e); } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java b/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java index ee3022d7b2..beb6404f3c 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java @@ -67,6 +67,7 @@ public void onError(Throwable e) { return; } if (count.compareAndSet(state, 2)) { + set.dispose(); s.onError(e); return; } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java index 3e8c2b3602..fbac47eb9d 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java @@ -153,43 +153,8 @@ void drain() { long e = 0L; if (r == Long.MAX_VALUE) { - for (;;) { - if (cancelled) { - return; - } - - R v; - - try { - v = iter.next(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - a.onError(ex); - return; - } - - a.onNext(v); - - if (cancelled) { - return; - } - - - boolean b; - - try { - b = iter.hasNext(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - a.onError(ex); - return; - } - - if (!b) { - a.onComplete(); - return; - } - } + slowPath(a, iter); + return; } while (e != r) { @@ -247,6 +212,46 @@ void drain() { } } + void slowPath(Subscriber a, Iterator iter) { + for (;;) { + if (cancelled) { + return; + } + + R v; + + try { + v = iter.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + } + @Override public int requestFusion(int mode) { if ((mode & ASYNC) != 0) { diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java b/src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java index d93160969e..6dbc41a41a 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleObserveOn.java @@ -13,8 +13,11 @@ package io.reactivex.internal.operators.single; +import java.util.concurrent.atomic.AtomicReference; + import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; public final class SingleObserveOn extends Single { @@ -29,38 +32,64 @@ public SingleObserveOn(SingleSource source, Scheduler scheduler) { @Override protected void subscribeActual(final SingleObserver s) { + source.subscribe(new ObserveOnSingleObserver(s, scheduler)); + } - final CompositeDisposable mad = new CompositeDisposable(); - s.onSubscribe(mad); + static final class ObserveOnSingleObserver extends AtomicReference + implements SingleObserver, Disposable, Runnable { + private static final long serialVersionUID = 3528003840217436037L; - source.subscribe(new SingleObserver() { + final SingleObserver actual; - @Override - public void onError(final Throwable e) { - mad.add(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onError(e); - } - })); - } + final Scheduler scheduler; + + T value; + Throwable error; + + ObserveOnSingleObserver(SingleObserver actual, Scheduler scheduler) { + this.actual = actual; + this.scheduler = scheduler; + } - @Override - public void onSubscribe(Disposable d) { - mad.add(d); + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); } + } - @Override - public void onSuccess(final T value) { - mad.add(scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - s.onSuccess(value); - } - })); + @Override + public void onSuccess(T value) { + this.value = value; + Disposable d = scheduler.scheduleDirect(this); + DisposableHelper.replace(this, d); + } + + @Override + public void onError(Throwable e) { + this.error = e; + Disposable d = scheduler.scheduleDirect(this); + DisposableHelper.replace(this, d); + } + + @Override + public void run() { + Throwable ex = error; + if (ex != null) { + actual.onError(ex); + } else { + actual.onSuccess(value); } + } - }); - } + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java b/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java index 1ebc69dc0f..c8d52a2cb4 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleOnErrorReturn.java @@ -48,7 +48,7 @@ public void onError(Throwable e) { v = valueSupplier.apply(e); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - s.onError(new CompositeException(ex, e)); + s.onError(new CompositeException(e, ex)); return; } } else { diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java b/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java index 52c957ceaa..0b0730f387 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleResumeNext.java @@ -13,11 +13,15 @@ package io.reactivex.internal.operators.single; +import java.util.concurrent.atomic.AtomicReference; + import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.SequentialDisposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.observers.ResumeSingleObserver; public final class SingleResumeNext extends Single { final SingleSource source; @@ -32,62 +36,58 @@ public SingleResumeNext(SingleSource source, @Override protected void subscribeActual(final SingleObserver s) { + source.subscribe(new ResumeMainSingleObserver(s, nextFunction)); + } - final SequentialDisposable sd = new SequentialDisposable(); - s.onSubscribe(sd); + static final class ResumeMainSingleObserver extends AtomicReference + implements SingleObserver, Disposable { + private static final long serialVersionUID = -5314538511045349925L; - source.subscribe(new SingleObserver() { + final SingleObserver actual; - @Override - public void onSubscribe(Disposable d) { - sd.replace(d); - } + final Function> nextFunction; - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } + public ResumeMainSingleObserver(SingleObserver actual, + Function> nextFunction) { + this.actual = actual; + this.nextFunction = nextFunction; + } - @Override - public void onError(Throwable e) { - SingleSource next; - - try { - next = nextFunction.apply(e); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.onError(new CompositeException(ex, e)); - return; - } - - if (next == null) { - NullPointerException npe = new NullPointerException("The next Single supplied was null"); - npe.initCause(e); - s.onError(npe); - return; - } - - next.subscribe(new SingleObserver() { - - @Override - public void onSubscribe(Disposable d) { - sd.replace(d); - } - - @Override - public void onSuccess(T value) { - s.onSuccess(value); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - }); + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + SingleSource source; + + try { + source = ObjectHelper.requireNonNull(nextFunction.apply(e), "The nextFunction returned a null SingleSource."); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + actual.onError(new CompositeException(e, ex)); + return; } - }); - } + source.subscribe(new ResumeSingleObserver(this, actual)); + } + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java index 9a9b6eae4f..56e1f22658 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleSubscribeOn.java @@ -31,16 +31,10 @@ public SingleSubscribeOn(SingleSource source, Scheduler scheduler) @Override protected void subscribeActual(final SingleObserver s) { - - final SubscribeOnObserver parent = new SubscribeOnObserver(s); + final SubscribeOnObserver parent = new SubscribeOnObserver(s, source); s.onSubscribe(parent); - Disposable f = scheduler.scheduleDirect(new Runnable() { - @Override - public void run() { - source.subscribe(parent); - } - }); + Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); @@ -48,7 +42,7 @@ public void run() { static final class SubscribeOnObserver extends AtomicReference - implements SingleObserver, Disposable { + implements SingleObserver, Disposable, Runnable { private static final long serialVersionUID = 7000911171163930287L; @@ -56,8 +50,11 @@ static final class SubscribeOnObserver final SequentialDisposable task; - SubscribeOnObserver(SingleObserver actual) { + final SingleSource source; + + SubscribeOnObserver(SingleObserver actual, SingleSource source) { this.actual = actual; + this.source = source; this.task = new SequentialDisposable(); } @@ -84,7 +81,12 @@ public void dispose() { @Override public boolean isDisposed() { - return DisposableHelper.isDisposed(this); + return DisposableHelper.isDisposed(get()); + } + + @Override + public void run() { + source.subscribe(this); } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java index 7e297cc9aa..7433309780 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java @@ -127,7 +127,7 @@ void otherError(Throwable e) { static final class TakeUntilOtherSubscriber extends AtomicReference - implements Subscriber, Disposable { + implements Subscriber { private static final long serialVersionUID = 5170026210238877381L; @@ -161,14 +161,8 @@ public void onComplete() { parent.otherError(new CancellationException()); } - @Override public void dispose() { SubscriptionHelper.cancel(this); } - - @Override - public boolean isDisposed() { - return SubscriptionHelper.isCancelled(get()); - } } } diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 07ff9c8b99..8efd219c2d 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -1063,6 +1063,60 @@ protected void subscribeActual(SingleObserver observer) { } } + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeSingleToObservable(Function, ? extends ObservableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Single source = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + /** * Check if the given transformed reactive type reports multiple onSubscribe calls to * RxJavaPlugins. diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java index fa12144bac..6f0efc7b4f 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java @@ -13,7 +13,9 @@ package io.reactivex.internal.operators.completable; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; + +import java.io.IOException; import org.junit.Test; @@ -96,4 +98,178 @@ public void subscribe(CompletableEmitter e) throws Exception { assertTrue(d.isDisposed()); } + + @Test + public void callbackThrows() { + Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void onErrorNull() { + Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + e.onError(null); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + e.onComplete(); + } + })); + } + + @Test + public void onErrorThrows() { + Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + Disposable d = Disposables.empty(); + e.setDisposable(d); + + try { + e.onError(new IOException()); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + + assertTrue(d.isDisposed()); + assertTrue(e.isDisposed()); + } + }).subscribe(new CompletableObserver() { + + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onError(Throwable e) { + throw new TestException(); + } + + @Override + public void onComplete() { + + } + }); + } + + @Test + public void onCompleteThrows() { + Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + Disposable d = Disposables.empty(); + e.setDisposable(d); + + try { + e.onComplete(); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + + assertTrue(d.isDisposed()); + assertTrue(e.isDisposed()); + } + }).subscribe(new CompletableObserver() { + + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + throw new TestException(); + } + }); + } + + @Test + public void onErrorThrows2() { + Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + try { + e.onError(new IOException()); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + + assertTrue(e.isDisposed()); + } + }).subscribe(new CompletableObserver() { + + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onError(Throwable e) { + throw new TestException(); + } + + @Override + public void onComplete() { + + } + }); + } + + @Test + public void onCompleteThrows2() { + Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + try { + e.onComplete(); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + + assertTrue(e.isDisposed()); + } + }).subscribe(new CompletableObserver() { + + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + throw new TestException(); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableDisposeOnTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableDisposeOnTest.java new file mode 100644 index 0000000000..0a9a5a1aad --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableDisposeOnTest.java @@ -0,0 +1,140 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Action; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.TestScheduler; +import io.reactivex.subjects.PublishSubject; + +public class CompletableDisposeOnTest { + + @Test + public void cancelDelayed() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject ps = PublishSubject.create(); + + ps.ignoreElements() + .unsubscribeOn(scheduler) + .test() + .cancel(); + + assertTrue(ps.hasObservers()); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + assertFalse(ps.hasObservers()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().ignoreElements().unsubscribeOn(new TestScheduler())); + } + + @Test + public void completeAfterCancel() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.ignoreElements() + .unsubscribeOn(scheduler) + .test(); + + to.dispose(); + + ps.onComplete(); + + to.assertEmpty(); + } + + @Test + public void errorAfterCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.ignoreElements() + .unsubscribeOn(scheduler) + .test(); + + to.dispose(); + + ps.onError(new TestException()); + + to.assertEmpty(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void normal() { + TestScheduler scheduler = new TestScheduler(); + + final int[] call = { 0 }; + + Completable.complete() + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + call[0]++; + } + }) + .unsubscribeOn(scheduler) + .test() + .assertResult(); + + scheduler.triggerActions(); + + assertEquals(0, call[0]); + } + + @Test + public void error() { + TestScheduler scheduler = new TestScheduler(); + + final int[] call = { 0 }; + + Completable.error(new TestException()) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + call[0]++; + } + }) + .unsubscribeOn(scheduler) + .test() + .assertFailure(TestException.class); + + scheduler.triggerActions(); + + assertEquals(0, call[0]); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java new file mode 100644 index 0000000000..04205513a0 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java @@ -0,0 +1,55 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Consumer; +import io.reactivex.observers.TestObserver; + +public class CompletableDoOnTest { + + @Test + public void successAcceptThrows() { + Completable.complete().doOnEvent(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorAcceptThrows() { + TestObserver to = Completable.error(new TestException("Outer")).doOnEvent(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Outer"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableFromPublisherTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromPublisherTest.java index 58d8460b1b..f1088b85cd 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableFromPublisherTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableFromPublisherTest.java @@ -13,10 +13,10 @@ package io.reactivex.internal.operators.completable; -import io.reactivex.Completable; -import io.reactivex.Flowable; import org.junit.Test; +import io.reactivex.*; + public class CompletableFromPublisherTest { @Test(expected = NullPointerException.class) public void fromPublisherNull() { @@ -43,4 +43,9 @@ public void fromPublisherThrows() { .test() .assertFailure(UnsupportedOperationException.class); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Completable.fromPublisher(Flowable.just(1))); + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableLiftTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableLiftTest.java new file mode 100644 index 0000000000..e8cca8d928 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableLiftTest.java @@ -0,0 +1,39 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; + +public class CompletableLiftTest { + + @Test + public void callbackThrows() { + try { + Completable.complete() + .lift(new CompletableOperator() { + @Override + public CompletableObserver apply(CompletableObserver o) throws Exception { + throw new TestException(); + } + }) + .test(); + } catch (NullPointerException ex) { + assertTrue(ex.toString(), ex.getCause() instanceof TestException); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java index 1247c1036d..565770ab05 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableMergeTest.java @@ -15,9 +15,16 @@ import static org.junit.Assert.*; +import java.util.List; + import org.junit.Test; import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; public class CompletableMergeTest { @Test @@ -30,4 +37,98 @@ public void invalidPrefetch() { } } + @Test + public void cancelAfterFirst() { + final TestObserver to = new TestObserver(); + + Completable.mergeArray(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + s.onComplete(); + to.cancel(); + } + }, Completable.complete()) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void cancelAfterFirstDelayError() { + final TestObserver to = new TestObserver(); + + Completable.mergeArrayDelayError(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + s.onComplete(); + to.cancel(); + } + }, Completable.complete()) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void onErrorAfterComplete() { + List errors = TestHelper.trackPluginErrors(); + try { + final CompletableObserver[] co = { null }; + + Completable.mergeArrayDelayError(Completable.complete(), new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + s.onComplete(); + co[0] = s; + } + }) + .test() + .assertResult(); + + co[0].onError(new TestException()); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void completeAfterMain() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.mergeArray(Completable.complete(), pp.ignoreElements()) + .test(); + + pp.onComplete(); + + to.assertResult(); + } + + @Test + public void completeAfterMainDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.mergeArrayDelayError(Completable.complete(), pp.ignoreElements()) + .test(); + + pp.onComplete(); + + to.assertResult(); + } + + @Test + public void errorAfterMainDelayError() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.mergeArrayDelayError(Completable.complete(), pp.ignoreElements()) + .test(); + + pp.onError(new TestException()); + + to.assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableOnErrorXTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableOnErrorXTest.java new file mode 100644 index 0000000000..56690dbafb --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableOnErrorXTest.java @@ -0,0 +1,48 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +public class CompletableOnErrorXTest { + + @Test + public void normalReturn() { + Completable.complete() + .onErrorComplete() + .test() + .assertResult(); + } + + @Test + public void normalResumeNext() { + final int[] call = { 0 }; + Completable.complete() + .onErrorResumeNext(new Function() { + @Override + public CompletableSource apply(Throwable e) throws Exception { + call[0]++; + return Completable.complete(); + } + }) + .test() + .assertResult(); + + assertEquals(0, call[0]); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableSubscribeOnTest.java index 6ee083286d..453b1d2f07 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableSubscribeOnTest.java @@ -21,9 +21,11 @@ import org.junit.Test; import io.reactivex.*; +import io.reactivex.functions.Function; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.schedulers.*; +import io.reactivex.subjects.PublishSubject; public class CompletableSubscribeOnTest { @@ -46,4 +48,19 @@ public void normal() { RxJavaPlugins.reset(); } } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().ignoreElements().subscribeOn(new TestScheduler())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletable(new Function() { + @Override + public CompletableSource apply(Completable c) throws Exception { + return c.subscribeOn(Schedulers.single()); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java index cd0e1f8d44..8e9ed3a29a 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java @@ -15,13 +15,16 @@ import static org.junit.Assert.*; +import java.util.List; import java.util.concurrent.*; import org.junit.Test; -import io.reactivex.Completable; +import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Action; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; import io.reactivex.subjects.PublishSubject; @@ -74,4 +77,73 @@ public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() { assertFalse(subject.hasObservers()); } + @Test + public void otherErrors() { + Completable.never() + .timeout(1, TimeUnit.MILLISECONDS, Completable.error(new TestException())) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void mainSuccess() { + Completable.complete() + .timeout(1, TimeUnit.DAYS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void mainError() { + Completable.error(new TestException()) + .timeout(1, TimeUnit.DAYS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void errorTimeoutRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + final TestScheduler scheduler = new TestScheduler(); + + final PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps.ignoreElements() + .timeout(1, TimeUnit.MILLISECONDS, scheduler, Completable.complete()).test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertTerminated(); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + + } finally { + RxJavaPlugins.reset(); + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableTimerTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimerTest.java new file mode 100644 index 0000000000..9db406f18b --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableTimerTest.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.schedulers.TestScheduler; + +public class CompletableTimerTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Completable.timer(1, TimeUnit.SECONDS, new TestScheduler())); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java new file mode 100644 index 0000000000..e8162e656a --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java @@ -0,0 +1,569 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; + +public class CompletableUsingTest { + + @Test + public void resourceSupplierThrows() { + + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.complete(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorEager() { + + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.error(new TestException()); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void emptyEager() { + + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.complete(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, true) + .test() + .assertResult(); + } + + @Test + public void errorNonEager() { + + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.error(new TestException()); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, false) + .test() + .assertFailure(TestException.class); + } + + @Test + public void emptyNonEager() { + + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.complete(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, false) + .test() + .assertResult(); + } + + @Test + public void supplierCrashEager() { + + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + throw new TestException(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void supplierCrashNonEager() { + + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + throw new TestException(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, false) + .test() + .assertFailure(TestException.class); + } + + @Test + public void supplierAndDisposerCrashEager() { + TestObserver to = Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + throw new TestException("Main"); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + throw new TestException("Disposer"); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List list = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(list, 0, TestException.class, "Main"); + TestHelper.assertError(list, 1, TestException.class, "Disposer"); + } + + @Test + public void supplierAndDisposerCrashNonEager() { + List errors = TestHelper.trackPluginErrors(); + try { + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + throw new TestException("Main"); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + throw new TestException("Disposer"); + } + }, false) + .test() + .assertFailureAndMessage(TestException.class, "Main"); + + TestHelper.assertError(errors, 0, TestException.class, "Disposer"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void dispose() { + final int[] call = {0 }; + + TestObserver to = Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.never(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + call[0]++; + } + }, false) + .test(); + + to.cancel(); + + assertEquals(1, call[0]); + } + + @Test + public void disposeCrashes() { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.never(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + throw new TestException(); + } + }, false) + .test(); + + to.cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void isDisposed() { + TestHelper.checkDisposed(Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.never(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, false)); + } + + @Test + public void justDisposerCrashes() { + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.complete(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + throw new TestException("Disposer"); + } + }, true) + .test() + .assertFailure(TestException.class); + } + + + @Test + public void emptyDisposerCrashes() { + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.complete(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + throw new TestException("Disposer"); + } + }, true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorDisposerCrash() { + TestObserver to = Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.error(new TestException("Main")); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + throw new TestException("Disposer"); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List list = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(list, 0, TestException.class, "Main"); + TestHelper.assertError(list, 1, TestException.class, "Disposer"); + } + + @Test + public void doubleOnSubscribe() { + List errors = TestHelper.trackPluginErrors(); + try { + Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return Completable.wrap(new CompletableSource() { + @Override + public void subscribe(CompletableObserver s) { + Disposable d1 = Disposables.empty(); + + s.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + s.onSubscribe(d2); + + assertFalse(d1.isDisposed()); + + assertTrue(d2.isDisposed()); + } + }); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, false).test(); + TestHelper.assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void successDisposeRace() { + for (int i = 0; i < 500; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return ps.ignoreElements(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + } + }, true) + .test(); + + ps.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void errorDisposeRace() { + for (int i = 0; i < 500; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return ps.ignoreElements(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + } + }, true) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void emptyDisposeRace() { + for (int i = 0; i < 500; i++) { + + final PublishSubject ps = PublishSubject.create(); + + final TestObserver to = Completable.using(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }, new Function() { + @Override + public CompletableSource apply(Object v) throws Exception { + return ps.ignoreElements(); + } + }, new Consumer() { + @Override + public void accept(Object d) throws Exception { + + } + }, true) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromRunnableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromRunnableTest.java index e5c5d1460a..4fb5d6ba3d 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromRunnableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromRunnableTest.java @@ -13,11 +13,18 @@ package io.reactivex.internal.operators.maybe; -import io.reactivex.Maybe; +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; -import static org.junit.Assert.assertEquals; +import io.reactivex.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; public class MaybeFromRunnableTest { @Test(expected = NullPointerException.class) @@ -95,4 +102,62 @@ public void run() { .test() .assertFailure(UnsupportedOperationException.class); } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Exception { + final int[] counter = { 0 }; + + Maybe m = Maybe.fromRunnable(new Runnable() { + @Override + public void run() { + counter[0]++; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Callable); + + assertNull(((Callable)m).call()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Maybe.fromRunnable(new Runnable() { + @Override + public void run() { + cdl1.countDown(); + try { + cdl2.await(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.cancel(); + + cdl2.countDown(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertError(errors, 0, RuntimeException.class); + + assertTrue(errors.get(0).toString(), errors.get(0).getCause() instanceof InterruptedException); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeIsEmptySingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeIsEmptySingleTest.java new file mode 100644 index 0000000000..222b1c8b84 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeIsEmptySingleTest.java @@ -0,0 +1,30 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.Maybe; +import io.reactivex.internal.fuseable.HasUpstreamMaybeSource; + +public class MaybeIsEmptySingleTest { + + @Test + public void source() { + Maybe m = Maybe.just(1); + + assertSame(m, (((HasUpstreamMaybeSource)m.isEmpty()).source())); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeSubscribeOnTest.java new file mode 100644 index 0000000000..43ad69cfcf --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeSubscribeOnTest.java @@ -0,0 +1,27 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.schedulers.Schedulers; + +public class MaybeSubscribeOnTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.just(1).subscribeOn(Schedulers.single())); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeToFlowableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToFlowableTest.java new file mode 100644 index 0000000000..b81b526272 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToFlowableTest.java @@ -0,0 +1,43 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.assertSame; + +import org.junit.Test; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.HasUpstreamMaybeSource; + +public class MaybeToFlowableTest { + + @Test + public void source() { + Maybe m = Maybe.just(1); + + assertSame(m, (((HasUpstreamMaybeSource)m.toFlowable()).source())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Maybe m) throws Exception { + return m.toFlowable(); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeToObservableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToObservableTest.java new file mode 100644 index 0000000000..6b9fd12aba --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToObservableTest.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.HasUpstreamMaybeSource; + +public class MaybeToObservableTest { + + @Test + public void source() { + Maybe m = Maybe.just(1); + + assertSame(m, (((HasUpstreamMaybeSource)m.toObservable()).source())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Maybe m) throws Exception { + return m.toObservable(); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java index 6f0062a794..9d0100a124 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java @@ -19,9 +19,13 @@ import org.junit.Test; -import io.reactivex.Single; +import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.*; public class SingleAmbTest { @Test @@ -119,4 +123,142 @@ public void ambArrayEmpty() { public void ambSingleSource() { assertSame(Single.never(), Single.ambArray(Single.never())); } + + @SuppressWarnings("unchecked") + @Test + public void error() { + Single.ambArray(Single.error(new TestException()), Single.just(1)) + .test() + .assertFailure(TestException.class); + } + + @Test + public void nullSourceSuccessRace() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + + final Subject ps = ReplaySubject.create(); + ps.onNext(1); + + @SuppressWarnings("unchecked") + final Single source = Single.ambArray(ps.singleOrError(), Single.never(), Single.never(), null); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.test(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, NullPointerException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void multipleErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + + final Subject ps1 = PublishSubject.create(); + final Subject ps2 = PublishSubject.create(); + + Single.ambArray(ps1.singleOrError(), ps2.singleOrError()).test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void successErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + + final Subject ps1 = PublishSubject.create(); + final Subject ps2 = PublishSubject.create(); + + Single.ambArray(ps1.singleOrError(), ps2.singleOrError()).test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onNext(1); + ps1.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void manySources() { + Single[] sources = new Single[32]; + Arrays.fill(sources, Single.never()); + sources[31] = Single.just(31); + + Single.amb(Arrays.asList(sources)) + .test() + .assertResult(31); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleContainstTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleContainstTest.java new file mode 100644 index 0000000000..9e32bf2bef --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleContainstTest.java @@ -0,0 +1,44 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.Single; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.BiPredicate; + +public class SingleContainstTest { + + @Test + public void comparerThrows() { + Single.just(1) + .contains(2, new BiPredicate() { + @Override + public boolean test(Object a, Object b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void error() { + Single.error(new TestException()) + .contains(2) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java index 33484772fc..112ce428a4 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java @@ -13,7 +13,9 @@ package io.reactivex.internal.operators.single; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; + +import java.io.IOException; import org.junit.Test; @@ -103,4 +105,181 @@ public void subscribe(SingleEmitter e) throws Exception { public void unsafeCreate() { Single.unsafeCreate(Single.just(1)); } + + @Test + public void createCallbackThrows() { + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + s.onSuccess(1); + } + })); + } + + @Test + public void createNullSuccess() { + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + s.onSuccess(null); + s.onSuccess(null); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void createNullError() { + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + s.onError(null); + s.onError(null); + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void createConsumerThrows() { + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + try { + s.onSuccess(1); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + } + }) + .subscribe(new SingleObserver() { + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onSuccess(Object value) { + throw new TestException(); + } + + @Override + public void onError(Throwable e) { + + } + }); + } + + @Test + public void createConsumerThrowsResource() { + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + Disposable d = Disposables.empty(); + s.setDisposable(d); + try { + s.onSuccess(1); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + + assertTrue(d.isDisposed()); + } + }) + .subscribe(new SingleObserver() { + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onSuccess(Object value) { + throw new TestException(); + } + + @Override + public void onError(Throwable e) { + + } + }); + } + + @Test + public void createConsumerThrowsOnError() { + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + try { + s.onError(new IOException()); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + } + }) + .subscribe(new SingleObserver() { + @Override + public void onSubscribe(Disposable d) { + } + + @Override + public void onSuccess(Object value) { + } + + @Override + public void onError(Throwable e) { + throw new TestException(); + } + }); + } + + @Test + public void createConsumerThrowsResourceOnError() { + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + Disposable d = Disposables.empty(); + s.setDisposable(d); + try { + s.onError(new IOException()); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + + assertTrue(d.isDisposed()); + } + }) + .subscribe(new SingleObserver() { + @Override + public void onSubscribe(Disposable d) { + + } + + @Override + public void onSuccess(Object value) { + } + + @Override + public void onError(Throwable e) { + throw new TestException(); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDelayTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDelayTest.java index 0ee7d826d4..1a50c8430f 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleDelayTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDelayTest.java @@ -13,21 +13,23 @@ package io.reactivex.internal.operators.single; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; -import io.reactivex.functions.Consumer; import org.junit.Test; +import org.reactivestreams.Subscriber; import io.reactivex.*; +import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.BiConsumer; +import io.reactivex.functions.*; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; public class SingleDelayTest { @Test @@ -129,4 +131,93 @@ public void accept(Throwable throwable) throws Exception { assertNotEquals(Thread.currentThread(), thread.get()); } + @Test + public void withPublisherDispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().delaySubscription(Flowable.just(1))); + } + + @Test + public void withPublisherError() { + Single.just(1) + .delaySubscription(Flowable.error(new TestException())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void withPublisherError2() { + List errors = TestHelper.trackPluginErrors(); + + try { + Single.just(1) + .delaySubscription(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException()); + } + }) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void withObservableDispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().delaySubscription(Observable.just(1))); + } + + @Test + public void withObservableError() { + Single.just(1) + .delaySubscription(Observable.error(new TestException())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void withObservableError2() { + List errors = TestHelper.trackPluginErrors(); + + try { + Single.just(1) + .delaySubscription(new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + s.onNext(1); + s.onError(new TestException()); + } + }) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void withSingleErrors() { + Single.just(1) + .delaySubscription(Single.error(new TestException())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void withSingleDispose() { + TestHelper.checkDisposed(Single.just(1).delaySubscription(Single.just(2))); + } + + @Test + public void withCompletableDispose() { + TestHelper.checkDisposed(Completable.complete().andThen(Single.just(1))); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java index 84f344278d..397b279226 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java @@ -21,9 +21,12 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; public class SingleDoOnTest { @@ -150,4 +153,178 @@ public void accept(Disposable s) throws Exception { } } + + @Test + public void onErrorSuccess() { + final int[] call = { 0 }; + + Single.just(1) + .doOnError(new Consumer() { + @Override + public void accept(Throwable v) throws Exception { + call[0]++; + } + }) + .test() + .assertResult(1); + + assertEquals(0, call[0]); + } + + @Test + public void onErrorCrashes() { + TestObserver to = Single.error(new TestException("Outer")) + .doOnError(new Consumer() { + @Override + public void accept(Throwable v) throws Exception { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Outer"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } + + @Test + public void doOnEventThrowsSuccess() { + Single.just(1) + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doOnEventThrowsError() { + TestObserver to = Single.error(new TestException("Main")) + .doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Main"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } + + @Test + public void doOnDisposeDispose() { + final int[] calls = { 0 }; + TestHelper.checkDisposed(PublishSubject.create().singleOrError().doOnDispose(new Action() { + @Override + public void run() throws Exception { + calls[0]++; + } + })); + + assertEquals(1, calls[0]); + } + + @Test + public void doOnDisposeSuccess() { + final int[] calls = { 0 }; + + Single.just(1) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + calls[0]++; + } + }) + .test() + .assertResult(1); + + assertEquals(0, calls[0]); + } + + @Test + public void doOnDisposeError() { + final int[] calls = { 0 }; + + Single.error(new TestException()) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + calls[0]++; + } + }) + .test() + .assertFailure(TestException.class); + + assertEquals(0, calls[0]); + } + + @Test + public void doOnDisposeDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single s) throws Exception { + return s.doOnDispose(Functions.EMPTY_ACTION); + } + }); + } + + @Test + public void doOnDisposeCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + PublishSubject ps = PublishSubject.create(); + + ps.singleOrError().doOnDispose(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doOnSuccessErrors() { + final int[] call = { 0 }; + + Single.error(new TestException()) + .doOnSuccess(new Consumer() { + @Override + public void accept(Object v) throws Exception { + call[0]++; + } + }) + .test() + .assertFailure(TestException.class); + + assertEquals(0, call[0]); + } + + @Test + public void doOnSuccessCrash() { + Single.just(1) + .doOnSuccess(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleEqualsTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleEqualsTest.java new file mode 100644 index 0000000000..bb864bfc54 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleEqualsTest.java @@ -0,0 +1,39 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; + +public class SingleEqualsTest { + + @Test + public void bothError() { + List errors = TestHelper.trackPluginErrors(); + try { + Single.equals(Single.error(new TestException("One")), Single.error(new TestException("Two"))) + .test() + .assertFailureAndMessage(TestException.class, "One"); + + TestHelper.assertError(errors, 0, TestException.class, "Two"); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleErrorTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleErrorTest.java new file mode 100644 index 0000000000..46646c855d --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleErrorTest.java @@ -0,0 +1,36 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.Callable; + +import org.junit.Test; + +import io.reactivex.Single; +import io.reactivex.exceptions.TestException; + +public class SingleErrorTest { + + @Test + public void errorCallableThrows() { + Single.error(new Callable() { + @Override + public Throwable call() throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapCompletableTest.java new file mode 100644 index 0000000000..50f7a6116a --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapCompletableTest.java @@ -0,0 +1,32 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +public class SingleFlatMapCompletableTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.just(1).flatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + return Completable.complete(); + } + })); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowableTest.java index 1394a2d9ca..de63a285c9 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowableTest.java @@ -13,15 +13,21 @@ package io.reactivex.internal.operators.single; +import static org.junit.Assert.*; + import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.util.CrashingIterable; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; import io.reactivex.subscribers.*; public class SingleFlatMapIterableFlowableTest { @@ -187,4 +193,395 @@ public Iterable apply(Integer v) throws Exception { .test() .assertFailureAndMessage(TestException.class, "hasNext()", 0); } + + @Test + public void async1() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .hide() + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async2() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async3() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .take(500 * 1000) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async4() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .take(500 * 1000) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void fusedEmptyCheck() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }).subscribe(new Subscriber() { + QueueSubscription qd; + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Subscription d) { + qd = (QueueSubscription)d; + + assertEquals(QueueSubscription.ASYNC, qd.requestFusion(QueueSubscription.ANY)); + } + + @Override + public void onNext(Integer value) { + assertFalse(qd.isEmpty()); + + qd.clear(); + + assertTrue(qd.isEmpty()); + + qd.cancel(); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + } + + @Test + public void hasNextThrowsUnbounded() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 2, 100); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "hasNext()", 0); + } + + @Test + public void nextThrowsUnbounded() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 100, 1); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "next()"); + } + + @Test + public void hasNextThrows() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 2, 100); + } + }) + .test(2L) + .assertFailureAndMessage(TestException.class, "hasNext()", 0); + } + + @Test + public void nextThrows() { + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 100, 1); + } + }) + .test(2L) + .assertFailureAndMessage(TestException.class, "next()"); + } + + @Test + public void requestBefore() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + ps.singleElement().flattenAsFlowable( + new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }) + .test(5L) + .assertEmpty(); + } + } + + @Test + public void requestCreateInnerRace() { + final Integer[] a = new Integer[1000]; + Arrays.fill(a, 1); + + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + ps.onNext(1); + + final TestSubscriber ts = ps.singleElement().flattenAsFlowable( + new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(a); + } + }) + .test(0L); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + for (int i = 0; i < 500; i++) { + ts.request(1); + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 500; i++) { + ts.request(1); + } + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void cancelCreateInnerRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + ps.onNext(1); + + final TestSubscriber ts = ps.singleElement().flattenAsFlowable( + new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }) + .test(0L); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void slowPathCancelAfterHasNext() { + final Integer[] a = new Integer[1000]; + Arrays.fill(a, 1); + + final TestSubscriber ts = new TestSubscriber(0L); + + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + if (count++ == 2) { + ts.cancel(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + }) + .subscribe(ts); + + ts.request(3); + ts.assertValues(1, 1).assertNoErrors().assertNotComplete(); + } + + @Test + public void fastPathCancelAfterHasNext() { + final Integer[] a = new Integer[1000]; + Arrays.fill(a, 1); + + final TestSubscriber ts = new TestSubscriber(0L); + + Single.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + if (count++ == 2) { + ts.cancel(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + }) + .subscribe(ts); + + ts.request(Long.MAX_VALUE); + ts.assertValues(1, 1).assertNoErrors().assertNotComplete(); + } + + @Test + public void requestIteratorRace() { + final Integer[] a = new Integer[1000]; + Arrays.fill(a, 1); + + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + final TestSubscriber ts = ps.singleOrError().flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(a); + } + }).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + ts.request(1); + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onNext(1); + ps.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservableTest.java index e600b6b01e..c3c3c671d3 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservableTest.java @@ -13,16 +13,22 @@ package io.reactivex.internal.operators.single; +import static org.junit.Assert.*; + import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; -import io.reactivex.Single; +import io.reactivex.*; +import io.reactivex.Observer; +import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.internal.util.CrashingIterable; import io.reactivex.observers.*; +import io.reactivex.schedulers.Schedulers; public class SingleFlatMapIterableObservableTest { @@ -165,4 +171,151 @@ public Iterable apply(Integer v) throws Exception { .test() .assertFailureAndMessage(TestException.class, "hasNext()", 0); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Single o) throws Exception { + return o.flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Collections.singleton(1); + } + }); + } + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.just(1).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Collections.singleton(1); + } + })); + } + + @Test + public void async1() { + Single.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .hide() + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async2() { + Single.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async3() { + Single.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .take(500 * 1000) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async4() { + Single.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .take(500 * 1000) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void fusedEmptyCheck() { + Single.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }).subscribe(new Observer() { + QueueDisposable qd; + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Disposable d) { + qd = (QueueDisposable)d; + + assertEquals(QueueDisposable.ASYNC, qd.requestFusion(QueueDisposable.ANY)); + } + + @Override + public void onNext(Integer value) { + assertFalse(qd.isEmpty()); + + qd.clear(); + + assertTrue(qd.isEmpty()); + + qd.dispose(); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java index 4613d81968..f1a8a3b95c 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapMaybeTest.java @@ -13,12 +13,12 @@ package io.reactivex.internal.operators.single; -import io.reactivex.Maybe; -import io.reactivex.MaybeSource; -import io.reactivex.Single; -import io.reactivex.functions.Function; import org.junit.Test; +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + public class SingleFlatMapMaybeTest { @Test(expected = NullPointerException.class) public void flatMapMaybeNull() { @@ -94,4 +94,53 @@ public void flatMapMaybeError() { .test() .assertError(exception); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.just(1).flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(1); + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Single v) throws Exception { + return v.flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(1); + } + }); + } + }); + } + + @Test + public void mapsToError() { + Single.just(1).flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.error(new TestException()); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapsToEmpty() { + Single.just(1).flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.empty(); + } + }) + .test() + .assertResult(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java index 97a13447cb..f7a5e2b739 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFlatMapTest.java @@ -201,4 +201,26 @@ public void flatMapError() { .test() .assertError(exception); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.just(1).flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(2); + } + })); + } + + @Test + public void mappedSingleOnError() { + Single.just(1).flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.error(new TestException()); + } + }) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleFromPublisherTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleFromPublisherTest.java index 23c73c28cb..6185b8bb1a 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleFromPublisherTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleFromPublisherTest.java @@ -13,14 +13,18 @@ package io.reactivex.internal.operators.single; -import java.util.NoSuchElementException; - import static org.junit.Assert.*; + +import java.util.*; + import org.junit.Test; +import org.reactivestreams.Subscriber; import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; public class SingleFromPublisherTest { @@ -72,4 +76,34 @@ public void dispose() { public void isDisposed() { TestHelper.checkDisposed(Single.fromPublisher(Flowable.never())); } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + + try { + Single.fromPublisher(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + BooleanSubscription s2 = new BooleanSubscription(); + s.onSubscribe(s2); + assertTrue(s2.isCancelled()); + + s.onNext(1); + s.onComplete(); + s.onNext(2); + s.onError(new TestException()); + s.onComplete(); + } + }) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, IllegalStateException.class, "Subscription already set!"); + TestHelper.assertError(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleHideTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleHideTest.java new file mode 100644 index 0000000000..011cc7d903 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleHideTest.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.subjects.PublishSubject; + +public class SingleHideTest { + + @Test + public void error() { + Single.error(new TestException()) + .hide() + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().hide()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single s) throws Exception { + return s.hide(); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleObserveOnTest.java new file mode 100644 index 0000000000..9faf2daae7 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleObserveOnTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; + +public class SingleObserveOnTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.just(1).observeOn(Schedulers.single())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single s) throws Exception { + return s.observeOn(Schedulers.single()); + } + }); + } + + @Test + public void error() { + Single.error(new TestException()) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleOnErrorXTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleOnErrorXTest.java new file mode 100644 index 0000000000..074ef9fc9b --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleOnErrorXTest.java @@ -0,0 +1,84 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; + +public class SingleOnErrorXTest { + + @Test + public void returnSuccess() { + Single.just(1) + .onErrorReturnItem(2) + .test() + .assertResult(1); + } + + @Test + public void resumeThrows() { + TestObserver to = Single.error(new TestException("Outer")) + .onErrorReturn(new Function() { + @Override + public Integer apply(Throwable e) throws Exception { + throw new TestException("Inner"); + } + }) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Outer"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } + + @Test + public void resumeErrors() { + Single.error(new TestException("Main")) + .onErrorResumeNext(Single.error(new TestException("Resume"))) + .test() + .assertFailureAndMessage(TestException.class, "Resume"); + } + + @Test + public void resumeDispose() { + TestHelper.checkDisposed(Single.error(new TestException("Main")) + .onErrorResumeNext(Single.just(1))); + } + + @Test + public void resumeDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single s) throws Exception { + return s.onErrorResumeNext(Single.just(1)); + } + }); + } + + @Test + public void resumeSuccess() { + Single.just(1) + .onErrorResumeNext(Single.just(2)) + .test() + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleSubscribeOnTest.java index e269284daa..5cfb30e7dc 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleSubscribeOnTest.java @@ -21,9 +21,11 @@ import org.junit.Test; import io.reactivex.*; +import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.schedulers.TestScheduler; +import io.reactivex.schedulers.*; +import io.reactivex.subjects.PublishSubject; public class SingleSubscribeOnTest { @@ -46,4 +48,18 @@ public void normal() { RxJavaPlugins.reset(); } } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().subscribeOn(new TestScheduler())); + } + + @Test + public void error() { + Single.error(new TestException()) + .subscribeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java index 2791803f5c..fc28aff0db 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java @@ -13,13 +13,17 @@ package io.reactivex.internal.operators.single; +import java.util.List; import java.util.concurrent.CancellationException; import org.junit.Test; +import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; public class SingleTakeUntilTest { @@ -211,4 +215,48 @@ public void otherErrorCompletable() { ts.assertFailure(TestException.class); } + @Test + public void withPublisherDispose() { + TestHelper.checkDisposed(Single.never().takeUntil(Flowable.never())); + } + + @Test + public void onErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + final PublishProcessor ps1 = PublishProcessor.create(); + final PublishProcessor ps2 = PublishProcessor.create(); + + TestObserver to = ps1.singleOrError().takeUntil(ps2).test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTests.java b/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTest.java similarity index 63% rename from src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTests.java rename to src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTest.java index 5a739ba18c..2d08688dd0 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTests.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTimeoutTest.java @@ -19,11 +19,13 @@ import org.junit.Test; +import io.reactivex.Single; +import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subjects.PublishSubject; -public class SingleTimeoutTests { +public class SingleTimeoutTest { @Test public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() { @@ -41,4 +43,30 @@ public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() { assertFalse(subject.hasObservers()); } + @Test + public void otherErrors() { + Single.never() + .timeout(1, TimeUnit.MILLISECONDS, Single.error(new TestException())) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void mainSuccess() { + Single.just(1) + .timeout(1, TimeUnit.DAYS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + + @Test + public void mainError() { + Single.error(new TestException()) + .timeout(1, TimeUnit.DAYS) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleTimerTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleTimerTest.java new file mode 100644 index 0000000000..2a7405d529 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleTimerTest.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.schedulers.TestScheduler; + +public class SingleTimerTest { + + @Test + public void disposed() { + TestHelper.checkDisposed(Single.timer(1, TimeUnit.SECONDS, new TestScheduler())); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleToObservableTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleToObservableTest.java new file mode 100644 index 0000000000..cec40f4f86 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleToObservableTest.java @@ -0,0 +1,38 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.subjects.PublishSubject; + +public class SingleToObservableTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().toObservable()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Single s) throws Exception { + return s.toObservable(); + } + }); + } +} diff --git a/src/test/java/io/reactivex/single/SingleNullTests.java b/src/test/java/io/reactivex/single/SingleNullTests.java index e1832469c1..0599395695 100644 --- a/src/test/java/io/reactivex/single/SingleNullTests.java +++ b/src/test/java/io/reactivex/single/SingleNullTests.java @@ -13,16 +13,18 @@ package io.reactivex.single; + import java.lang.reflect.*; import java.util.*; import java.util.concurrent.*; +import static org.junit.Assert.*; import org.junit.*; import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.SingleOperator; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.schedulers.Schedulers; @@ -705,14 +707,18 @@ public void onErrorResumeNextFunctionNull() { error.onErrorResumeNext((Function>)null); } - @Test(expected = NullPointerException.class) + @Test public void onErrorResumeNextFunctionReturnsNull() { - error.onErrorResumeNext(new Function>() { - @Override - public Single apply(Throwable e) { - return null; - } - }).blockingGet(); + try { + error.onErrorResumeNext(new Function>() { + @Override + public Single apply(Throwable e) { + return null; + } + }).blockingGet(); + } catch (CompositeException ex) { + assertTrue(ex.toString(), ex.getExceptions().get(1) instanceof NullPointerException); + } } @Test(expected = NullPointerException.class) diff --git a/src/test/java/io/reactivex/single/SingleTest.java b/src/test/java/io/reactivex/single/SingleTest.java index d2b9bce365..cafd53e205 100644 --- a/src/test/java/io/reactivex/single/SingleTest.java +++ b/src/test/java/io/reactivex/single/SingleTest.java @@ -531,5 +531,15 @@ public Object apply(final Object[] o) throws Exception { } }).test().assertResult(5); } + + @Test + public void to() { + assertEquals(1, Single.just(1).to(new Function, Integer>() { + @Override + public Integer apply(Single v) throws Exception { + return 1; + } + }).intValue()); + } }