diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index 2560071d7c..b48e0ca8c9 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -99,6 +99,9 @@ void cleanup() { lock.lock(); try { if (baseDisposable == currentBase) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } baseDisposable.dispose(); baseDisposable = new CompositeDisposable(); subscriptionCount.set(0); @@ -209,6 +212,10 @@ public void run() { try { if (baseDisposable == current) { if (subscriptionCount.decrementAndGet() == 0) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } + baseDisposable.dispose(); // need a new baseDisposable because once // disposed stays that way diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 210a753ca1..fa6f494acf 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -32,7 +32,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Timed; -public final class FlowableReplay extends ConnectableFlowable implements HasUpstreamPublisher { +public final class FlowableReplay extends ConnectableFlowable implements HasUpstreamPublisher, Disposable { /** The source observable. */ final Flowable source; /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ @@ -161,6 +161,17 @@ protected void subscribeActual(Subscriber s) { onSubscribe.subscribe(s); } + @Override + public void dispose() { + current.lazySet(null); + } + + @Override + public boolean isDisposed() { + Disposable d = current.get(); + return d == null || d.isDisposed(); + } + @Override public void connect(Consumer connection) { boolean doConnect; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 207bf96ff7..218760ff8c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -162,6 +162,10 @@ void cleanup() { lock.lock(); try { if (baseDisposable == currentBase) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } + baseDisposable.dispose(); baseDisposable = new CompositeDisposable(); subscriptionCount.set(0); @@ -208,6 +212,10 @@ public void run() { try { if (baseDisposable == current) { if (subscriptionCount.decrementAndGet() == 0) { + if (source instanceof Disposable) { + ((Disposable)source).dispose(); + } + baseDisposable.dispose(); // need a new baseDisposable because once // disposed stays that way diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 2d2f13dac8..af25fa9bf2 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -30,7 +30,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Timed; -public final class ObservableReplay extends ConnectableObservable implements HasUpstreamObservableSource { +public final class ObservableReplay extends ConnectableObservable implements HasUpstreamObservableSource, Disposable { /** The source observable. */ final ObservableSource source; /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ @@ -158,6 +158,17 @@ public ObservableSource source() { return source; } + @Override + public void dispose() { + current.lazySet(null); + } + + @Override + public boolean isDisposed() { + Disposable d = current.get(); + return d == null || d.isDisposed(); + } + @Override protected void subscribeActual(Observer observer) { onSubscribe.subscribe(observer); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index ae4c38cfef..513d1a9c15 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -17,11 +17,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import org.junit.*; +import org.junit.Test; import org.mockito.InOrder; import org.reactivestreams.*; @@ -29,6 +30,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.ReplayProcessor; import io.reactivex.schedulers.*; @@ -619,4 +621,154 @@ protected void subscribeActual(Subscriber observer) { assertEquals(1, calls[0]); } + + Flowable source; + + @Test + public void replayNoLeak() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return new byte[100 * 1000 * 1000]; + } + }) + .replay(1) + .refCount(); + + source.subscribe(); + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + @Test + public void replayNoLeak2() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return new byte[100 * 1000 * 1000]; + } + }).concatWith(Flowable.never()) + .replay(1) + .refCount(); + + Disposable s1 = source.subscribe(); + Disposable s2 = source.subscribe(); + + s1.dispose(); + s2.dispose(); + + s1 = null; + s2 = null; + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + static final class ExceptionData extends Exception { + private static final long serialVersionUID = -6763898015338136119L; + + public final Object data; + + public ExceptionData(Object data) { + this.data = data; + } + } + + @Test + public void publishNoLeak() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new ExceptionData(new byte[100 * 1000 * 1000]); + } + }) + .publish() + .refCount(); + + source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + @Test + public void publishNoLeak2() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return new byte[100 * 1000 * 1000]; + } + }).concatWith(Flowable.never()) + .publish() + .refCount(); + + Disposable s1 = source.test(); + Disposable s2 = source.test(); + + s1.dispose(); + s2.dispose(); + + s1 = null; + s2 = null; + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + @Test + public void replayIsUnsubscribed() { + ConnectableFlowable co = Flowable.just(1) + .replay(); + + assertTrue(((Disposable)co).isDisposed()); + + Disposable s = co.connect(); + + assertFalse(((Disposable)co).isDisposed()); + + s.dispose(); + + assertTrue(((Disposable)co).isDisposed()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 3938d3b0ed..08dea0e7d1 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; +import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -29,6 +30,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.*; @@ -619,4 +621,153 @@ protected void subscribeActual(Observer observer) { assertEquals(1, calls[0]); } + Observable source; + + @Test + public void replayNoLeak() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return new byte[100 * 1000 * 1000]; + } + }) + .replay(1) + .refCount(); + + source.subscribe(); + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + @Test + public void replayNoLeak2() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return new byte[100 * 1000 * 1000]; + } + }).concatWith(Observable.never()) + .replay(1) + .refCount(); + + Disposable s1 = source.subscribe(); + Disposable s2 = source.subscribe(); + + s1.dispose(); + s2.dispose(); + + s1 = null; + s2 = null; + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + static final class ExceptionData extends Exception { + private static final long serialVersionUID = -6763898015338136119L; + + public final Object data; + + public ExceptionData(Object data) { + this.data = data; + } + } + + @Test + public void publishNoLeak() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new ExceptionData(new byte[100 * 1000 * 1000]); + } + }) + .publish() + .refCount(); + + source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()); + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + @Test + public void publishNoLeak2() throws Exception { + System.gc(); + Thread.sleep(100); + + long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return new byte[100 * 1000 * 1000]; + } + }).concatWith(Observable.never()) + .publish() + .refCount(); + + Disposable s1 = source.test(); + Disposable s2 = source.test(); + + s1.dispose(); + s2.dispose(); + + s1 = null; + s2 = null; + + System.gc(); + Thread.sleep(100); + + long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + + source = null; + assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after); + } + + @Test + public void replayIsUnsubscribed() { + ConnectableObservable co = Observable.just(1).concatWith(Observable.never()) + .replay(); + + assertTrue(((Disposable)co).isDisposed()); + + Disposable s = co.connect(); + + assertFalse(((Disposable)co).isDisposed()); + + s.dispose(); + + assertTrue(((Disposable)co).isDisposed()); + } }