Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@
import rx.operators.OperationZip;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
Expand Down Expand Up @@ -5226,6 +5226,59 @@ public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Create a connectable observable sequence that shares a single
* subscription to the underlying sequence and starts with initialValue.
* @param initialValue the initial value of the underlying BehaviorSubject
* @return a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
*/
public ConnectableObservable<T> publish(T initialValue) {
return OperationMulticast.multicast(this, BehaviorSubject.<T> create(initialValue));
}

/**
* Create an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will receive all notifications of the source from the time
* of the subscription on.
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence.
*/
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return PublishSubject.create();
}
}, selector);
}

/**
* Create an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will receive all notifications of the source from the time
* of the subscription on
* @param initialValue the initial value of the underlying BehaviorSubject
* @return an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue
*/
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector, final T initialValue) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return BehaviorSubject.create(initialValue);
}
}, selector);
}

/**
* Returns a {@link ConnectableObservable} that emits only the last item
* emitted by the source Observable.
Expand All @@ -5238,6 +5291,30 @@ public ConnectableObservable<T> publish() {
public ConnectableObservable<T> publishLast() {
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
}

/**
* Create an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence containing only the last
* notification.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will only receive the last notification of the source
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence containing only the last
* notification.
*/
public <R> Observable<R> publishLast(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return AsyncSubject.create();
}
}, selector);
}

/**
* Synonymous with <code>reduce()</code>.
Expand Down