Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Iterator;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

Expand All @@ -37,8 +38,10 @@
public final class FlowableCombineLatest<T, R>
extends Flowable<R> {

@Nullable
final Publisher<? extends T>[] array;

@Nullable
final Iterable<? extends Publisher<? extends T>> iterable;

final Function<? super Object[], ? extends R> combiner;
Expand All @@ -47,8 +50,8 @@ public final class FlowableCombineLatest<T, R>

final boolean delayErrors;

public FlowableCombineLatest(Publisher<? extends T>[] array,
Function<? super Object[], ? extends R> combiner,
public FlowableCombineLatest(@NonNull Publisher<? extends T>[] array,
@NonNull Function<? super Object[], ? extends R> combiner,
int bufferSize, boolean delayErrors) {
this.array = array;
this.iterable = null;
Expand All @@ -57,8 +60,8 @@ public FlowableCombineLatest(Publisher<? extends T>[] array,
this.delayErrors = delayErrors;
}

public FlowableCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
Function<? super Object[], ? extends R> combiner,
public FlowableCombineLatest(@NonNull Iterable<? extends Publisher<? extends T>> iterable,
@NonNull Function<? super Object[], ? extends R> combiner,
int bufferSize, boolean delayErrors) {
this.array = null;
this.iterable = iterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.Arrays;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.disposables.Disposable;
Expand All @@ -33,21 +35,22 @@
* @param <R> the output type
*/
public final class FlowableWithLatestFromMany<T, R> extends AbstractFlowableWithUpstream<T, R> {

@Nullable
final Publisher<?>[] otherArray;

@Nullable
final Iterable<? extends Publisher<?>> otherIterable;

final Function<? super Object[], R> combiner;

public FlowableWithLatestFromMany(Publisher<T> source, Publisher<?>[] otherArray, Function<? super Object[], R> combiner) {
public FlowableWithLatestFromMany(@NonNull Publisher<T> source, @NonNull Publisher<?>[] otherArray, Function<? super Object[], R> combiner) {
super(source);
this.otherArray = otherArray;
this.otherIterable = null;
this.combiner = combiner;
}

public FlowableWithLatestFromMany(Publisher<T> source, Iterable<? extends Publisher<?>> otherIterable, Function<? super Object[], R> combiner) {
public FlowableWithLatestFromMany(@NonNull Publisher<T> source, @NonNull Iterable<? extends Publisher<?>> otherIterable, @NonNull Function<? super Object[], R> combiner) {
super(source);
this.otherArray = null;
this.otherIterable = otherIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void onSuccess(T value) {

this.it = iter;

if (outputFused && iter != null) {
if (outputFused) {
a.onNext(null);
a.onComplete();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.concurrent.atomic.*;

import io.reactivex.*;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
Expand All @@ -33,20 +35,23 @@
*/
public final class ObservableWithLatestFromMany<T, R> extends AbstractObservableWithUpstream<T, R> {

@Nullable
final ObservableSource<?>[] otherArray;

@Nullable
final Iterable<? extends ObservableSource<?>> otherIterable;

@NonNull
final Function<? super Object[], R> combiner;

public ObservableWithLatestFromMany(ObservableSource<T> source, ObservableSource<?>[] otherArray, Function<? super Object[], R> combiner) {
public ObservableWithLatestFromMany(@NonNull ObservableSource<T> source, @NonNull ObservableSource<?>[] otherArray, @NonNull Function<? super Object[], R> combiner) {
super(source);
this.otherArray = otherArray;
this.otherIterable = null;
this.combiner = combiner;
}

public ObservableWithLatestFromMany(ObservableSource<T> source, Iterable<? extends ObservableSource<?>> otherIterable, Function<? super Object[], R> combiner) {
public ObservableWithLatestFromMany(@NonNull ObservableSource<T> source, @NonNull Iterable<? extends ObservableSource<?>> otherIterable, @NonNull Function<? super Object[], R> combiner) {
super(source);
this.otherArray = null;
this.otherIterable = otherIterable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public Disposable schedule(@NonNull Runnable action) {
return EmptyDisposable.INSTANCE;
}

return poolWorker.scheduleActual(action, 0, null, serial);
return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial);
}
@NonNull
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -106,7 +107,8 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel
* @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
* @return the ScheduledRunnable instance
*/
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Expand All @@ -126,7 +128,9 @@ public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, Time
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
parent.remove(sr);
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

Expand Down
18 changes: 18 additions & 0 deletions src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,22 @@ public void run() {

assertEquals(0, calls[0]);
}

/**
* Regression test to ensure there is no NPE when the worker has been disposed
*/
@Test
public void npeRegression() throws Exception {
Scheduler s = getScheduler();
NewThreadWorker w = (NewThreadWorker) s.createWorker();
w.dispose();

//This method used to throw a NPE when the worker has been disposed and the parent is null
w.scheduleActual(new Runnable() {
@Override
public void run() {
}
}, 0, TimeUnit.MILLISECONDS, null);

}
}