Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2808,6 +2808,50 @@ public final Single<T> delaySubscription(Observable<?> other) {
return create(new SingleOnSubscribeDelaySubscriptionOther<T>(this, other));
}

/**
* Returns a Single which makes sure when a subscriber cancels the subscription,
* the dispose is called on the specified scheduler
* @param scheduler the target scheduler where to execute the cancellation
* @return the new Single instance
*/
public final Single<T> unsubscribeOn(final Scheduler scheduler) {
return create(new OnSubscribe<T>() {
@Override
public void call(final SingleSubscriber<? super T> t) {
final SingleSubscriber<T> single = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
t.onSuccess(value);
}

@Override
public void onError(Throwable error) {
t.onError(error);
}
};

t.add(Subscriptions.create(new Action0() {
@Override
public void call() {
final Scheduler.Worker w = scheduler.createWorker();
w.schedule(new Action0() {
@Override
public void call() {
try {
single.unsubscribe();
} finally {
w.unsubscribe();
}
}
});
}
}));

Single.this.subscribe(single);
}
});
}

// -------------------------------------------------------------------------
// Fluent test support, super handy and reduces test preparation boilerplate
// -------------------------------------------------------------------------
Expand Down
62 changes: 62 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import rx.Single.OnSubscribe;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.RxThreadFactory;
import rx.observers.*;
import rx.plugins.RxJavaHooks;
import rx.schedulers.*;
Expand Down Expand Up @@ -2230,4 +2231,65 @@ public void call(Throwable t) {

assertEquals(1, calls[0]);
}

@Test
public void unsubscribeOnSuccess() throws InterruptedException {
final AtomicReference<String> name = new AtomicReference<String>();

final CountDownLatch cdl = new CountDownLatch(1);

TestSubscriber<Integer> ts = TestSubscriber.create();

Single.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
name.set(Thread.currentThread().getName());
cdl.countDown();
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.computation())
.subscribe(ts);

cdl.await();

ts.awaitTerminalEvent();
ts.assertReceivedOnNext(Arrays.asList(1));

assertTrue(name.get().startsWith("RxComputation"));
}

@Test
public void unsubscribeOnError() throws InterruptedException {
final AtomicReference<String> name = new AtomicReference<String>();

final CountDownLatch cdl = new CountDownLatch(1);

TestSubscriber<Integer> ts = TestSubscriber.create();

Single.<Integer>error(new RuntimeException())
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
name.set(Thread.currentThread().getName());
cdl.countDown();
}
})
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.computation())
.subscribe(ts);

cdl.await();

ts.awaitTerminalEvent();
ts.assertError(RuntimeException.class);

assertTrue(name.get().startsWith("RxComputation"));
}
}