Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import rx.operators.OperationDebounce;
import rx.operators.OperationDefaultIfEmpty;
import rx.operators.OperationDefer;
import rx.operators.OperationDeferFuture;
import rx.operators.OperationDelay;
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
Expand All @@ -53,6 +54,7 @@
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationFromFuture;
import rx.operators.OperationGroupBy;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
Expand Down Expand Up @@ -82,6 +84,7 @@
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
import rx.operators.OperationSkipWhile;
import rx.operators.OperationStartFuture;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSum;
import rx.operators.OperationSwitch;
Expand Down Expand Up @@ -114,6 +117,7 @@
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.OnErrorNotImplementedException;
import rx.util.Range;
Expand Down Expand Up @@ -6818,6 +6822,189 @@ public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> gro
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
}

/**
* Invokes the asynchronous function immediately, surfacing the result through an observable sequence.
* <p>
* <em>Important note</em> subscribing to the resulting observable blocks until
* the future completes.
* @param <T> the result type
* @param functionAsync the asynchronous function to run
* @return an observable which surfaces the result of the future.
* @see #startFuture(rx.util.functions.Func0, rx.Scheduler)
*/
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync) {
return OperationStartFuture.startFuture(functionAsync);
}

/**
* Invokes the asynchronous function, surfacing the result through an observable
* sequence and shares a BooleanSubscription between all subscribers.
* <p>
* <em>Important note</em> subscribing to the resulting observable blocks until
* the future completes.
* @param <T> the result type
* @param functionAsync the asynchronous function to run
* @return an observable which surfaces the result of the future and shares
* a common subscription between subscribers
* @see #startCancellableFuture(rx.util.functions.Func1, rx.Scheduler)
*/
public static <T> Observable<T> startCancellableFuture(Func1<? super BooleanSubscription, ? extends Future<? extends T>> functionAsync) {
return OperationStartFuture.startFuture(functionAsync);
}

/**
* Invokes the asynchronous function immediately, surfacing the result through an observable sequence
* and waits on the specified scheduler.
* @param <T> the result type
* @param functionAsync the asynchronous function to run
* @param scheduler the scheduler where the completion of the Future is awaited
* @return an observable which surfaces the result of the future.
*/
public static <T> Observable<T> startFuture(Func0<? extends Future<? extends T>> functionAsync,
Scheduler scheduler) {
return OperationStartFuture.startFuture(functionAsync, scheduler);
}

/**
* Invokes the asynchronous function, surfacing the result through an observable
* sequence and shares a BooleanSubscription between all subscribers
* and waits on the specified scheduler.
* @param <T> the result type
* @param functionAsync the asynchronous function to run
* @param scheduler the scheduler where the completion of the Future is awaited
* @return an observable which surfaces the result of the future and shares
* a common subscription between subscribers
*/
public static <T> Observable<T> startCancellableFuture(
Func1<? super BooleanSubscription, ? extends Future<? extends T>> functionAsync,
Scheduler scheduler) {
return OperationStartFuture.startFuture(functionAsync, scheduler);
}

/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes.
* <p>
* <em>Important note</em> subscribing to the resulting observable blocks until
* the future completes.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
* @see #deferFuture(rx.util.functions.Func0, rx.Scheduler)
*/
public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
return OperationDeferFuture.deferFuture(observableFactoryAsync);
}

/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes and shares a boolean
* subscription between all subscribers.
* <p>
* <em>Important note</em> subscribing to the resulting observable blocks until
* the future completes.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
* @see #deferCancellableFuture(rx.util.functions.Func1, rx.Scheduler)
*/
public static <T> Observable<T> deferCancellableFuture(Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
return OperationDeferFuture.deferFuture(observableFactoryAsync);
}

/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @param scheduler the scheduler where the completion of the Future is awaited
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
*/
public static <T> Observable<T> deferFuture(
Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
Scheduler scheduler) {
return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler);
}

/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes and shares a boolean
* subscription between all subscribers.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @param scheduler the scheduler where the completion of the Future is awaited
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
*/
public static <T> Observable<T> deferCancellableFuture(
Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
Scheduler scheduler) {
return OperationDeferFuture.deferFuture(observableFactoryAsync, scheduler);
}

