Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2249,7 +2249,7 @@ public final <R> R to(Func1<? super Completable, R> converter) {
* @return the new Observable created
*/
public final <T> Observable<T> toObservable() {
return Observable.create(new Observable.OnSubscribe<T>() {
return Observable.unsafeCreate(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
unsafeSubscribe(s);
Expand Down
212 changes: 139 additions & 73 deletions src/main/java/rx/Observable.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public interface Transformer<T, R> extends Func1<Single<T>, Single<R>> {
*/
private static <T> Observable<T> asObservable(Single<T> t) {
// is this sufficient, or do I need to keep the outer Single and subscribe to it?
return Observable.create(new SingleToObservable<T>(t.onSubscribe));
return Observable.unsafeCreate(new SingleToObservable<T>(t.onSubscribe));
}

/* *********************************************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public enum EmptyObservableHolder implements OnSubscribe<Object> {
;

/** The singleton instance. */
static final Observable<Object> EMPTY = Observable.create(INSTANCE);
static final Observable<Object> EMPTY = Observable.unsafeCreate(INSTANCE);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public enum NeverObservableHolder implements OnSubscribe<Object> {
;

/** The singleton instance. */
static final Observable<Object> NEVER = Observable.create(INSTANCE);
static final Observable<Object> NEVER = Observable.unsafeCreate(INSTANCE);

/**
* Returns a type-corrected singleton instance of the never Observable.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import rx.plugins.RxJavaHooks;
import rx.subscriptions.SerialSubscription;

public final class OnSubscribeFromEmitter<T> implements OnSubscribe<T> {
public final class OnSubscribeCreate<T> implements OnSubscribe<T> {

final Action1<Emitter<T>> Emitter;

final Emitter.BackpressureMode backpressure;

public OnSubscribeFromEmitter(Action1<Emitter<T>> Emitter, Emitter.BackpressureMode backpressure) {
public OnSubscribeCreate(Action1<Emitter<T>> Emitter, Emitter.BackpressureMode backpressure) {
this.Emitter = Emitter;
this.backpressure = backpressure;
}
Expand Down Expand Up @@ -268,7 +268,7 @@ public void onError(Throwable e) {

@Override
void onOverflow() {
onError(new MissingBackpressureException("fromEmitter: could not emit value due to lack of requests"));
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public static <T, R> Observable<R> createFrom(Observable<? extends T> source,
Func1<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
if (source instanceof ScalarSynchronousObservable) {
T scalar = ((ScalarSynchronousObservable<? extends T>) source).get();
return Observable.create(new OnSubscribeScalarFlattenIterable<T, R>(scalar, mapper));
return Observable.unsafeCreate(new OnSubscribeScalarFlattenIterable<T, R>(scalar, mapper));
}
return Observable.create(new OnSubscribeFlattenIterable<T, R>(source, mapper, prefetch));
return Observable.unsafeCreate(new OnSubscribeFlattenIterable<T, R>(source, mapper, prefetch));
}

static final class FlattenIterableSubscriber<T, R> extends Subscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void onNext(T1 args) {
leftMap().put(id, subjSerial);
}

Observable<T2> window = Observable.create(new WindowObservableFunc<T2>(subj, cancel));
Observable<T2> window = Observable.unsafeCreate(new WindowObservableFunc<T2>(subj, cancel));

Observable<D1> duration = leftDuration.call(args);

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* limitations under the License.
*/

import static rx.Observable.create; // NOPMD
import static rx.Observable.unsafeCreate; // NOPMD

import java.util.concurrent.atomic.*;

Expand Down Expand Up @@ -133,11 +133,11 @@ public static <T> Observable<T> retry(Observable<T> source, final long count) {
}

public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
}

public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
}

public static <T> Observable<T> repeat(Observable<T> source) {
Expand All @@ -163,15 +163,15 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count, S
}

public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
}

public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
}

public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
}

private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/internal/operators/OperatorPublish.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static <T, R> Observable<R> create(final Observable<? extends T> source,

public static <T, R> Observable<R> create(final Observable<? extends T> source,
final Func1<? super Observable<T>, ? extends Observable<R>> selector, final boolean delayError) {
return create(new OnSubscribe<R>() {
return unsafeCreate(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> child) {
final OnSubscribePublishMulticast<T> op = new OnSubscribePublishMulticast<T>(RxRingBuffer.SIZE, delayError);
Expand Down Expand Up @@ -155,7 +155,7 @@ public void setProducer(Producer p) {
child.add(op);
child.add(subscriber);

selector.call(Observable.create(op)).unsafeSubscribe(subscriber);
selector.call(Observable.unsafeCreate(op)).unsafeSubscribe(subscriber);

source.unsafeSubscribe(op.subscriber());
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Object call() {
public static <T, U, R> Observable<R> multicastSelector(
final Func0<? extends ConnectableObservable<U>> connectableFactory,
final Func1<? super Observable<U>, ? extends Observable<R>> selector) {
return Observable.create(new OnSubscribe<R>() {
return Observable.unsafeCreate(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> child) {
ConnectableObservable<U> co;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void call() {
};
}

return create(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
return unsafeCreate(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
}

/** The OnSubscribe callback for the Observable constructor. */
Expand Down Expand Up @@ -225,7 +225,7 @@ public String toString() {
* @return the new observable
*/
public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
return create(new OnSubscribe<R>() {
return unsafeCreate(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> child) {
Observable<? extends R> o = func.call(t);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/observables/AsyncOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/**
* A utility class to create {@code OnSubscribe<T>} functions that respond correctly to back
* pressure requests from subscribers. This is an improvement over
* {@link rx.Observable#create(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
* {@link rx.Observable#unsafeCreate(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
* any means of managing back pressure requests out-of-the-box. This variant of an OnSubscribe
* function allows for the asynchronous processing of requests.
*
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/observables/ConnectableObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void call(Subscription t1) {
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
*/
public Observable<T> refCount() {
return create(new OnSubscribeRefCount<T>(this));
return unsafeCreate(new OnSubscribeRefCount<T>(this));
}

/**
Expand Down Expand Up @@ -125,6 +125,6 @@ public Observable<T> autoConnect(int numberOfSubscribers, Action1<? super Subscr
this.connect(connection);
return this;
}
return create(new OnSubscribeAutoConnect<T>(this, numberOfSubscribers, connection));
return unsafeCreate(new OnSubscribeAutoConnect<T>(this, numberOfSubscribers, connection));
}
}
2 changes: 1 addition & 1 deletion src/main/java/rx/observables/SyncOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* A utility class to create {@code OnSubscribe<T>} functions that responds correctly to back
* pressure requests from subscribers. This is an improvement over
* {@link rx.Observable#create(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
* {@link rx.Observable#unsafeCreate(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
* any means of managing back pressure requests out-of-the-box.
*
* @param <S>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*/
public abstract class RxJavaObservableExecutionHook { // NOPMD
/**
* Invoked during the construction by {@link Observable#create(OnSubscribe)}
* Invoked during the construction by {@link Observable#unsafeCreate(OnSubscribe)}
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass through the function.
Expand Down
2 changes: 1 addition & 1 deletion src/perf/java/rx/OneItemPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void call(SingleSubscriber<? super T> t) {
@Setup
public void setup() {
scalar = Observable.just(1);
one = Observable.create(new OnSubscribe<Integer>() {
one = Observable.unsafeCreate(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> t) {
t.setProducer(new SingleProducer<Integer>(t, 1));
Expand Down
2 changes: 1 addition & 1 deletion src/perf/java/rx/jmh/InputWithIncrementingInteger.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void setup(final Blackhole bh) {
final int size = getSize();
observable = Observable.range(0, size);

firehose = Observable.create(new OnSubscribe<Integer>() {
firehose = Observable.unsafeCreate(new OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
Expand Down
4 changes: 2 additions & 2 deletions src/perf/java/rx/operators/FromComparison.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public void setup() {

Arrays.fill(array, 1);

iterableSource = Observable.create(new OnSubscribeFromIterable<Integer>(Arrays.asList(array)));
arraySource = Observable.create(new OnSubscribeFromArray<Integer>(array));
iterableSource = Observable.unsafeCreate(new OnSubscribeFromIterable<Integer>(Arrays.asList(array)));
arraySource = Observable.unsafeCreate(new OnSubscribeFromArray<Integer>(array));
}

@Benchmark
Expand Down
4 changes: 2 additions & 2 deletions src/perf/java/rx/operators/OperatorRangePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static class InputUsingRequest {

@Setup
public void setup(final Blackhole bh) {
observable = Observable.create(new OnSubscribeRange(0, size));
observable = Observable.unsafeCreate(new OnSubscribeRange(0, size));
this.bh = bh;
}

Expand Down Expand Up @@ -91,7 +91,7 @@ public static class InputWithoutRequest {

@Setup
public void setup(final Blackhole bh) {
observable = Observable.create(new OnSubscribeRange(0, size));
observable = Observable.unsafeCreate(new OnSubscribeRange(0, size));
this.bh = bh;

}
Expand Down
6 changes: 3 additions & 3 deletions src/perf/java/rx/operators/OperatorSerializePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void serializedSingleStream(Input input) throws InterruptedException {
@Benchmark
public void serializedTwoStreamsHighlyContended(final Input input) throws InterruptedException {
LatchedObserver<Integer> o = input.newLatchedObserver();
Observable.create(new OnSubscribe<Integer>() {
Observable.unsafeCreate(new OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
Expand Down Expand Up @@ -101,7 +101,7 @@ public Integer call(Long t1) {
@Benchmark
public void serializedTwoStreamsSlightlyContended(final InputWithInterval input) throws InterruptedException {
LatchedObserver<Integer> o = input.newLatchedObserver();
Observable.create(new OnSubscribe<Integer>() {
Observable.unsafeCreate(new OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
Expand All @@ -118,7 +118,7 @@ public void call(Subscriber<? super Integer> s) {
@Benchmark
public void serializedTwoStreamsOneFastOneSlow(final InputWithInterval input) throws InterruptedException {
LatchedObserver<Integer> o = input.newLatchedObserver();
Observable.create(new OnSubscribe<Integer>() {
Observable.unsafeCreate(new OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> s) {
Expand Down
2 changes: 1 addition & 1 deletion src/perf/java/rx/operators/OperatorTakeLastOnePerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void takeLastOneUsingTakeLast(Input input) {

@Benchmark
public void takeLastOneUsingTakeLastOne(Input input) {
Observable.create(new OnSubscribeTakeLastOne<Integer>(input.observable)).subscribe(input.observer);
Observable.unsafeCreate(new OnSubscribeTakeLastOne<Integer>(input.observable)).subscribe(input.observer);
}

}
4 changes: 2 additions & 2 deletions src/test/java/rx/BackpressureTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ private static Observable<Integer> incrementingIntegers(final AtomicInteger coun
}

private static Observable<Integer> incrementingIntegers(final AtomicInteger counter, final ConcurrentLinkedQueue<Thread> threadsSeen) {
return Observable.create(new OnSubscribe<Integer>() {
return Observable.unsafeCreate(new OnSubscribe<Integer>() {

final AtomicLong requested = new AtomicLong();

Expand Down Expand Up @@ -637,7 +637,7 @@ public void request(long n) {
* @return
*/
private static Observable<Integer> firehose(final AtomicInteger counter) {
return Observable.create(new OnSubscribe<Integer>() {
return Observable.unsafeCreate(new OnSubscribe<Integer>() {

int i;

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public void call(CompletableSubscriber cs) {
cs.onError(e);
}
})
.andThen(Observable.<String>create(new Observable.OnSubscribe<String>() {
.andThen(Observable.<String>unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> s) {
hasRun.set(true);
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/ConcatTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testConcatCovariance4() {
Media media = new Media();
HorrorMovie horrorMovie2 = new HorrorMovie();

Observable<Movie> o1 = Observable.create(new OnSubscribe<Movie>() {
Observable<Movie> o1 = Observable.unsafeCreate(new OnSubscribe<Movie>() {

@Override
public void call(Subscriber<? super Movie> o) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/EventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private EventStream() {
throw new IllegalStateException("No instances!");
}
public static Observable<Event> getEventStream(final String type, final int numInstances) {
return Observable.create(new OnSubscribe<Event>() {
return Observable.unsafeCreate(new OnSubscribe<Event>() {

@Override
public void call(final Subscriber<? super Event> subscriber) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/rx/MergeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testMergeCovariance3() {
@Test
public void testMergeCovariance4() {

Observable<Movie> o1 = Observable.create(new OnSubscribe<Movie>() {
Observable<Movie> o1 = Observable.unsafeCreate(new OnSubscribe<Movie>() {

@Override
public void call(Subscriber<? super Movie> o) {
Expand Down
Loading