Skip to content
Merged
117 changes: 77 additions & 40 deletions src/main/java/io/reactivex/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package io.reactivex.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;

import io.reactivex.internal.functions.ObjectHelper;
import org.reactivestreams.Subscriber;

import io.reactivex.*;
Expand All @@ -31,13 +33,13 @@ public final class RxJavaPlugins {

static volatile Function<Runnable, Runnable> onScheduleHandler;

static volatile Function<Scheduler, Scheduler> onInitComputationHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitComputationHandler;

static volatile Function<Scheduler, Scheduler> onInitSingleHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitSingleHandler;

static volatile Function<Scheduler, Scheduler> onInitIoHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitIoHandler;

static volatile Function<Scheduler, Scheduler> onInitNewThreadHandler;
static volatile Function<Callable<Scheduler>, Scheduler> onInitNewThreadHandler;

static volatile Function<Scheduler, Scheduler> onComputationHandler;

Expand Down Expand Up @@ -121,31 +123,31 @@ public static Consumer<Throwable> getErrorHandler() {
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitComputationSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitComputationSchedulerHandler() {
return onInitComputationHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitIoSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitIoSchedulerHandler() {
return onInitIoHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitNewThreadSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitNewThreadSchedulerHandler() {
return onInitNewThreadHandler;
}

/**
* Returns the current hook function.
* @return the hook function, may be null
*/
public static Function<Scheduler, Scheduler> getInitSingleSchedulerHandler() {
public static Function<Callable<Scheduler>, Scheduler> getInitSingleSchedulerHandler() {
return onInitSingleHandler;
}

Expand Down Expand Up @@ -183,54 +185,62 @@ public static Function<Scheduler, Scheduler> getSingleSchedulerHandler() {

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initComputationScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitComputationHandler;
public static Scheduler initComputationScheduler(Callable<Scheduler> defaultScheduler) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not it throw on a null callable? What's the point of calling with null?

Copy link
Contributor Author

@peter-tackage peter-tackage Sep 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done for consistency with the existing expectations in RxJavaPluginsTest.clearIsPassthrough(), specifically:

assertNull(RxJavaPlugins.initComputationScheduler(null));
assertNull(RxJavaPlugins.initIoScheduler(null));
assertNull(RxJavaPlugins.initNewThreadScheduler(null));
assertNull(RxJavaPlugins.initSingleScheduler(null));

Should this be changed to only return null if the Callable returns null (and throw if the Callable itself is null)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null should not be allowed as a return value from the Callable nor from the init Function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a change, because previously the Scheduler value for RxJavaPlugins.init* was allowed to be null, as per - assertNull(RxJavaPlugins.initSingleScheduler(null));.

I will add an additional set of tests for the new behavior (something along the lines of assemblyHookCrashes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitComputationHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler); // JIT will skip this
return applyRequireNonNull(f, defaultScheduler); // JIT will skip this
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initIoScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitIoHandler;
public static Scheduler initIoScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitIoHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initNewThreadScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitNewThreadHandler;
public static Scheduler initNewThreadScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitNewThreadHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
* Calls the associated hook function.
* @param defaultScheduler the hook's input value
* @return the value returned by the hook
* @param defaultScheduler a {@link Callable} which returns the hook's input value
* @return the value returned by the hook, not null
* @throws NullPointerException if the callable parameter or its result are null
*/
public static Scheduler initSingleScheduler(Scheduler defaultScheduler) {
Function<Scheduler, Scheduler> f = onInitSingleHandler;
public static Scheduler initSingleScheduler(Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<Callable<Scheduler>, Scheduler> f = onInitSingleHandler;
if (f == null) {
return defaultScheduler;
return callRequireNonNull(defaultScheduler);
}
return apply(f, defaultScheduler);
return applyRequireNonNull(f, defaultScheduler);
}

/**
Expand Down Expand Up @@ -393,9 +403,9 @@ public static void setErrorHandler(Consumer<Throwable> handler) {

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitComputationSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitComputationSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -404,9 +414,9 @@ public static void setInitComputationSchedulerHandler(Function<Scheduler, Schedu

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitIoSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitIoSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -415,9 +425,9 @@ public static void setInitIoSchedulerHandler(Function<Scheduler, Scheduler> hand

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitNewThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitNewThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -426,9 +436,9 @@ public static void setInitNewThreadSchedulerHandler(Function<Scheduler, Schedule

/**
* Sets the specific hook function.
* @param handler the hook function to set, null allowed
* @param handler the hook function to set, null allowed, but the function may not return null
*/
public static void setInitSingleSchedulerHandler(Function<Scheduler, Scheduler> handler) {
public static void setInitSingleSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand Down Expand Up @@ -953,6 +963,33 @@ static <T, U, R> R apply(BiFunction<T, U, R> f, T t, U u) {
}
}

/**
* Wraps the call to the Scheduler creation callable in try-catch and propagates thrown
* checked exceptions as RuntimeException and enforces that result is not null.
* @param s the {@link Callable} which returns a {@link Scheduler}, not null (not verified). Cannot return null
* @return the result of the callable call, not null
* @throws NullPointerException if the callable parameter returns null
*/
static Scheduler callRequireNonNull(Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}

/**
* Wraps the call to the Scheduler creation function in try-catch and propagates thrown
* checked exceptions as RuntimeException and enforces that result is not null.
* @param f the function to call, not null (not verified). Cannot return null
* @param s the parameter value to the function
* @return the result of the function call, not null
* @throws NullPointerException if the function parameter returns null
*/
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null");
}

/** Helper class, no instances. */
private RxJavaPlugins() {
throw new IllegalStateException("No instances!");
Expand Down
47 changes: 42 additions & 5 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.schedulers;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.Scheduler;
Expand Down Expand Up @@ -44,16 +45,52 @@ public final class Schedulers {

static final Scheduler NEW_THREAD;

static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleScheduler());
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}

static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationScheduler());
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}

IO = RxJavaPlugins.initIoScheduler(new IoScheduler());
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return SingleHolder.DEFAULT;
}
});

COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return ComputationHolder.DEFAULT;
}
});

IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
});

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(NewThreadScheduler.instance());
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}

/** Utility class. */
Expand Down
Loading