diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5d98df26b9..61e3fe8af0 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -44,6 +44,7 @@ import rx.operators.OperationDebounce; import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; +import rx.operators.OperationDeferFuture; import rx.operators.OperationDelay; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; @@ -53,6 +54,7 @@ import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; +import rx.operators.OperationFromFuture; import rx.operators.OperationGroupBy; import rx.operators.OperationGroupByUntil; import rx.operators.OperationGroupJoin; @@ -82,6 +84,7 @@ import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; import rx.operators.OperationSkipWhile; +import rx.operators.OperationStartFuture; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; import rx.operators.OperationSwitch; @@ -114,6 +117,7 @@ import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; import rx.subjects.Subject; +import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.OnErrorNotImplementedException; import rx.util.Range; @@ -6818,6 +6822,189 @@ public Observable> gro return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } + /** + * Invokes the asynchronous function immediately, surfacing the result through an observable sequence. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param functionAsync the asynchronous function to run + * @return an observable which surfaces the result of the future. + * @see #startFuture(rx.util.functions.Func0, rx.Scheduler) + */ + public static Observable startFuture(Func0> functionAsync) { + return OperationStartFuture.startFuture(functionAsync); + } + + /** + * Invokes the asynchronous function, surfacing the result through an observable + * sequence and shares a BooleanSubscription between all subscribers. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param functionAsync the asynchronous function to run + * @return an observable which surfaces the result of the future and shares + * a common subscription between subscribers + * @see #startCancellableFuture(rx.util.functions.Func1, rx.Scheduler) + */ + public static Observable startCancellableFuture(Func1> functionAsync) { + return OperationStartFuture.startFuture(functionAsync); + } + + /** + * Invokes the asynchronous function immediately, surfacing the result through an observable sequence + * and waits on the specified scheduler. + * @param the result type + * @param functionAsync the asynchronous function to run + * @param scheduler the scheduler where the completion of the Future is awaited + * @return an observable which surfaces the result of the future. + */ + public static Observable startFuture(Func0> functionAsync, + Scheduler scheduler) { + return OperationStartFuture.startFuture(functionAsync, scheduler); + } + + /** + * Invokes the asynchronous function, surfacing the result through an observable + * sequence and shares a BooleanSubscription between all subscribers + * and waits on the specified scheduler. + * @param the result type + * @param functionAsync the asynchronous function to run + * @param scheduler the scheduler where the completion of the Future is awaited + * @return an observable which surfaces the result of the future and shares + * a common subscription between subscribers + */ + public static Observable startCancellableFuture( + Func1> functionAsync, + Scheduler scheduler) { + return OperationStartFuture.startFuture(functionAsync, scheduler); + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + * @see #deferFuture(rx.util.functions.Func0, rx.Scheduler) + */ + public static Observable deferFuture(Func0>> observableFactoryAsync) { + return OperationDeferFuture.deferFuture(observableFactoryAsync); + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes and shares a boolean + * subscription between all subscribers. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + * @see #deferCancellableFuture(rx.util.functions.Func1, rx.Scheduler) + */ + public static Observable deferCancellableFuture(Func1>> observableFactoryAsync) { + return OperationDeferFuture.deferFuture(observableFactoryAsync); + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferFuture( + Func0>> observableFactoryAsync, + Scheduler scheduler) { + return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler); + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes and shares a boolean + * subscription between all subscribers. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferCancellableFuture( + Func1>> observableFactoryAsync, + Scheduler scheduler) { + return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler); + } + + /** + * Converts to asynchronous function into an observable sequence where + * each subscription to the resulting sequence causes the function to be started. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param functionAsync the function to call when an Observer subscribes + * @return an Observable which returns the single result or exception of the future + * @see #fromFuture(rx.util.functions.Func0, rx.Scheduler) + */ + public static Observable fromFuture(Func0> functionAsync) { + return create(OperationFromFuture.fromFuture(functionAsync)); + } + /** + * Converts to asynchronous function into an observable sequence where + * each subscription to the resulting sequence causes the function to be started + * and run on the given scheduler. + * @param the result type + * @param functionAsync the function to call when an Observer subscribes + * @param scheduler the scheduler where the Future returned by the functionAsync is started + * @return an Observable which returns the single result or exception of the future + */ + public static Observable fromFuture(Func0> functionAsync, Scheduler scheduler) { + return create(OperationFromFuture.fromFuture(functionAsync, scheduler)); + } + /** + * Converts to asynchronous function into an observable sequence + * where each subscription to the resulting sequence causes the function to be started and + * the passed-in BooleanSubscription is tied to the Observable sequence's subscription. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param functionAsync the function to call when an Observer subscribes which receives + * a BooleanSubscription that is tied to the subscription of the returned Observable + * @return an Observable which returns the single result or exception of the future + * @see #fromCancellableFuture(rx.util.functions.Func1, rx.Scheduler) + */ + public static Observable fromCancellableFuture(Func1> functionAsync) { + return create(OperationFromFuture.fromFuture(functionAsync)); + } + + /** + * Converts to asynchronous function into an observable sequence + * where each subscription to the resulting sequence causes the function to be started and + * run on the given scheduler, and + * the passed-in BooleanSubscription is tied to the Observable sequence's subscription. + * @param the result type + * @param functionAsync the function to call when an Observer subscribes which receives + * a BooleanSubscription that is tied to the subscription of the returned Observable + * @param scheduler the scheduler where the Future returned by the functionAsync is started + * @return an Observable which returns the single result or exception of the future + */ + public static Observable fromCancellableFuture( + Func1> functionAsync, + Scheduler scheduler) { + return create(OperationFromFuture.fromFuture(functionAsync, scheduler)); + } + /** * Invokes the specified function asynchronously and returns an Observable * that emits the result. diff --git a/rxjava-core/src/main/java/rx/operators/OperationDeferFuture.java b/rxjava-core/src/main/java/rx/operators/OperationDeferFuture.java new file mode 100644 index 0000000000..5673c734c0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDeferFuture.java @@ -0,0 +1,147 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Future; +import rx.Observable; +import rx.Scheduler; +import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Defer the execution of a factory method which produces an observable sequence. + */ +public final class OperationDeferFuture { + /** Utility class. */ + private OperationDeferFuture() { throw new IllegalStateException("No instances!"); } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferFuture(Func0>> observableFactoryAsync) { + return Observable.defer(new DeferFutureFunc0(observableFactoryAsync)); + } + /** The function called by the defer operator. */ + private static final class DeferFutureFunc0 implements Func0> { + final Func0>> observableFactoryAsync; + + public DeferFutureFunc0(Func0>> observableFactoryAsync) { + this.observableFactoryAsync = observableFactoryAsync; + } + + @Override + public Observable call() { + return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync)); + } + + } + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes and shares a boolean + * subscription between all subscribers. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferFuture(Func1>> observableFactoryAsync) { + return Observable.defer(new DeferFutureFunc1(observableFactoryAsync)); + } + /** The function called by the defer operator with boolean subscription. */ + private static final class DeferFutureFunc1 implements Func0> { + final Func1>> observableFactoryAsync; + + public DeferFutureFunc1(Func1>> observableFactoryAsync) { + this.observableFactoryAsync = observableFactoryAsync; + } + + @Override + public Observable call() { + return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync)); + } + + } + + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferFuture( + Func0>> observableFactoryAsync, + Scheduler scheduler) { + return Observable.defer(new DeferFutureFunc0Scheduled(observableFactoryAsync, scheduler)); + } + /** The function called by the defer operator. */ + private static final class DeferFutureFunc0Scheduled implements Func0> { + final Func0>> observableFactoryAsync; + final Scheduler scheduler; + + public DeferFutureFunc0Scheduled(Func0>> observableFactoryAsync, + Scheduler scheduler) { + this.observableFactoryAsync = observableFactoryAsync; + this.scheduler = scheduler; + } + + @Override + public Observable call() { + return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler)); + } + + } + /** + * Returns an observable sequence that starts the specified asynchronous + * factory function whenever a new observer subscribes and shares a boolean + * subscription between all subscribers. + * @param the result type + * @param observableFactoryAsync the asynchronous function to start for each observer + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable sequence containing values produced by the asynchronous observer + * produced by the factory + */ + public static Observable deferFuture( + Func1>> observableFactoryAsync, + Scheduler scheduler) { + return Observable.defer(new DeferFutureFunc1Scheduled(observableFactoryAsync, scheduler)); + } + /** The function called by the defer operator with boolean subscription. */ + private static final class DeferFutureFunc1Scheduled implements Func0> { + final Func1>> observableFactoryAsync; + final Scheduler scheduler; + + public DeferFutureFunc1Scheduled(Func1>> observableFactoryAsync, + Scheduler scheduler) { + this.observableFactoryAsync = observableFactoryAsync; + this.scheduler = scheduler; + } + + @Override + public Observable call() { + return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler)); + } + + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationFromFuture.java b/rxjava-core/src/main/java/rx/operators/OperationFromFuture.java new file mode 100644 index 0000000000..2424f9f1cb --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationFromFuture.java @@ -0,0 +1,126 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Future; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Scheduler; +import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Converts to asynchronous function into an observable sequence where + * each subscription to the resulting sequence causes the function to be started. + */ +public final class OperationFromFuture { + /** Utility class. */ + private OperationFromFuture() { throw new IllegalStateException("No instances!"); } + /** + * Converts to asynchronous function into an observable sequence where + * each subscription to the resulting sequence causes the function to be started. + */ + public static OnSubscribeFunc fromFuture(Func0> functionAsync) { + return OperationDefer.defer(new FromFuture(functionAsync)); + } + /** + * Converts to asynchronous function into an observable sequence where + * each subscription to the resulting sequence causes the function to be started + * and run on the given scheduler. + */ + public static OnSubscribeFunc fromFuture(Func0> functionAsync, Scheduler scheduler) { + return OperationDefer.defer(new FromFutureScheduled(functionAsync, scheduler)); + } + /** + * Converts to asynchronous function into an observable sequence + * where each subscription to the resulting sequence causes the function to be started and + * the passed-in BooleanSubscription is tied to the Observable sequence's subscription. + */ + public static OnSubscribeFunc fromFuture(Func1> functionAsync) { + return OperationDefer.defer(new FromFutureToken(functionAsync)); + } + + /** + * Converts to asynchronous function into an observable sequence + * where each subscription to the resulting sequence causes the function to be started and + * run on the given scheduler, and + * the passed-in BooleanSubscription is tied to the Observable sequence's subscription. + */ + public static OnSubscribeFunc fromFuture( + Func1> functionAsync, + Scheduler scheduler) { + return OperationDefer.defer(new FromFutureTokenScheduled(functionAsync, scheduler)); + } + + /** From a future started by OperationStartFuture. */ + private static final class FromFuture implements Func0> { + final Func0> functionAsync; + + public FromFuture(Func0> functionAsync) { + this.functionAsync = functionAsync; + } + + @Override + public Observable call() { + return OperationStartFuture.startFuture(functionAsync); + } + } + /** From a future started by OperationStartFuture. */ + private static final class FromFutureScheduled implements Func0> { + final Func0> functionAsync; + final Scheduler scheduler; + + public FromFutureScheduled(Func0> functionAsync, Scheduler scheduler) { + this.functionAsync = functionAsync; + this.scheduler = scheduler; + } + + @Override + public Observable call() { + return OperationStartFuture.startFuture(functionAsync, scheduler); + } + } + + /** From a future started by OperationStartFuture. */ + private static final class FromFutureToken implements Func0> { + final Func1> functionAsync; + + public FromFutureToken(Func1> functionAsync) { + this.functionAsync = functionAsync; + } + + @Override + public Observable call() { + return OperationStartFuture.startFuture(functionAsync); + } + } + /** From a future started by OperationStartFuture. */ + private static final class FromFutureTokenScheduled implements Func0> { + final Func1> functionAsync; + final Scheduler scheduler; + + public FromFutureTokenScheduled(Func1> functionAsync, Scheduler scheduler) { + this.functionAsync = functionAsync; + this.scheduler = scheduler; + } + + @Override + public Observable call() { + return OperationStartFuture.startFuture(functionAsync, scheduler); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationStartFuture.java b/rxjava-core/src/main/java/rx/operators/OperationStartFuture.java new file mode 100644 index 0000000000..f9eef5cb40 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationStartFuture.java @@ -0,0 +1,134 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Future; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.BooleanSubscription; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Start an asynchronous Future immediately and observe its result through + * an observable. + */ +public final class OperationStartFuture { + /** Utility class. */ + private OperationStartFuture() { throw new IllegalStateException("No instances!"); } + /** + * Invokes the asynchronous function, surfacing the result through an observable sequence. + *

+ * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param functionAsync the asynchronous function to run + * @return the observable + */ + public static Observable startFuture(Func0> functionAsync) { + Future task; + try { + task = functionAsync.call(); + } catch (Throwable t) { + return Observable.error(t); + } + return Observable.from(task); + } + /** + * Invokes the asynchronous function, surfacing the result through an observable sequence + * running on the given scheduler. + * @param the result type + * @param functionAsync the asynchronous function to run + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable + */ + public static Observable startFuture(Func0> functionAsync, + Scheduler scheduler) { + Future task; + try { + task = functionAsync.call(); + } catch (Throwable t) { + return Observable.error(t); + } + return Observable.from(task, scheduler); + } + /** + * Invokes the asynchronous function, surfacing the result through an observable + * sequence and shares a BooleanSubscription between all subscribers. + * Important note subscribing to the resulting observable blocks until + * the future completes. + * @param the result type + * @param functionAsync the asynchronous function to run + * @return the observable + */ + public static Observable startFuture(Func1> functionAsync) { + final BooleanSubscription token = new BooleanSubscription(); + Future task; + try { + task = functionAsync.call(token); + } catch (Throwable t) { + return Observable.error(t); + } + final Observable result = Observable.from(task); + return Observable.create(new StartAsyncSubscribe(token, result)); + } + + /** + * Invokes the asynchronous function, surfacing the result through an observable + * sequence and shares a BooleanSubscription between all subscribers. + * @param the result type + * @param functionAsync the asynchronous function to run + * @param scheduler the scheduler where the completion of the Future is awaited + * @return the observable + */ + public static Observable startFuture( + Func1> functionAsync, + Scheduler scheduler) { + final BooleanSubscription token = new BooleanSubscription(); + Future task; + try { + task = functionAsync.call(token); + } catch (Throwable t) { + return Observable.error(t); + } + final Observable result = Observable.from(task, scheduler); + return Observable.create(new StartAsyncSubscribe(token, result)); + } + /** + * The subsription function that combines the token and the subscription to the + * source. + * @param the value type + */ + private static final class StartAsyncSubscribe implements OnSubscribeFunc { + final Subscription token; + final Observable source; + + public StartAsyncSubscribe(Subscription token, Observable source) { + this.token = token; + this.source = source; + } + + @Override + public Subscription onSubscribe(Observer t1) { + Subscription s = source.subscribe(t1); + return new CompositeSubscription(token, s); + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationDeferFutureTest.java b/rxjava-core/src/test/java/rx/operators/OperationDeferFutureTest.java new file mode 100644 index 0000000000..8660b06033 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationDeferFutureTest.java @@ -0,0 +1,104 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.fail; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import rx.Observable; +import rx.Observer; +import rx.schedulers.Schedulers; +import rx.util.functions.Func0; + +public class OperationDeferFutureTest { + @Test + @SuppressWarnings("unchecked") + public void testSimple() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + try { + final CountDownLatch ready = new CountDownLatch(1); + + Func0>> func = new Func0>>() { + @Override + public Future> call() { + return exec.submit(new Callable>() { + @Override + public Observable call() throws Exception { + if (!ready.await(1000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Not started in time"); + } + return Observable.from(1); + } + }); + } + }; + + Observable result = Observable.deferFuture(func, Schedulers.threadPoolForComputation()); + + final Observer observer = mock(Observer.class); + + final CountDownLatch done = new CountDownLatch(1); + + result.subscribe(new OperationStartFutureTest.MockHelper(observer, done)); + + ready.countDown(); + + if (!done.await(1500, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } finally { + exec.shutdown(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testSimpleFactoryThrows() { + Func0>> func = new Func0>>() { + + @Override + public Future> call() { + throw new OperationStartFutureTest.CustomException(); + } + }; + + Observable result = Observable.deferFuture(func); + + final Observer observer = mock(Observer.class); + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer).onError(any(OperationStartFutureTest.CustomException.class)); + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationFromFutureTest.java b/rxjava-core/src/test/java/rx/operators/OperationFromFutureTest.java new file mode 100644 index 0000000000..8c7eef2186 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationFromFutureTest.java @@ -0,0 +1,122 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.operators.OperationStartFutureTest.CustomException; +import rx.schedulers.Schedulers; +import rx.util.functions.Func0; + +public class OperationFromFutureTest { + @Test + @SuppressWarnings("unchecked") + public void testSimple() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + + try { + final AtomicInteger runCount = new AtomicInteger(0); + + Func0> func = new Func0>() { + @Override + public Future call() { + return exec.submit(new Callable() { + @Override + public Integer call() throws Exception { + return runCount.incrementAndGet(); + } + }); + } + }; + + Observable result = Observable.fromFuture(func, Schedulers.threadPoolForComputation()); + + for (int i = 1; i <= 10; i++) { + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + final CountDownLatch done = new CountDownLatch(1); + + result.subscribe(new OperationStartFutureTest.MockHelper(o, done)); + + if (!done.await(1500, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + inOrder.verify(o).onNext(i); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + assertEquals(i, runCount.get()); + } + + } finally { + exec.shutdown(); + } + } + @Test + @SuppressWarnings("unchecked") + public void testSimpleThrows() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + + try { + Func0> func = new Func0>() { + @Override + public Future call() { + return exec.submit(new Callable() { + @Override + public Integer call() throws Exception { + throw new CustomException(); + } + }); + } + }; + + Observable result = Observable.fromFuture(func, Schedulers.threadPoolForComputation()); + + for (int i = 1; i <= 10; i++) { + Observer o = mock(Observer.class); + + final CountDownLatch done = new CountDownLatch(1); + + result.subscribe(new OperationStartFutureTest.MockHelper(o, done)); + + if (!done.await(1500, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + verify(o, times(1)).onError(any(CustomException.class)); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + + } finally { + exec.shutdown(); + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationStartFutureTest.java b/rxjava-core/src/test/java/rx/operators/OperationStartFutureTest.java new file mode 100644 index 0000000000..2df51a7b3f --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationStartFutureTest.java @@ -0,0 +1,278 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.fail; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.schedulers.Schedulers; +import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +public class OperationStartFutureTest { + /** Custom exception to distinguish from any other RuntimeException. */ + static class CustomException extends RuntimeException {} + /** + * Forwards the events to the underlying observer and counts down the latch + * on terminal conditions. + * @param + */ + static class MockHelper implements Observer { + final Observer observer; + final CountDownLatch latch; + + public MockHelper(Observer observer, CountDownLatch latch) { + this.observer = observer; + this.latch = latch; + } + + @Override + public void onNext(T args) { + try { + observer.onNext(args); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void onError(Throwable e) { + try { + observer.onError(e); + } finally { + latch.countDown(); + } + } + + + @Override + public void onCompleted() { + try { + observer.onCompleted(); + } finally { + latch.countDown(); + } + } + + } + @Test + @SuppressWarnings("unchecked") + public void testSimple() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + try { + final CountDownLatch ready = new CountDownLatch(1); + + Func0> func = new Func0>() { + + @Override + public Future call() { + return exec.submit(new Callable() { + @Override + public Integer call() throws Exception { + if (!ready.await(1000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Not started in time"); + } + return 1; + } + }); + } + }; + + Observable result = Observable.startFuture(func, Schedulers.threadPoolForComputation()); + + final Observer observer = mock(Observer.class); + + final CountDownLatch done = new CountDownLatch(1); + + result.subscribe(new MockHelper(observer, done)); + + ready.countDown(); + + if (!done.await(1500, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } finally { + exec.shutdown(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testSimpleFactoryThrows() { + Func0> func = new Func0>() { + + @Override + public Future call() { + throw new CustomException(); + } + }; + + Observable result = Observable.startFuture(func); + + final Observer observer = mock(Observer.class); + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer).onError(any(CustomException.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testSimpleCancellableFactoryThrows() { + Func1> func = new Func1>() { + + @Override + public Future call(BooleanSubscription token) { + throw new CustomException(); + } + }; + + Observable result = Observable.startCancellableFuture(func); + + final Observer observer = mock(Observer.class); + result.subscribe(observer); + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer).onError(any(CustomException.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testStartCancellable() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + try { + final CountDownLatch ready = new CountDownLatch(1); + + Func1> func = new Func1>() { + + @Override + public Future call(BooleanSubscription token) { + return exec.submit(new Callable() { + @Override + public Integer call() throws Exception { + if (!ready.await(1000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Not started in time"); + } + return 1; + } + }); + } + }; + + Observable result = Observable.startCancellableFuture(func, Schedulers.threadPoolForComputation()); + + final Observer observer = mock(Observer.class); + + final CountDownLatch done = new CountDownLatch(1); + + result.subscribe(new MockHelper(observer, done)); + + ready.countDown(); + + if (!done.await(1500, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } finally { + exec.shutdown(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testStartCancellableAndCancel() throws InterruptedException { + final ExecutorService exec = Executors.newCachedThreadPool(); + try { + final CountDownLatch ready = new CountDownLatch(1); + final CountDownLatch ready2 = new CountDownLatch(1); + + Func1> func = new Func1>() { + + @Override + public Future call(final BooleanSubscription token) { + return exec.submit(new Callable() { + @Override + public Integer call() throws Exception { + ready.countDown(); + + if (!ready2.await(1000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Not started in time"); + } + if (token.isUnsubscribed()) { + throw new CancellationException(); + } + + return 1; + } + }); + } + }; + + Observable result = Observable.startCancellableFuture(func, Schedulers.threadPoolForComputation()); + + final Observer observer = mock(Observer.class); + + final CountDownLatch done = new CountDownLatch(1); + + Subscription s = result.subscribe(new MockHelper(observer, done)); + + if (!ready.await(1000, TimeUnit.MILLISECONDS)) { + fail("Not entered the call() in time!"); + } + + s.unsubscribe(); + + // resume call + ready2.countDown(); + + if (!done.await(1500, TimeUnit.MILLISECONDS)) { + fail("Not completed in time!"); + } + + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(CancellationException.class)); + + } finally { + exec.shutdown(); + } + } +}