Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ void cleanup() {
lock.lock();
try {
if (baseDisposable == currentBase) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}
baseDisposable.dispose();
baseDisposable = new CompositeDisposable();
subscriptionCount.set(0);
Expand Down Expand Up @@ -209,6 +212,10 @@ public void run() {
try {
if (baseDisposable == current) {
if (subscriptionCount.decrementAndGet() == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}

baseDisposable.dispose();
// need a new baseDisposable because once
// disposed stays that way
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timed;

public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T> {
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T>, Disposable {
/** The source observable. */
final Flowable<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
Expand Down Expand Up @@ -161,6 +161,17 @@ protected void subscribeActual(Subscriber<? super T> s) {
onSubscribe.subscribe(s);
}

@Override
public void dispose() {
current.lazySet(null);
}

@Override
public boolean isDisposed() {
Disposable d = current.get();
return d == null || d.isDisposed();
}

@Override
public void connect(Consumer<? super Disposable> connection) {
boolean doConnect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ void cleanup() {
lock.lock();
try {
if (baseDisposable == currentBase) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}

baseDisposable.dispose();
baseDisposable = new CompositeDisposable();
subscriptionCount.set(0);
Expand Down Expand Up @@ -208,6 +212,10 @@ public void run() {
try {
if (baseDisposable == current) {
if (subscriptionCount.decrementAndGet() == 0) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
}

baseDisposable.dispose();
// need a new baseDisposable because once
// disposed stays that way
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timed;

public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> {
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T>, Disposable {
/** The source observable. */
final ObservableSource<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
Expand Down Expand Up @@ -158,6 +158,17 @@ public ObservableSource<T> source() {
return source;
}

@Override
public void dispose() {
current.lazySet(null);
}

@Override
public boolean isDisposed() {
Disposable d = current.get();
return d == null || d.isDisposed();
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
onSubscribe.subscribe(observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.junit.*;
import org.junit.Test;
import org.mockito.InOrder;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.processors.ReplayProcessor;
import io.reactivex.schedulers.*;
Expand Down Expand Up @@ -619,4 +621,154 @@ protected void subscribeActual(Subscriber<? super Integer> observer) {

assertEquals(1, calls[0]);
}

Flowable<Object> source;

@Test
public void replayNoLeak() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
}
})
.replay(1)
.refCount();

source.subscribe();

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

@Test
public void replayNoLeak2() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
}
}).concatWith(Flowable.never())
.replay(1)
.refCount();

Disposable s1 = source.subscribe();
Disposable s2 = source.subscribe();

s1.dispose();
s2.dispose();

s1 = null;
s2 = null;

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

static final class ExceptionData extends Exception {
private static final long serialVersionUID = -6763898015338136119L;

public final Object data;

public ExceptionData(Object data) {
this.data = data;
}
}

@Test
public void publishNoLeak() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new ExceptionData(new byte[100 * 1000 * 1000]);
}
})
.publish()
.refCount();

source.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer());

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

@Test
public void publishNoLeak2() throws Exception {
System.gc();
Thread.sleep(100);

long start = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return new byte[100 * 1000 * 1000];
}
}).concatWith(Flowable.never())
.publish()
.refCount();

Disposable s1 = source.test();
Disposable s2 = source.test();

s1.dispose();
s2.dispose();

s1 = null;
s2 = null;

System.gc();
Thread.sleep(100);

long after = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

source = null;
assertTrue(String.format("%,3d -> %,3d%n", start, after), start + 20 * 1000 * 1000 > after);
}

@Test
public void replayIsUnsubscribed() {
ConnectableFlowable<Integer> co = Flowable.just(1)
.replay();

assertTrue(((Disposable)co).isDisposed());

Disposable s = co.connect();

assertFalse(((Disposable)co).isDisposed());

s.dispose();

assertTrue(((Disposable)co).isDisposed());
}
}
Loading