diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index a96fa469cf..310e56b849 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; @@ -61,6 +62,7 @@ public static long clockDriftTolerance() { * * @return a Worker representing a serial queue of actions to be executed */ + @NonNull public abstract Worker createWorker(); /** @@ -69,7 +71,7 @@ public static long clockDriftTolerance() { * @return the 'current time' * @since 2.0 */ - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @@ -105,7 +107,8 @@ public void shutdown() { * @return the Disposable instance that let's one cancel this particular task. * @since 2.0 */ - public Disposable scheduleDirect(Runnable run) { + @NonNull + public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } @@ -122,7 +125,8 @@ public Disposable scheduleDirect(Runnable run) { * @return the Disposable that let's one cancel this particular delayed task. * @since 2.0 */ - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + @NonNull + public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -159,7 +163,8 @@ public void run() { * @return the Disposable that let's one cancel this particular delayed task. * @since 2.0 */ - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + @NonNull + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); @@ -249,7 +254,8 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo */ @SuppressWarnings("unchecked") @Experimental - public S when(Function>, Completable> combine) { + @NonNull + public S when(@NonNull Function>, Completable> combine) { return (S) new SchedulerWhen(combine, this); } @@ -268,7 +274,8 @@ public abstract static class Worker implements Disposable { * Runnable to schedule * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ - public Disposable schedule(Runnable run) { + @NonNull + public Disposable schedule(@NonNull Runnable run) { return schedule(run, 0L, TimeUnit.NANOSECONDS); } @@ -287,7 +294,8 @@ public Disposable schedule(Runnable run) { * the time unit of {@code delayTime} * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ - public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit); + @NonNull + public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit); /** * Schedules a cancelable action to be executed periodically. This default implementation schedules @@ -309,7 +317,8 @@ public Disposable schedule(Runnable run) { * the time unit of {@code period} * @return a Disposable to be able to unsubscribe the action (cancel it if not executed) */ - public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) { + @NonNull + public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) { final SequentialDisposable first = new SequentialDisposable(); final SequentialDisposable sd = new SequentialDisposable(first); @@ -337,7 +346,7 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi * @return the 'current time' * @since 2.0 */ - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @@ -346,15 +355,17 @@ public long now(TimeUnit unit) { * of this task has to happen (accounting for clock drifts). */ final class PeriodicTask implements Runnable { + @NonNull final Runnable decoratedRun; + @NonNull final SequentialDisposable sd; final long periodInNanoseconds; long count; long lastNowNanoseconds; long startInNanoseconds; - PeriodicTask(long firstStartInNanoseconds, Runnable decoratedRun, - long firstNowNanoseconds, SequentialDisposable sd, long periodInNanoseconds) { + PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun, + long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) { this.decoratedRun = decoratedRun; this.sd = sd; this.periodInNanoseconds = periodInNanoseconds; @@ -395,12 +406,12 @@ public void run() { static class PeriodicDirectTask implements Runnable, Disposable { final Runnable run; - + @NonNull final Worker worker; - + @NonNull volatile boolean disposed; - PeriodicDirectTask(Runnable run, Worker worker) { + PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) { this.run = run; this.worker = worker; } diff --git a/src/main/java/io/reactivex/annotations/NonNull.java b/src/main/java/io/reactivex/annotations/NonNull.java index 19a2370007..adfe9ff98f 100644 --- a/src/main/java/io/reactivex/annotations/NonNull.java +++ b/src/main/java/io/reactivex/annotations/NonNull.java @@ -23,6 +23,9 @@ import static java.lang.annotation.ElementType.PARAMETER; import static java.lang.annotation.RetentionPolicy.CLASS; +/** + * Indicates that a field/parameter/variable/return type is never null. + */ @Documented @Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE}) @Retention(value = CLASS) diff --git a/src/main/java/io/reactivex/annotations/Nullable.java b/src/main/java/io/reactivex/annotations/Nullable.java new file mode 100644 index 0000000000..69e03498e7 --- /dev/null +++ b/src/main/java/io/reactivex/annotations/Nullable.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.LOCAL_VARIABLE; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.CLASS; + +/** + * Indicates that a field/parameter/variable/return type may be null. + */ +@Documented +@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE}) +@Retention(value = CLASS) +public @interface Nullable { } + diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index a74439fc0f..5faaa555a5 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -16,6 +16,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; @@ -118,19 +119,22 @@ public ComputationScheduler(ThreadFactory threadFactory) { start(); } + @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { PoolWorker w = pool.get().getEventLoop(); return w.scheduleDirect(run, delay, unit); } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { PoolWorker w = pool.get().getEventLoop(); return w.schedulePeriodicallyDirect(run, initialDelay, period, unit); } @@ -188,16 +192,18 @@ public boolean isDisposed() { return disposed; } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { if (disposed) { return EmptyDisposable.INSTANCE; } return poolWorker.scheduleActual(action, 0, null, serial); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index 3916073a40..6ce46c44f7 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.*; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; import io.reactivex.internal.queue.MpscLinkedQueue; @@ -29,21 +30,24 @@ */ public final class ExecutorScheduler extends Scheduler { + @NonNull final Executor executor; static final Scheduler HELPER = Schedulers.single(); - public ExecutorScheduler(Executor executor) { + public ExecutorScheduler(@NonNull Executor executor) { this.executor = executor; } + @NonNull @Override public Worker createWorker() { return new ExecutorWorker(executor); } + @NonNull @Override - public Disposable scheduleDirect(Runnable run) { + public Disposable scheduleDirect(@NonNull Runnable run) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { if (executor instanceof ExecutorService) { @@ -60,8 +64,9 @@ public Disposable scheduleDirect(Runnable run) { } } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final TimeUnit unit) { final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); if (executor instanceof ScheduledExecutorService) { try { @@ -87,8 +92,9 @@ public void run() { return dr; } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { if (executor instanceof ScheduledExecutorService) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { @@ -118,8 +124,9 @@ public ExecutorWorker(Executor executor) { this.queue = new MpscLinkedQueue(); } + @NonNull @Override - public Disposable schedule(Runnable run) { + public Disposable schedule(@NonNull Runnable run) { if (disposed) { return EmptyDisposable.INSTANCE; } @@ -143,8 +150,9 @@ public Disposable schedule(Runnable run) { return br; } + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (delay <= 0) { return schedule(run); } diff --git a/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java index 2dbdeb95a9..990a8cdd14 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ImmediateThinScheduler.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; /** @@ -45,22 +46,26 @@ private ImmediateThinScheduler() { // singleton class } + @NonNull @Override - public Disposable scheduleDirect(Runnable run) { + public Disposable scheduleDirect(@NonNull Runnable run) { run.run(); return DISPOSED; } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support delayed execution"); } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support periodic execution"); } + @NonNull @Override public Worker createWorker() { return WORKER; @@ -78,19 +83,22 @@ public boolean isDisposed() { return false; // dispose() has no effect } + @NonNull @Override - public Disposable schedule(Runnable run) { + public Disposable schedule(@NonNull Runnable run) { run.run(); return DISPOSED; } + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support delayed execution"); } + @NonNull @Override - public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodically(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException("This scheduler doesn't support periodic execution"); } } diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index 8030cd7333..cc22f0d4b1 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -17,6 +17,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; @@ -180,6 +181,7 @@ public void shutdown() { } } + @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); @@ -223,8 +225,9 @@ public boolean isDisposed() { return once.get(); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java index e78f897d7e..2513cb300d 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadScheduler.java @@ -17,6 +17,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import java.util.concurrent.ThreadFactory; @@ -48,6 +49,7 @@ public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } + @NonNull @Override public Worker createWorker() { return new NewThreadWorker(threadFactory); diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index a16955b76d..ea45d78e60 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -16,6 +16,7 @@ import java.util.concurrent.*; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; import io.reactivex.plugins.RxJavaPlugins; @@ -34,13 +35,15 @@ public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } + @NonNull @Override - public Disposable schedule(final Runnable run) { + public Disposable schedule(@NonNull final Runnable run) { return schedule(run, 0, null); } + @NonNull @Override - public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java index dea4e07efe..d8190ec30a 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java @@ -25,6 +25,7 @@ import io.reactivex.Observable; import io.reactivex.Scheduler; import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.Exceptions; @@ -129,6 +130,7 @@ public boolean isDisposed() { return disposable.isDisposed(); } + @NonNull @Override public Worker createWorker() { final Worker actualWorker = actualScheduler.createWorker(); @@ -168,16 +170,18 @@ public boolean isDisposed() { return unsubscribed.get(); } + @NonNull @Override - public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit unit) { + public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit unit) { // send a scheduled action to the actionQueue DelayedAction delayedAction = new DelayedAction(action, delayTime, unit); actionProcessor.onNext(delayedAction); return delayedAction; } + @NonNull @Override - public Disposable schedule(final Runnable action) { + public Disposable schedule(@NonNull final Runnable action) { // send a scheduled action to the actionQueue ImmediateAction immediateAction = new ImmediateAction(action); actionProcessor.onNext(immediateAction); diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index 5e0e46cc39..47e7761f89 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -13,6 +13,7 @@ package io.reactivex.internal.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.plugins.RxJavaPlugins; @@ -96,13 +97,15 @@ public void shutdown() { } } + @NonNull @Override public Worker createWorker() { return new ScheduledWorker(executor.get()); } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { Future f; @@ -118,8 +121,9 @@ public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { } } + @NonNull @Override - public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { + public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { Future f = executor.get().scheduleAtFixedRate(decoratedRun, initialDelay, period, unit); @@ -143,8 +147,9 @@ static final class ScheduledWorker extends Scheduler.Worker { this.tasks = new CompositeDisposable(); } + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java index dc55b2a6d3..c63fdf2dac 100644 --- a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; @@ -36,6 +37,7 @@ public static TrampolineScheduler instance() { return INSTANCE; } + @NonNull @Override public Worker createWorker() { return new TrampolineWorker(); @@ -44,14 +46,16 @@ public Worker createWorker() { /* package accessible for unit tests */TrampolineScheduler() { } + @NonNull @Override - public Disposable scheduleDirect(Runnable run) { + public Disposable scheduleDirect(@NonNull Runnable run) { run.run(); return EmptyDisposable.INSTANCE; } + @NonNull @Override - public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { + public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { try { unit.sleep(delay); run.run(); @@ -71,13 +75,15 @@ static final class TrampolineWorker extends Scheduler.Worker implements Disposab volatile boolean disposed; + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { return enqueue(action, now(TimeUnit.MILLISECONDS)); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { long execTime = now(TimeUnit.MILLISECONDS) + unit.toMillis(delayTime); return enqueue(new SleepingRunnable(action, this, execTime), execTime); diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 9fd575dd8a..875dda8f35 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -12,84 +12,122 @@ */ package io.reactivex.plugins; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.*; - -import org.reactivestreams.Subscriber; - -import io.reactivex.*; +import io.reactivex.Completable; +import io.reactivex.CompletableObserver; +import io.reactivex.Flowable; +import io.reactivex.Maybe; +import io.reactivex.MaybeObserver; +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.Scheduler; +import io.reactivex.Single; +import io.reactivex.SingleObserver; import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import io.reactivex.flowables.ConnectableFlowable; -import io.reactivex.functions.*; +import io.reactivex.functions.BiFunction; +import io.reactivex.functions.BooleanSupplier; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.schedulers.*; +import io.reactivex.internal.schedulers.ComputationScheduler; +import io.reactivex.internal.schedulers.IoScheduler; +import io.reactivex.internal.schedulers.NewThreadScheduler; +import io.reactivex.internal.schedulers.SingleScheduler; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; import io.reactivex.parallel.ParallelFlowable; import io.reactivex.schedulers.Schedulers; +import org.reactivestreams.Subscriber; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadFactory; /** * Utility class to inject handlers to certain standard RxJava operations. */ public final class RxJavaPlugins { - + @Nullable static volatile Consumer errorHandler; + @Nullable static volatile Function onScheduleHandler; + @Nullable static volatile Function, Scheduler> onInitComputationHandler; + @Nullable static volatile Function, Scheduler> onInitSingleHandler; + @Nullable static volatile Function, Scheduler> onInitIoHandler; + @Nullable static volatile Function, Scheduler> onInitNewThreadHandler; + @Nullable static volatile Function onComputationHandler; + @Nullable static volatile Function onSingleHandler; + @Nullable static volatile Function onIoHandler; + @Nullable static volatile Function onNewThreadHandler; @SuppressWarnings("rawtypes") + @Nullable static volatile Function onFlowableAssembly; @SuppressWarnings("rawtypes") + @Nullable static volatile Function onConnectableFlowableAssembly; @SuppressWarnings("rawtypes") + @Nullable static volatile Function onObservableAssembly; @SuppressWarnings("rawtypes") + @Nullable static volatile Function onConnectableObservableAssembly; @SuppressWarnings("rawtypes") + @Nullable static volatile Function onMaybeAssembly; @SuppressWarnings("rawtypes") + @Nullable static volatile Function onSingleAssembly; static volatile Function onCompletableAssembly; @SuppressWarnings("rawtypes") + @Nullable static volatile Function onParallelAssembly; @SuppressWarnings("rawtypes") + @Nullable static volatile BiFunction onFlowableSubscribe; @SuppressWarnings("rawtypes") + @Nullable static volatile BiFunction onMaybeSubscribe; @SuppressWarnings("rawtypes") + @Nullable static volatile BiFunction onObservableSubscribe; @SuppressWarnings("rawtypes") + @Nullable static volatile BiFunction onSingleSubscribe; + @Nullable static volatile BiFunction onCompletableSubscribe; + @Nullable static volatile BooleanSupplier onBeforeBlocking; /** Prevents changing the plugins. */ @@ -149,6 +187,7 @@ public static boolean isFailOnNonBlockingScheduler() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function getComputationSchedulerHandler() { return onComputationHandler; } @@ -157,6 +196,7 @@ public static Function getComputationSchedulerHandler() { * Returns the a hook consumer. * @return the hook consumer, may be null */ + @Nullable public static Consumer getErrorHandler() { return errorHandler; } @@ -165,6 +205,7 @@ public static Consumer getErrorHandler() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function, Scheduler> getInitComputationSchedulerHandler() { return onInitComputationHandler; } @@ -173,6 +214,7 @@ public static Function, Scheduler> getInitComputationSchedul * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function, Scheduler> getInitIoSchedulerHandler() { return onInitIoHandler; } @@ -181,6 +223,7 @@ public static Function, Scheduler> getInitIoSchedulerHandler * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function, Scheduler> getInitNewThreadSchedulerHandler() { return onInitNewThreadHandler; } @@ -189,6 +232,7 @@ public static Function, Scheduler> getInitNewThreadScheduler * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function, Scheduler> getInitSingleSchedulerHandler() { return onInitSingleHandler; } @@ -197,6 +241,7 @@ public static Function, Scheduler> getInitSingleSchedulerHan * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function getIoSchedulerHandler() { return onIoHandler; } @@ -205,6 +250,7 @@ public static Function getIoSchedulerHandler() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function getNewThreadSchedulerHandler() { return onNewThreadHandler; } @@ -213,6 +259,7 @@ public static Function getNewThreadSchedulerHandler() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function getScheduleHandler() { return onScheduleHandler; } @@ -221,6 +268,7 @@ public static Function getScheduleHandler() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function getSingleSchedulerHandler() { return onSingleHandler; } @@ -231,7 +279,8 @@ public static Function getSingleSchedulerHandler() { * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initComputationScheduler(Callable defaultScheduler) { + @NonNull + public static Scheduler initComputationScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitComputationHandler; if (f == null) { @@ -246,7 +295,8 @@ public static Scheduler initComputationScheduler(Callable defaultSche * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initIoScheduler(Callable defaultScheduler) { + @NonNull + public static Scheduler initIoScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitIoHandler; if (f == null) { @@ -261,7 +311,8 @@ public static Scheduler initIoScheduler(Callable defaultScheduler) { * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initNewThreadScheduler(Callable defaultScheduler) { + @NonNull + public static Scheduler initNewThreadScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitNewThreadHandler; if (f == null) { @@ -276,7 +327,8 @@ public static Scheduler initNewThreadScheduler(Callable defaultSchedu * @return the value returned by the hook, not null * @throws NullPointerException if the callable parameter or its result are null */ - public static Scheduler initSingleScheduler(Callable defaultScheduler) { + @NonNull + public static Scheduler initSingleScheduler(@NonNull Callable defaultScheduler) { ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null"); Function, Scheduler> f = onInitSingleHandler; if (f == null) { @@ -290,7 +342,8 @@ public static Scheduler initSingleScheduler(Callable defaultScheduler * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - public static Scheduler onComputationScheduler(Scheduler defaultScheduler) { + @NonNull + public static Scheduler onComputationScheduler(@NonNull Scheduler defaultScheduler) { Function f = onComputationHandler; if (f == null) { return defaultScheduler; @@ -302,7 +355,7 @@ public static Scheduler onComputationScheduler(Scheduler defaultScheduler) { * Called when an undeliverable error occurs. * @param error the error to report */ - public static void onError(Throwable error) { + public static void onError(@NonNull Throwable error) { Consumer f = errorHandler; if (error == null) { @@ -324,7 +377,7 @@ public static void onError(Throwable error) { uncaught(error); } - static void uncaught(Throwable error) { + static void uncaught(@NonNull Throwable error) { Thread currentThread = Thread.currentThread(); UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler(); handler.uncaughtException(currentThread, error); @@ -335,7 +388,8 @@ static void uncaught(Throwable error) { * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - public static Scheduler onIoScheduler(Scheduler defaultScheduler) { + @NonNull + public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) { Function f = onIoHandler; if (f == null) { return defaultScheduler; @@ -348,7 +402,8 @@ public static Scheduler onIoScheduler(Scheduler defaultScheduler) { * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - public static Scheduler onNewThreadScheduler(Scheduler defaultScheduler) { + @NonNull + public static Scheduler onNewThreadScheduler(@NonNull Scheduler defaultScheduler) { Function f = onNewThreadHandler; if (f == null) { return defaultScheduler; @@ -361,7 +416,8 @@ public static Scheduler onNewThreadScheduler(Scheduler defaultScheduler) { * @param run the runnable instance * @return the replacement runnable */ - public static Runnable onSchedule(Runnable run) { + @NonNull + public static Runnable onSchedule(@NonNull Runnable run) { Function f = onScheduleHandler; if (f == null) { return run; @@ -374,7 +430,8 @@ public static Runnable onSchedule(Runnable run) { * @param defaultScheduler the hook's input value * @return the value returned by the hook */ - public static Scheduler onSingleScheduler(Scheduler defaultScheduler) { + @NonNull + public static Scheduler onSingleScheduler(@NonNull Scheduler defaultScheduler) { Function f = onSingleHandler; if (f == null) { return defaultScheduler; @@ -429,7 +486,7 @@ public static void reset() { * Sets the specific hook function. * @param handler the hook function to set, null allowed */ - public static void setComputationSchedulerHandler(Function handler) { + public static void setComputationSchedulerHandler(@Nullable Function handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -440,7 +497,7 @@ public static void setComputationSchedulerHandler(Function * Sets the specific hook function. * @param handler the hook function to set, null allowed */ - public static void setErrorHandler(Consumer handler) { + public static void setErrorHandler(@Nullable Consumer handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -451,7 +508,7 @@ public static void setErrorHandler(Consumer handler) { * Sets the specific hook function. * @param handler the hook function to set, null allowed, but the function may not return null */ - public static void setInitComputationSchedulerHandler(Function, Scheduler> handler) { + public static void setInitComputationSchedulerHandler(@Nullable Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -462,7 +519,7 @@ public static void setInitComputationSchedulerHandler(Function, Scheduler> handler) { + public static void setInitIoSchedulerHandler(@Nullable Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -473,7 +530,7 @@ public static void setInitIoSchedulerHandler(Function, Sched * Sets the specific hook function. * @param handler the hook function to set, null allowed, but the function may not return null */ - public static void setInitNewThreadSchedulerHandler(Function, Scheduler> handler) { + public static void setInitNewThreadSchedulerHandler(@Nullable Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -484,7 +541,7 @@ public static void setInitNewThreadSchedulerHandler(Function * Sets the specific hook function. * @param handler the hook function to set, null allowed, but the function may not return null */ - public static void setInitSingleSchedulerHandler(Function, Scheduler> handler) { + public static void setInitSingleSchedulerHandler(@Nullable Function, Scheduler> handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -495,7 +552,7 @@ public static void setInitSingleSchedulerHandler(Function, S * Sets the specific hook function. * @param handler the hook function to set, null allowed */ - public static void setIoSchedulerHandler(Function handler) { + public static void setIoSchedulerHandler(@Nullable Function handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -506,7 +563,7 @@ public static void setIoSchedulerHandler(Function handler) * Sets the specific hook function. * @param handler the hook function to set, null allowed */ - public static void setNewThreadSchedulerHandler(Function handler) { + public static void setNewThreadSchedulerHandler(@Nullable Function handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -517,7 +574,7 @@ public static void setNewThreadSchedulerHandler(Function h * Sets the specific hook function. * @param handler the hook function to set, null allowed */ - public static void setScheduleHandler(Function handler) { + public static void setScheduleHandler(@Nullable Function handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -528,7 +585,7 @@ public static void setScheduleHandler(Function handler) { * Sets the specific hook function. * @param handler the hook function to set, null allowed */ - public static void setSingleSchedulerHandler(Function handler) { + public static void setSingleSchedulerHandler(@Nullable Function handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -546,6 +603,7 @@ public static void setSingleSchedulerHandler(Function hand * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static Function getOnCompletableAssembly() { return onCompletableAssembly; } @@ -554,6 +612,7 @@ public static Function getOnCompletableAssembly() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable public static BiFunction getOnCompletableSubscribe() { return onCompletableSubscribe; } @@ -563,6 +622,7 @@ public static BiFunction * @return the hook function, may be null */ @SuppressWarnings("rawtypes") + @Nullable public static Function getOnFlowableAssembly() { return onFlowableAssembly; } @@ -572,6 +632,7 @@ public static Function getOnFlowableAssembly() { * @return the hook function, may be null */ @SuppressWarnings("rawtypes") + @Nullable public static Function getOnConnectableFlowableAssembly() { return onConnectableFlowableAssembly; } @@ -580,6 +641,7 @@ public static Function getOnConnectabl * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static BiFunction getOnFlowableSubscribe() { return onFlowableSubscribe; @@ -589,6 +651,7 @@ public static BiFunction getOnFlowableSubscrib * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static BiFunction getOnMaybeSubscribe() { return onMaybeSubscribe; @@ -598,6 +661,7 @@ public static BiFunction getOnMaybeSubscrib * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static Function getOnMaybeAssembly() { return onMaybeAssembly; @@ -607,6 +671,7 @@ public static Function getOnMaybeAssembly() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static Function getOnSingleAssembly() { return onSingleAssembly; @@ -616,6 +681,7 @@ public static Function getOnSingleAssembly() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static BiFunction getOnSingleSubscribe() { return onSingleSubscribe; @@ -625,6 +691,7 @@ public static BiFunction getOnSingleSubs * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static Function getOnObservableAssembly() { return onObservableAssembly; @@ -634,6 +701,7 @@ public static Function getOnObservableAssembly() { * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static Function getOnConnectableObservableAssembly() { return onConnectableObservableAssembly; @@ -643,6 +711,7 @@ public static Function getOnConnec * Returns the current hook function. * @return the hook function, may be null */ + @Nullable @SuppressWarnings("rawtypes") public static BiFunction getOnObservableSubscribe() { return onObservableSubscribe; @@ -652,7 +721,7 @@ public static BiFunction getOnObservableSubscrib * Sets the specific hook function. * @param onCompletableAssembly the hook function to set, null allowed */ - public static void setOnCompletableAssembly(Function onCompletableAssembly) { + public static void setOnCompletableAssembly(@Nullable Function onCompletableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -664,7 +733,7 @@ public static void setOnCompletableAssembly(Function o * @param onCompletableSubscribe the hook function to set, null allowed */ public static void setOnCompletableSubscribe( - BiFunction onCompletableSubscribe) { + @Nullable BiFunction onCompletableSubscribe) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -676,7 +745,7 @@ public static void setOnCompletableSubscribe( * @param onFlowableAssembly the hook function to set, null allowed */ @SuppressWarnings("rawtypes") - public static void setOnFlowableAssembly(Function onFlowableAssembly) { + public static void setOnFlowableAssembly(@Nullable Function onFlowableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -688,7 +757,7 @@ public static void setOnFlowableAssembly(Function onFlowable * @param onMaybeAssembly the hook function to set, null allowed */ @SuppressWarnings("rawtypes") - public static void setOnMaybeAssembly(Function onMaybeAssembly) { + public static void setOnMaybeAssembly(@Nullable Function onMaybeAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -700,7 +769,7 @@ public static void setOnMaybeAssembly(Function onMaybeAssembly) { * @param onConnectableFlowableAssembly the hook function to set, null allowed */ @SuppressWarnings("rawtypes") - public static void setOnConnectableFlowableAssembly(Function onConnectableFlowableAssembly) { + public static void setOnConnectableFlowableAssembly(@Nullable Function onConnectableFlowableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -712,7 +781,7 @@ public static void setOnConnectableFlowableAssembly(Function onFlowableSubscribe) { + public static void setOnFlowableSubscribe(@Nullable BiFunction onFlowableSubscribe) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -724,7 +793,7 @@ public static void setOnFlowableSubscribe(BiFunction onMaybeSubscribe) { + public static void setOnMaybeSubscribe(@Nullable BiFunction onMaybeSubscribe) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -736,7 +805,7 @@ public static void setOnMaybeSubscribe(BiFunction onObservableAssembly) { + public static void setOnObservableAssembly(@Nullable Function onObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -748,7 +817,7 @@ public static void setOnObservableAssembly(Function onOb * @param onConnectableObservableAssembly the hook function to set, null allowed */ @SuppressWarnings("rawtypes") - public static void setOnConnectableObservableAssembly(Function onConnectableObservableAssembly) { + public static void setOnConnectableObservableAssembly(@Nullable Function onConnectableObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -761,7 +830,7 @@ public static void setOnConnectableObservableAssembly(Function onObservableSubscribe) { + @Nullable BiFunction onObservableSubscribe) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -773,7 +842,7 @@ public static void setOnObservableSubscribe( * @param onSingleAssembly the hook function to set, null allowed */ @SuppressWarnings("rawtypes") - public static void setOnSingleAssembly(Function onSingleAssembly) { + public static void setOnSingleAssembly(@Nullable Function onSingleAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -785,7 +854,7 @@ public static void setOnSingleAssembly(Function onSingleAssembly * @param onSingleSubscribe the hook function to set, null allowed */ @SuppressWarnings("rawtypes") - public static void setOnSingleSubscribe(BiFunction onSingleSubscribe) { + public static void setOnSingleSubscribe(@Nullable BiFunction onSingleSubscribe) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -800,7 +869,8 @@ public static void setOnSingleSubscribe(BiFunction Subscriber onSubscribe(Flowable source, Subscriber subscriber) { + @NonNull + public static Subscriber onSubscribe(@NonNull Flowable source, @NonNull Subscriber subscriber) { BiFunction f = onFlowableSubscribe; if (f != null) { return apply(f, source, subscriber); @@ -816,7 +886,8 @@ public static Subscriber onSubscribe(Flowable source, Subscrib * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Observer onSubscribe(Observable source, Observer observer) { + @NonNull + public static Observer onSubscribe(@NonNull Observable source, @NonNull Observer observer) { BiFunction f = onObservableSubscribe; if (f != null) { return apply(f, source, observer); @@ -832,7 +903,8 @@ public static Observer onSubscribe(Observable source, Observer * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static SingleObserver onSubscribe(Single source, SingleObserver observer) { + @NonNull + public static SingleObserver onSubscribe(@NonNull Single source, @NonNull SingleObserver observer) { BiFunction f = onSingleSubscribe; if (f != null) { return apply(f, source, observer); @@ -846,7 +918,8 @@ public static SingleObserver onSubscribe(Single source, Single * @param observer the observer * @return the value returned by the hook */ - public static CompletableObserver onSubscribe(Completable source, CompletableObserver observer) { + @NonNull + public static CompletableObserver onSubscribe(@NonNull Completable source, @NonNull CompletableObserver observer) { BiFunction f = onCompletableSubscribe; if (f != null) { return apply(f, source, observer); @@ -862,7 +935,8 @@ public static CompletableObserver onSubscribe(Completable source, CompletableObs * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static MaybeObserver onSubscribe(Maybe source, MaybeObserver subscriber) { + @NonNull + public static MaybeObserver onSubscribe(@NonNull Maybe source, @NonNull MaybeObserver subscriber) { BiFunction f = onMaybeSubscribe; if (f != null) { return apply(f, source, subscriber); @@ -877,7 +951,8 @@ public static MaybeObserver onSubscribe(Maybe source, MaybeObs * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Maybe onAssembly(Maybe source) { + @NonNull + public static Maybe onAssembly(@NonNull Maybe source) { Function f = onMaybeAssembly; if (f != null) { return apply(f, source); @@ -892,7 +967,8 @@ public static Maybe onAssembly(Maybe source) { * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Flowable onAssembly(Flowable source) { + @NonNull + public static Flowable onAssembly(@NonNull Flowable source) { Function f = onFlowableAssembly; if (f != null) { return apply(f, source); @@ -907,7 +983,8 @@ public static Flowable onAssembly(Flowable source) { * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static ConnectableFlowable onAssembly(ConnectableFlowable source) { + @NonNull + public static ConnectableFlowable onAssembly(@NonNull ConnectableFlowable source) { Function f = onConnectableFlowableAssembly; if (f != null) { return apply(f, source); @@ -922,7 +999,8 @@ public static ConnectableFlowable onAssembly(ConnectableFlowable sourc * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Observable onAssembly(Observable source) { + @NonNull + public static Observable onAssembly(@NonNull Observable source) { Function f = onObservableAssembly; if (f != null) { return apply(f, source); @@ -937,7 +1015,8 @@ public static Observable onAssembly(Observable source) { * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static ConnectableObservable onAssembly(ConnectableObservable source) { + @NonNull + public static ConnectableObservable onAssembly(@NonNull ConnectableObservable source) { Function f = onConnectableObservableAssembly; if (f != null) { return apply(f, source); @@ -952,7 +1031,8 @@ public static ConnectableObservable onAssembly(ConnectableObservable s * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Single onAssembly(Single source) { + @NonNull + public static Single onAssembly(@NonNull Single source) { Function f = onSingleAssembly; if (f != null) { return apply(f, source); @@ -965,7 +1045,8 @@ public static Single onAssembly(Single source) { * @param source the hook's input value * @return the value returned by the hook */ - public static Completable onAssembly(Completable source) { + @NonNull + public static Completable onAssembly(@NonNull Completable source) { Function f = onCompletableAssembly; if (f != null) { return apply(f, source); @@ -980,7 +1061,7 @@ public static Completable onAssembly(Completable source) { */ @Experimental @SuppressWarnings("rawtypes") - public static void setOnParallelAssembly(Function handler) { + public static void setOnParallelAssembly(@Nullable Function handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -994,6 +1075,7 @@ public static void setOnParallelAssembly(Function getOnParallelAssembly() { return onParallelAssembly; } @@ -1007,7 +1089,8 @@ public static Function getOnParallelAssembly */ @Experimental @SuppressWarnings({ "rawtypes", "unchecked" }) - public static ParallelFlowable onAssembly(ParallelFlowable source) { + @NonNull + public static ParallelFlowable onAssembly(@NonNull ParallelFlowable source) { Function f = onParallelAssembly; if (f != null) { return apply(f, source); @@ -1047,7 +1130,7 @@ public static boolean onBeforeBlocking() { * @since 2.0.5 - experimental */ @Experimental - public static void setOnBeforeBlocking(BooleanSupplier handler) { + public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } @@ -1061,6 +1144,7 @@ public static void setOnBeforeBlocking(BooleanSupplier handler) { * @since 2.0.5 - experimental */ @Experimental + @Nullable public static BooleanSupplier getOnBeforeBlocking() { return onBeforeBlocking; } @@ -1074,7 +1158,8 @@ public static BooleanSupplier getOnBeforeBlocking() { * @since 2.0.5 - experimental */ @Experimental - public static Scheduler createComputationScheduler(ThreadFactory threadFactory) { + @NonNull + public static Scheduler createComputationScheduler(@NonNull ThreadFactory threadFactory) { return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1087,7 +1172,8 @@ public static Scheduler createComputationScheduler(ThreadFactory threadFactory) * @since 2.0.5 - experimental */ @Experimental - public static Scheduler createIoScheduler(ThreadFactory threadFactory) { + @NonNull + public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory) { return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1100,7 +1186,8 @@ public static Scheduler createIoScheduler(ThreadFactory threadFactory) { * @since 2.0.5 - experimental */ @Experimental - public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) { + @NonNull + public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFactory) { return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1113,7 +1200,8 @@ public static Scheduler createNewThreadScheduler(ThreadFactory threadFactory) { * @since 2.0.5 - experimental */ @Experimental - public static Scheduler createSingleScheduler(ThreadFactory threadFactory) { + @NonNull + public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFactory) { return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null")); } @@ -1126,7 +1214,8 @@ public static Scheduler createSingleScheduler(ThreadFactory threadFactory) { * @param t the parameter value to the function * @return the result of the function call */ - static R apply(Function f, T t) { + @NonNull + static R apply(@NonNull Function f, @NonNull T t) { try { return f.apply(t); } catch (Throwable ex) { @@ -1145,7 +1234,8 @@ static R apply(Function f, T t) { * @param u the second parameter value to the function * @return the result of the function call */ - static R apply(BiFunction f, T t, U u) { + @NonNull + static R apply(@NonNull BiFunction f, @NonNull T t, @NonNull U u) { try { return f.apply(t, u); } catch (Throwable ex) { @@ -1160,7 +1250,8 @@ static R apply(BiFunction f, T t, U u) { * @return the result of the callable call, not null * @throws NullPointerException if the callable parameter returns null */ - static Scheduler callRequireNonNull(Callable s) { + @NonNull + static Scheduler callRequireNonNull(@NonNull Callable s) { try { return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null"); } catch (Throwable ex) { @@ -1176,7 +1267,8 @@ static Scheduler callRequireNonNull(Callable s) { * @return the result of the function call, not null * @throws NullPointerException if the function parameter returns null */ - static Scheduler applyRequireNonNull(Function, Scheduler> f, Callable s) { + @NonNull + static Scheduler applyRequireNonNull(@NonNull Function, Scheduler> f, Callable s) { return ObjectHelper.requireNonNull(apply(f, s), "Scheduler Callable result can't be null"); } diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index c5913b70cf..967acce36d 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -14,6 +14,7 @@ package io.reactivex.schedulers; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.internal.schedulers.*; import io.reactivex.plugins.RxJavaPlugins; @@ -34,14 +35,19 @@ * */ public final class Schedulers { + @NonNull static final Scheduler SINGLE; + @NonNull static final Scheduler COMPUTATION; + @NonNull static final Scheduler IO; + @NonNull static final Scheduler TRAMPOLINE; + @NonNull static final Scheduler NEW_THREAD; static final class SingleHolder { @@ -108,6 +114,7 @@ private Schedulers() { * * @return a {@link Scheduler} meant for computation-bound work */ + @NonNull public static Scheduler computation() { return RxJavaPlugins.onComputationScheduler(COMPUTATION); } @@ -125,6 +132,7 @@ public static Scheduler computation() { * * @return a {@link Scheduler} meant for IO-bound work */ + @NonNull public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); } @@ -135,6 +143,7 @@ public static Scheduler io() { * * @return a {@link Scheduler} that queues work on the current thread */ + @NonNull public static Scheduler trampoline() { return TRAMPOLINE; } @@ -146,6 +155,7 @@ public static Scheduler trampoline() { * * @return a {@link Scheduler} that creates new threads */ + @NonNull public static Scheduler newThread() { return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD); } @@ -163,6 +173,7 @@ public static Scheduler newThread() { * @return a {@link Scheduler} that shares a single backing thread. * @since 2.0 */ + @NonNull public static Scheduler single() { return RxJavaPlugins.onSingleScheduler(SINGLE); } @@ -174,7 +185,8 @@ public static Scheduler single() { * the executor to wrap * @return the new Scheduler wrapping the Executor */ - public static Scheduler from(Executor executor) { + @NonNull + public static Scheduler from(@NonNull Executor executor) { return new ExecutorScheduler(executor); } diff --git a/src/main/java/io/reactivex/schedulers/TestScheduler.java b/src/main/java/io/reactivex/schedulers/TestScheduler.java index ffb592ff30..d8700e4770 100644 --- a/src/main/java/io/reactivex/schedulers/TestScheduler.java +++ b/src/main/java/io/reactivex/schedulers/TestScheduler.java @@ -17,6 +17,7 @@ import java.util.concurrent.*; import io.reactivex.Scheduler; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; @@ -63,7 +64,7 @@ public int compareTo(TimedRunnable o) { } @Override - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return unit.convert(time, TimeUnit.NANOSECONDS); } @@ -118,6 +119,7 @@ private void triggerActions(long targetTimeInNanoseconds) { time = targetTimeInNanoseconds; } + @NonNull @Override public Worker createWorker() { return new TestWorker(); @@ -137,8 +139,9 @@ public boolean isDisposed() { return disposed; } + @NonNull @Override - public Disposable schedule(Runnable run, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } @@ -153,8 +156,9 @@ public void run() { }); } + @NonNull @Override - public Disposable schedule(Runnable run) { + public Disposable schedule(@NonNull Runnable run) { if (disposed) { return EmptyDisposable.INSTANCE; } @@ -169,7 +173,7 @@ public void run() { } @Override - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return TestScheduler.this.now(unit); } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index 358e3b2d2d..ff10a30a23 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; @@ -708,14 +709,16 @@ private static class InprocessWorker extends Worker { this.mockDisposable = mockDisposable; } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { action.run(); return mockDisposable; // this subscription is returned but discarded } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { action.run(); return mockDisposable; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java index 578a7e8984..c505c7610d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import org.junit.*; import org.reactivestreams.*; @@ -122,6 +123,7 @@ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { this.unit = unit; } + @NonNull @Override public Worker createWorker() { return new SlowInner(actual.createWorker()); @@ -145,13 +147,15 @@ public boolean isDisposed() { return actualInner.isDisposed(); } + @NonNull @Override - public Disposable schedule(final Runnable action) { + public Disposable schedule(@NonNull final Runnable action) { return actualInner.schedule(action, delay, unit); } + @NonNull @Override - public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit delayUnit) { + public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit delayUnit) { TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); return actualInner.schedule(action, t, common); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java index 8b309546e3..b6aadd7c8f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.annotations.NonNull; import org.junit.Test; import org.reactivestreams.*; @@ -177,6 +178,7 @@ public void run() { } } + @NonNull @Override public Worker createWorker() { return eventLoop.createWorker(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 002d0b086c..0d87564aed 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.annotations.NonNull; import org.junit.*; import org.mockito.InOrder; @@ -689,14 +690,16 @@ static class InprocessWorker extends Worker { this.mockDisposable = mockDisposable; } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { action.run(); return mockDisposable; // this subscription is returned but discarded } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { action.run(); return mockDisposable; } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java index 241f6ab554..6418c5338f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSubscribeOnTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.annotations.NonNull; import org.junit.*; import io.reactivex.*; @@ -117,6 +118,7 @@ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { this.unit = unit; } + @NonNull @Override public Worker createWorker() { return new SlowInner(actual.createWorker()); @@ -140,13 +142,15 @@ public boolean isDisposed() { return actualInner.isDisposed(); } + @NonNull @Override - public Disposable schedule(final Runnable action) { + public Disposable schedule(@NonNull final Runnable action) { return actualInner.schedule(action, delay, unit); } + @NonNull @Override - public Disposable schedule(final Runnable action, final long delayTime, final TimeUnit delayUnit) { + public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit delayUnit) { TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); return actualInner.schedule(action, t, common); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java index 7a55b73350..3003c71ea3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import org.junit.Test; import io.reactivex.*; @@ -179,6 +180,7 @@ public void run() { } } + @NonNull @Override public Worker createWorker() { return eventLoop.createWorker(); diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index 06bd6ec6e3..26b80f392a 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -1349,14 +1349,6 @@ public void onComplete() { // // assertSame(cop, RxJavaPlugins.onCompletableLift(cop)); - assertNull(RxJavaPlugins.onComputationScheduler(null)); - - assertNull(RxJavaPlugins.onIoScheduler(null)); - - assertNull(RxJavaPlugins.onNewThreadScheduler(null)); - - assertNull(RxJavaPlugins.onSingleScheduler(null)); - final Scheduler s = ImmediateThinScheduler.INSTANCE; Callable c = new Callable() { @Override diff --git a/src/test/java/io/reactivex/schedulers/SchedulerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerTest.java index 165ed2e026..0962d35080 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerTest.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import io.reactivex.annotations.NonNull; import org.junit.Test; import io.reactivex.*; @@ -248,11 +249,13 @@ public void schedulersUtility() { @Test public void defaultSchedulePeriodicallyDirectRejects() { Scheduler s = new Scheduler() { + @NonNull @Override public Worker createWorker() { return new Worker() { + @NonNull @Override - public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { return EmptyDisposable.INSTANCE; } diff --git a/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java b/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java index ecacd4328e..910bf715b0 100644 --- a/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java +++ b/src/test/java/io/reactivex/schedulers/SchedulerWorkerTest.java @@ -18,6 +18,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; +import io.reactivex.annotations.NonNull; import org.junit.Test; import io.reactivex.Scheduler; @@ -27,6 +28,7 @@ public class SchedulerWorkerTest { static final class CustomDriftScheduler extends Scheduler { public volatile long drift; + @NonNull @Override public Worker createWorker() { final Worker w = Schedulers.computation().createWorker(); @@ -42,13 +44,15 @@ public boolean isDisposed() { return w.isDisposed(); } + @NonNull @Override - public Disposable schedule(Runnable action) { + public Disposable schedule(@NonNull Runnable action) { return w.schedule(action); } + @NonNull @Override - public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { + public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { return w.schedule(action, delayTime, unit); } @@ -60,7 +64,7 @@ public long now(TimeUnit unit) { } @Override - public long now(TimeUnit unit) { + public long now(@NonNull TimeUnit unit) { return super.now(unit) + unit.convert(drift, TimeUnit.NANOSECONDS); } }