diff --git a/src/main/java/io/reactivex/annotations/SchedulerSupport.java b/src/main/java/io/reactivex/annotations/SchedulerSupport.java index d405236c92..acf89ad988 100644 --- a/src/main/java/io/reactivex/annotations/SchedulerSupport.java +++ b/src/main/java/io/reactivex/annotations/SchedulerSupport.java @@ -59,6 +59,13 @@ * or takes timing information from it. */ String TRAMPOLINE = "io.reactivex:trampoline"; + /** + * The operator/class runs on RxJava's {@linkplain Schedulers#single() single scheduler} + * or takes timing information from it. + * @since 2.0.8 - experimental + */ + @Experimental + String SINGLE = "io.reactivex:single"; /** * The kind of scheduler the class or method uses. diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index 7f621c6071..6f1071590b 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -23,6 +23,10 @@ /** * Static factory methods for returning standard Scheduler instances. *
+ * The initial and runtime values of the various scheduler types can be overridden via the + * {@code RxJavaPlugins.setInit(scheduler name)SchedulerHandler()} and + * {@code RxJavaPlugins.set(scheduler name)SchedulerHandler()} respectively. + *
* Supported system properties ({@code System.getProperty()}): *
* This can be used for event-loops, processing callbacks and other computational work. *
- * Do not perform IO-bound work on this scheduler. Use {@link #io()} instead. + * It is not recommended to perform blocking, IO-bound work on this scheduler. Use {@link #io()} instead. + *
+ * The default instance has a backing pool of single-threaded {@link ScheduledExecutorService} instances equal to + * the number of available processors ({@link java.lang.Runtime#availableProcessors()}) to the Java VM. *
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}. - * + *
+ * This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker} instances, although + * not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or + * execute those tasks "unexpectedly". + *
+ * If the {@link RxJavaPlugins#setFailOnNonBlockingScheduler(boolean)} is set to true, attempting to execute + * operators that block while running on this scheduler will throw an {@link IllegalStateException}. + *
+ * You can control certain properties of this standard scheduler via system properties that have to be set
+ * before the {@link Schedulers} class is referenced in your code.
+ *
Supported system properties ({@code System.getProperty()}):
+ *
+ * The default value of this scheduler can be overridden at initialization time via the + * {@link RxJavaPlugins#setInitComputationSchedulerHandler(io.reactivex.functions.Function)} plugin method. + * Note that due to possible initialization cycles, using any of the other scheduler-returning methods will + * result in a {@code NullPointerException}. + * Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance + * via the {@link RxJavaPlugins#setComputationSchedulerHandler(io.reactivex.functions.Function)} method. + *
+ * It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the + * {@link RxJavaPlugins#createComputationScheduler(ThreadFactory)} method. Note that such custom + * instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the + * (J2EE) container to unload properly. + *
Operators on the base reactive classes that use this scheduler are marked with the + * @{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#COMPUTATION COMPUTATION}) + * annotation. * @return a {@link Scheduler} meant for computation-bound work */ @NonNull @@ -100,16 +136,42 @@ public static Scheduler computation() { } /** - * Creates and returns a {@link Scheduler} intended for IO-bound work. - *
- * The implementation is backed by an {@link Executor} thread-pool that will grow as needed. + * Returns a default, shared {@link Scheduler} instance intended for IO-bound work. *
* This can be used for asynchronously performing blocking IO. *
- * Do not perform computational work on this scheduler. Use {@link #computation()} instead. + * The implementation is backed by a pool of single-threaded {link ScheduledExecutorService} instances + * that will try to reuse previoulsy started instances used by the worker + * returned by {@link io.reactivex.Scheduler#createWorker()} but otherwise will start a new backing + * {link ScheduledExecutorService} instance. Note that this scheduler may create an unbounded number + * of worker threads that can result in system slowdowns or {@code OutOfMemoryError}. Therefore, for casual uses + * or when implementing an operator, the Worker instances must be disposed via {@link io.reactivex.Scheduler.Worker#dispose()}. + *
+ * It is not recommended to perform computational work on this scheduler. Use {@link #computation()} instead. *
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}. - * + *
+ * You can control certain properties of this standard scheduler via system properties that have to be set
+ * before the {@link Schedulers} class is referenced in your code.
+ *
Supported system properties ({@code System.getProperty()}):
+ *
+ * The default value of this scheduler can be overridden at initialization time via the + * {@link RxJavaPlugins#setInitIoSchedulerHandler(io.reactivex.functions.Function)} plugin method. + * Note that due to possible initialization cycles, using any of the other scheduler-returning methods will + * result in a {@code NullPointerException}. + * Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance + * via the {@link RxJavaPlugins#setIoSchedulerHandler(io.reactivex.functions.Function)} method. + *
+ * It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the + * {@link RxJavaPlugins#createIoScheduler(ThreadFactory)} method. Note that such custom + * instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the + * (J2EE) container to unload properly. + *
Operators on the base reactive classes that use this scheduler are marked with the + * @{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#IO IO}) + * annotation. * @return a {@link Scheduler} meant for IO-bound work */ @NonNull @@ -118,9 +180,17 @@ public static Scheduler io() { } /** - * Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the - * current work completes. - * + * Returns a default, shared {@link Scheduler} instance whose {@link io.reactivex.Scheduler.Worker} + * instances queue work and execute them in a FIFO manner on one of the participating threads. + *
+ * The default implementation's {@link Scheduler#scheduleDirect(Runnable)} methods execute the tasks on the current thread + * without any queueing and the timed overloads use blocking sleep as well. + *
+ * Note that this scheduler can't be reliably used to return the execution of + * tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided + * by RxJava itself but may be found in external libraries. + *
+ * This scheduler can't be overridden via an {@link RxJavaPlugins} method. * @return a {@link Scheduler} that queues work on the current thread */ @NonNull @@ -129,10 +199,37 @@ public static Scheduler trampoline() { } /** - * Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work. + * Returns a default, shared {@link Scheduler} instance that creates a new {@link Thread} for each unit of work. + *
+ * The default implementation of this scheduler creates a new, single-threaded {@link ScheduledExecutorService} for + * each invocation of the {@link Scheduler#scheduleDirect(Runnable)} (plus its overloads) and {@link Scheduler#createWorker()} + * methods, thus an unbounded number of worker threads may be created that can + * result in system slowdowns or {@code OutOfMemoryError}. Therefore, for casual uses or when implementing an operator, + * the Worker instances must be disposed via {@link io.reactivex.Scheduler.Worker#dispose()}. *
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}. - * + *
+ * You can control certain properties of this standard scheduler via system properties that have to be set
+ * before the {@link Schedulers} class is referenced in your code.
+ *
Supported system properties ({@code System.getProperty()}):
+ *
+ * The default value of this scheduler can be overridden at initialization time via the + * {@link RxJavaPlugins#setInitNewThreadSchedulerHandler(io.reactivex.functions.Function)} plugin method. + * Note that due to possible initialization cycles, using any of the other scheduler-returning methods will + * result in a {@code NullPointerException}. + * Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance + * via the {@link RxJavaPlugins#setNewThreadSchedulerHandler(io.reactivex.functions.Function)} method. + *
+ * It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the + * {@link RxJavaPlugins#createNewThreadScheduler(ThreadFactory)} method. Note that such custom + * instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the + * (J2EE) container to unload properly. + *
Operators on the base reactive classes that use this scheduler are marked with the + * @{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#NEW_THREAD NEW_TRHEAD}) + * annotation. * @return a {@link Scheduler} that creates new threads */ @NonNull @@ -141,7 +238,8 @@ public static Scheduler newThread() { } /** - * Returns the common, single-thread backed Scheduler instance. + * Returns a default, shared, single-thread-backed {@link Scheduler} instance for work + * requiring strongly-sequential execution on the same background thread. *
* Uses: *
+ * Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}. + *
+ * This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker} instances, although + * not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or + * execute those tasks "unexpectedly". + *
+ * If the {@link RxJavaPlugins#setFailOnNonBlockingScheduler(boolean)} is set to true, attempting to execute + * operators that block while running on this scheduler will throw an {@link IllegalStateException}. + *
+ * You can control certain properties of this standard scheduler via system properties that have to be set
+ * before the {@link Schedulers} class is referenced in your code.
+ *
Supported system properties ({@code System.getProperty()}):
+ *
+ * The default value of this scheduler can be overridden at initialization time via the + * {@link RxJavaPlugins#setInitSingleSchedulerHandler(io.reactivex.functions.Function)} plugin method. + * Note that due to possible initialization cycles, using any of the other scheduler-returning methods will + * result in a {@code NullPointerException}. + * Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance + * via the {@link RxJavaPlugins#setSingleSchedulerHandler(io.reactivex.functions.Function)} method. + *
+ * It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the + * {@link RxJavaPlugins#createSingleScheduler(ThreadFactory)} method. Note that such custom + * instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the + * (J2EE) container to unload properly. + *
Operators on the base reactive classes that use this scheduler are marked with the + * @{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#SINGLE SINGLE}) + * annotation. * @return a {@link Scheduler} that shares a single backing thread. * @since 2.0 */ @@ -159,8 +288,50 @@ public static Scheduler single() { } /** - * Converts an {@link Executor} into a new Scheduler instance. - * + * Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()} + * calls to it. + *
+ * If the provided executor doesn't support any of the more specific standard Java executor + * APIs, cancelling tasks scheduled by this scheduler can't be interrupted when they are + * executing but only prevented from running prior to that. In addition, tasks scheduled with + * a time delay or periodically will use the {@link #single()} scheduler for the timed waiting + * before posting the actual task to the given executor. + *
+ * If the provided executor supports the standard Java {@link ExecutorService} API, + * cancelling tasks scheduled by this scheduler can be cancelled/interrupted by calling + * {@link io.reactivex.disposables.Disposable#dispose()}. In addition, tasks scheduled with + * a time delay or periodically will use the {@link #single()} scheduler for the timed waiting + * before posting the actual task to the given executor. + *
+ * If the provided executor supports the standard Java {@link ScheduledExecutorService} API, + * cancelling tasks scheduled by this scheduler can be cancelled/interrupted by calling + * {@link io.reactivex.disposables.Disposable#dispose()}. In addition, tasks scheduled with + * a time delay or periodically will use the provided executor. Note, however, if the provided + * {@code ScheduledExecutorService} instance is not single threaded, tasks scheduled + * with a time delay close to each other may end up executing in different order than + * the original schedule() call was issued. This limitation may be lifted in a future patch. + *
+ * Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
+ * executor's lifecycle must be managed externally:
+ *
+ *
+ * ExecutorService exec = Executors.newSingleThreadedExecutor();
+ * try {
+ * Scheduler scheduler = Schedulers.from(exec);
+ * Flowable.just(1)
+ * .subscribeOn(scheduler)
+ * .map(v -> v + 1)
+ * .observeOn(scheduler)
+ * .blockingSubscribe(System.out::println);
+ * } finally {
+ * exec.shutdown();
+ * }
+ *
+ * This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker} instances, although + * not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or + * execute those tasks "unexpectedly". + *
+ * Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance. * @param executor * the executor to wrap * @return the new Scheduler wrapping the Executor @@ -171,7 +342,7 @@ public static Scheduler from(@NonNull Executor executor) { } /** - * Shuts down those standard Schedulers which support the SchedulerLifecycle interface. + * Shuts down the standard Schedulers. *
The operation is idempotent and thread-safe. */ public static void shutdown() { @@ -184,7 +355,7 @@ public static void shutdown() { } /** - * Starts those standard Schedulers which support the SchedulerLifecycle interface. + * Starts the standard Schedulers. *
The operation is idempotent and thread-safe. */ public static void start() {