/**
* Converts to asynchronous function into an observable sequence where
* each subscription to the resulting sequence causes the function to be started.
* <p>
* <em>Important note</em> subscribing to the resulting observable blocks until
* the future completes.
* @param <T> the result type
* @param functionAsync the function to call when an Observer subscribes
* @return an Observable which returns the single result or exception of the future
* @see #fromFuture(rx.util.functions.Func0, rx.Scheduler)
*/
public static <T> Observable<T> fromFuture(Func0<? extends Future<? extends T>> functionAsync) {
return create(OperationFromFuture.fromFuture(functionAsync));
}
/**
* Converts to asynchronous function into an observable sequence where
* each subscription to the resulting sequence causes the function to be started
* and run on the given scheduler.
* @param <T> the result type
* @param functionAsync the function to call when an Observer subscribes
* @param scheduler the scheduler where the Future returned by the functionAsync is started
* @return an Observable which returns the single result or exception of the future
*/
public static <T> Observable<T> fromFuture(Func0<? extends Future<? extends T>> functionAsync, Scheduler scheduler) {
return create(OperationFromFuture.fromFuture(functionAsync, scheduler));
}
/**
* Converts to asynchronous function into an observable sequence
* where each subscription to the resulting sequence causes the function to be started and
* the passed-in BooleanSubscription is tied to the Observable sequence's subscription.
* <p>
* <em>Important note</em> subscribing to the resulting observable blocks until
* the future completes.
* @param <T> the result type
* @param functionAsync the function to call when an Observer subscribes which receives
* a BooleanSubscription that is tied to the subscription of the returned Observable
* @return an Observable which returns the single result or exception of the future
* @see #fromCancellableFuture(rx.util.functions.Func1, rx.Scheduler)
*/
public static <T> Observable<T> fromCancellableFuture(Func1<? super BooleanSubscription, ? extends Future<? extends T>> functionAsync) {
return create(OperationFromFuture.fromFuture(functionAsync));
}

/**
* Converts to asynchronous function into an observable sequence
* where each subscription to the resulting sequence causes the function to be started and
* run on the given scheduler, and
* the passed-in BooleanSubscription is tied to the Observable sequence's subscription.
* @param <T> the result type
* @param functionAsync the function to call when an Observer subscribes which receives
* a BooleanSubscription that is tied to the subscription of the returned Observable
* @param scheduler the scheduler where the Future returned by the functionAsync is started
* @return an Observable which returns the single result or exception of the future
*/
public static <T> Observable<T> fromCancellableFuture(
Func1<? super BooleanSubscription, ? extends Future<? extends T>> functionAsync,
Scheduler scheduler) {
return create(OperationFromFuture.fromFuture(functionAsync, scheduler));
}

/**
* Invokes the specified function asynchronously and returns an Observable
* that emits the result.
Expand Down
147 changes: 147 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationDeferFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.concurrent.Future;
import rx.Observable;
import rx.Scheduler;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/**
* Defer the execution of a factory method which produces an observable sequence.
*/
public final class OperationDeferFuture {
/** Utility class. */
private OperationDeferFuture() { throw new IllegalStateException("No instances!"); }

/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
*/
public static <T> Observable<T> deferFuture(Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
return Observable.defer(new DeferFutureFunc0<T>(observableFactoryAsync));
}
/** The function called by the defer operator. */
private static final class DeferFutureFunc0<T> implements Func0<Observable<T>> {
final Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync;

public DeferFutureFunc0(Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
this.observableFactoryAsync = observableFactoryAsync;
}

@Override
public Observable<T> call() {
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync));
}

}
/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes and shares a boolean
* subscription between all subscribers.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
*/
public static <T> Observable<T> deferFuture(Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
return Observable.defer(new DeferFutureFunc1<T>(observableFactoryAsync));
}
/** The function called by the defer operator with boolean subscription. */
private static final class DeferFutureFunc1<T> implements Func0<Observable<T>> {
final Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync;

public DeferFutureFunc1(Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync) {
this.observableFactoryAsync = observableFactoryAsync;
}

@Override
public Observable<T> call() {
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync));
}

}

/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @param scheduler the scheduler where the completion of the Future is awaited
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
*/
public static <T> Observable<T> deferFuture(
Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
Scheduler scheduler) {
return Observable.defer(new DeferFutureFunc0Scheduled<T>(observableFactoryAsync, scheduler));
}
/** The function called by the defer operator. */
private static final class DeferFutureFunc0Scheduled<T> implements Func0<Observable<T>> {
final Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync;
final Scheduler scheduler;

public DeferFutureFunc0Scheduled(Func0<? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
Scheduler scheduler) {
this.observableFactoryAsync = observableFactoryAsync;
this.scheduler = scheduler;
}

@Override
public Observable<T> call() {
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler));
}

}
/**
* Returns an observable sequence that starts the specified asynchronous
* factory function whenever a new observer subscribes and shares a boolean
* subscription between all subscribers.
* @param <T> the result type
* @param observableFactoryAsync the asynchronous function to start for each observer
* @param scheduler the scheduler where the completion of the Future is awaited
* @return the observable sequence containing values produced by the asynchronous observer
* produced by the factory
*/
public static <T> Observable<T> deferFuture(
Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
Scheduler scheduler) {
return Observable.defer(new DeferFutureFunc1Scheduled<T>(observableFactoryAsync, scheduler));
}
/** The function called by the defer operator with boolean subscription. */
private static final class DeferFutureFunc1Scheduled<T> implements Func0<Observable<T>> {
final Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync;
final Scheduler scheduler;

public DeferFutureFunc1Scheduled(Func1<? super BooleanSubscription, ? extends Future<? extends Observable<? extends T>>> observableFactoryAsync,
Scheduler scheduler) {
this.observableFactoryAsync = observableFactoryAsync;
this.scheduler = scheduler;
}

@Override
public Observable<T> call() {
return Observable.merge(OperationStartFuture.startFuture(observableFactoryAsync, scheduler));
}

}
}
Loading