diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index e028649a3d..6aa6dbd839 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -971,6 +971,27 @@ public final Throwable blockingGet(long timeout, TimeUnit unit) { return observer.blockingGetError(timeout, unit); } + /** + * Subscribes to this Completable only once, when the first CompletableObserver + * subscribes to the result Completable, caches its terminal event + * and relays/replays it to observers. + *

+ * Note that this operator doesn't allow disposing the connection + * of the upstream source. + *

+ *
Scheduler:
+ *
{@code cache} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Completable instance + * @since 2.0.4 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable cache() { + return RxJavaPlugins.onAssembly(new CompletableCache(this)); + } + /** * Calls the given transformer function with this instance and returns the function's resulting * Completable. diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableCache.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableCache.java new file mode 100644 index 0000000000..2ca40bfb2b --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableCache.java @@ -0,0 +1,172 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; + +/** + * Consume the upstream source exactly once and cache its terminal event. + * + * @since 2.0.4 - experimental + */ +@Experimental +public final class CompletableCache extends Completable implements CompletableObserver { + + static final InnerCompletableCache[] EMPTY = new InnerCompletableCache[0]; + + static final InnerCompletableCache[] TERMINATED = new InnerCompletableCache[0]; + + final CompletableSource source; + + final AtomicReference observers; + + final AtomicBoolean once; + + Throwable error; + + public CompletableCache(CompletableSource source) { + this.source = source; + this.observers = new AtomicReference(EMPTY); + this.once = new AtomicBoolean(); + } + + @Override + protected void subscribeActual(CompletableObserver s) { + InnerCompletableCache inner = new InnerCompletableCache(s); + s.onSubscribe(inner); + + if (add(inner)) { + if (inner.isDisposed()) { + remove(inner); + } + + if (once.compareAndSet(false, true)) { + source.subscribe(this); + } + } else { + Throwable ex = error; + if (ex != null) { + s.onError(ex); + } else { + s.onComplete(); + } + } + } + + @Override + public void onSubscribe(Disposable d) { + // not used + } + + @Override + public void onError(Throwable e) { + error = e; + for (InnerCompletableCache inner : observers.getAndSet(TERMINATED)) { + if (!inner.get()) { + inner.actual.onError(e); + } + } + } + + @Override + public void onComplete() { + for (InnerCompletableCache inner : observers.getAndSet(TERMINATED)) { + if (!inner.get()) { + inner.actual.onComplete(); + } + } + } + + boolean add(InnerCompletableCache inner) { + for (;;) { + InnerCompletableCache[] a = observers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + InnerCompletableCache[] b = new InnerCompletableCache[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + if (observers.compareAndSet(a, b)) { + return true; + } + } + } + + void remove(InnerCompletableCache inner) { + for (;;) { + InnerCompletableCache[] a = observers.get(); + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + + for (int i = 0; i < n; i++) { + if (a[i] == inner) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + + InnerCompletableCache[] b; + + if (n == 1) { + b = EMPTY; + } else { + b = new InnerCompletableCache[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + + if (observers.compareAndSet(a, b)) { + break; + } + } + } + + final class InnerCompletableCache + extends AtomicBoolean + implements Disposable { + + private static final long serialVersionUID = 8943152917179642732L; + + final CompletableObserver actual; + + InnerCompletableCache(CompletableObserver actual) { + this.actual = actual; + } + + @Override + public boolean isDisposed() { + return get(); + } + + @Override + public void dispose() { + if (compareAndSet(false, true)) { + remove(this); + } + } + } +} diff --git a/src/test/java/io/reactivex/JavadocWording.java b/src/test/java/io/reactivex/JavadocWording.java index a27fe44e83..a1afe21e0a 100644 --- a/src/test/java/io/reactivex/JavadocWording.java +++ b/src/test/java/io/reactivex/JavadocWording.java @@ -662,7 +662,7 @@ public void completableDocRefersToCompletableTypes() throws Exception { && !m.signature.contains("TestObserver")) { if (idx < 11 || !m.javadoc.substring(idx - 11, idx + 8).equals("CompletableObserver")) { - e.append("java.lang.RuntimeException: Maybe doc mentions Observer but not using Observable\r\n at io.reactivex.") + e.append("java.lang.RuntimeException: Completable doc mentions Observer but not using Observable\r\n at io.reactivex.") .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableCacheTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableCacheTest.java new file mode 100644 index 0000000000..1af19d2ec8 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableCacheTest.java @@ -0,0 +1,266 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.observers.TestObserver; +import io.reactivex.subjects.PublishSubject; + +public class CompletableCacheTest implements Consumer, Action { + + volatile int count; + + @Override + public void accept(Object t) throws Exception { + count++; + } + + @Override + public void run() throws Exception { + count++; + } + + @Test + public void normal() { + Completable c = Completable.complete() + .doOnSubscribe(this) + .cache(); + + assertEquals(0, count); + + c.test().assertResult(); + + assertEquals(1, count); + + c.test().assertResult(); + + assertEquals(1, count); + + c.test().assertResult(); + + assertEquals(1, count); + } + + @Test + public void error() { + Completable c = Completable.error(new TestException()) + .doOnSubscribe(this) + .cache(); + + assertEquals(0, count); + + c.test().assertFailure(TestException.class); + + assertEquals(1, count); + + c.test().assertFailure(TestException.class); + + assertEquals(1, count); + + c.test().assertFailure(TestException.class); + + assertEquals(1, count); + } + + @Test + public void crossDispose() { + PublishSubject ps = PublishSubject.create(); + + final TestObserver ts1 = new TestObserver(); + + final TestObserver ts2 = new TestObserver() { + @Override + public void onComplete() { + super.onComplete(); + ts1.cancel(); + } + }; + + Completable c = ps.ignoreElements().cache(); + + c.subscribe(ts2); + c.subscribe(ts1); + + ps.onComplete(); + + ts1.assertEmpty(); + ts2.assertResult(); + } + + @Test + public void crossDisposeOnError() { + PublishSubject ps = PublishSubject.create(); + + final TestObserver ts1 = new TestObserver(); + + final TestObserver ts2 = new TestObserver() { + @Override + public void onError(Throwable ex) { + super.onError(ex); + ts1.cancel(); + } + }; + + Completable c = ps.ignoreElements().cache(); + + c.subscribe(ts2); + c.subscribe(ts1); + + ps.onError(new TestException()); + + ts1.assertEmpty(); + ts2.assertFailure(TestException.class); + } + + @Test + public void dispose() { + PublishSubject ps = PublishSubject.create(); + + Completable c = ps.ignoreElements().cache(); + + assertFalse(ps.hasObservers()); + + TestObserver ts1 = c.test(); + + assertTrue(ps.hasObservers()); + + ts1.cancel(); + + assertTrue(ps.hasObservers()); + + TestObserver ts2 = c.test(); + + TestObserver ts3 = c.test(); + ts3.cancel(); + + TestObserver ts4 = c.test(true); + ts3.cancel(); + + ps.onComplete(); + + ts1.assertEmpty(); + + ts2.assertResult(); + + ts3.assertEmpty(); + + ts4.assertEmpty(); + } + + @Test + public void subscribeRace() { + for (int i = 0; i < 500; i++) { + PublishSubject ps = PublishSubject.create(); + + final Completable c = ps.ignoreElements().cache(); + + final TestObserver ts1 = new TestObserver(); + + final TestObserver ts2 = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + c.subscribe(ts1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + c.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + + ps.onComplete(); + + ts1.assertResult(); + ts2.assertResult(); + } + } + + @Test + public void subscribeDisposeRace() { + for (int i = 0; i < 500; i++) { + PublishSubject ps = PublishSubject.create(); + + final Completable c = ps.ignoreElements().cache(); + + final TestObserver ts1 = c.test(); + + final TestObserver ts2 = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts1.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + c.subscribe(ts2); + } + }; + + TestHelper.race(r1, r2); + + ps.onComplete(); + + ts1.assertEmpty(); + ts2.assertResult(); + } + } + + @Test + public void doubleDispose() { + PublishSubject ps = PublishSubject.create(); + + final TestObserver ts = new TestObserver(); + + ps.ignoreElements().cache() + .subscribe(new CompletableObserver() { + + @Override + public void onSubscribe(Disposable d) { + ts.onSubscribe(EmptyDisposable.INSTANCE); + d.dispose(); + d.dispose(); + } + + @Override + public void onComplete() { + ts.onComplete(); + } + + @Override + public void onError(Throwable e) { + ts.onError(e); + } + }); + + ps.onComplete(); + + ts.assertEmpty(); + } +}