diff --git a/src/main/java/rx/Scheduler.java b/src/main/java/rx/Scheduler.java index bbcd19ae34..b98615ff47 100644 --- a/src/main/java/rx/Scheduler.java +++ b/src/main/java/rx/Scheduler.java @@ -19,8 +19,7 @@ import rx.annotations.Experimental; import rx.functions.*; -import rx.internal.schedulers.SchedulerWhen; -import rx.internal.subscriptions.SequentialSubscription; +import rx.internal.schedulers.*; import rx.schedulers.Schedulers; /** @@ -44,18 +43,6 @@ public abstract class Scheduler { * : Without virtual extension methods even additive changes are breaking and thus severely impede library * maintenance. */ - - /** - * The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase. - *

- * The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes. - */ - static final long CLOCK_DRIFT_TOLERANCE_NANOS; - static { - CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos( - Long.getLong("rx.scheduler.drift-tolerance", 15)); - } - /** * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions. *

@@ -121,47 +108,8 @@ public abstract static class Worker implements Subscription { * @return a subscription to be able to prevent or cancel the execution of the action */ public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { - final long periodInNanos = unit.toNanos(period); - final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now()); - final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay); - - final SequentialSubscription first = new SequentialSubscription(); - final SequentialSubscription mas = new SequentialSubscription(first); - - final Action0 recursiveAction = new Action0() { - long count; - long lastNowNanos = firstNowNanos; - long startInNanos = firstStartInNanos; - @Override - public void call() { - action.call(); - - if (!mas.isUnsubscribed()) { - - long nextTick; - - long nowNanos = TimeUnit.MILLISECONDS.toNanos(now()); - // If the clock moved in a direction quite a bit, rebase the repetition period - if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos - || nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) { - nextTick = nowNanos + periodInNanos; - /* - * Shift the start point back by the drift as if the whole thing - * started count periods ago. - */ - startInNanos = nextTick - (periodInNanos * (++count)); - } else { - nextTick = startInNanos + (++count * periodInNanos); - } - lastNowNanos = nowNanos; - - long delay = nextTick - nowNanos; - mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS)); - } - } - }; - first.replace(schedule(recursiveAction, initialDelay, unit)); - return mas; + return SchedulePeriodicHelper.schedulePeriodically(this, action, + initialDelay, period, unit, null); } /** diff --git a/src/main/java/rx/internal/schedulers/SchedulePeriodicHelper.java b/src/main/java/rx/internal/schedulers/SchedulePeriodicHelper.java new file mode 100644 index 0000000000..4ca7b74941 --- /dev/null +++ b/src/main/java/rx/internal/schedulers/SchedulePeriodicHelper.java @@ -0,0 +1,102 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.schedulers; + +import java.util.concurrent.TimeUnit; + +import rx.Scheduler.Worker; +import rx.Subscription; +import rx.functions.Action0; +import rx.internal.subscriptions.SequentialSubscription; + +/** + * Utility method for scheduling tasks periodically (at a fixed rate) by using Worker.schedule(Action0, long, TimeUnit). + */ +public final class SchedulePeriodicHelper { + + /** Utility class. */ + private SchedulePeriodicHelper() { + throw new IllegalStateException("No instances!"); + } + + /** + * The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase. + *

+ * The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes. + */ + public static final long CLOCK_DRIFT_TOLERANCE_NANOS; + static { + CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos( + Long.getLong("rx.scheduler.drift-tolerance", 15)); + } + + /** + * Return the current time in nanoseconds. + */ + public interface NowNanoSupplier { + long nowNanos(); + } + + public static Subscription schedulePeriodically( + final Worker worker, + final Action0 action, + long initialDelay, long period, TimeUnit unit, + final NowNanoSupplier nowNanoSupplier) { + final long periodInNanos = unit.toNanos(period); + final long firstNowNanos = nowNanoSupplier != null ? nowNanoSupplier.nowNanos() : TimeUnit.MILLISECONDS.toNanos(worker.now()); + final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay); + + final SequentialSubscription first = new SequentialSubscription(); + final SequentialSubscription mas = new SequentialSubscription(first); + + final Action0 recursiveAction = new Action0() { + long count; + long lastNowNanos = firstNowNanos; + long startInNanos = firstStartInNanos; + @Override + public void call() { + action.call(); + + if (!mas.isUnsubscribed()) { + + long nextTick; + + long nowNanos = nowNanoSupplier != null ? nowNanoSupplier.nowNanos() : TimeUnit.MILLISECONDS.toNanos(worker.now()); + // If the clock moved in a direction quite a bit, rebase the repetition period + if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos + || nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) { + nextTick = nowNanos + periodInNanos; + /* + * Shift the start point back by the drift as if the whole thing + * started count periods ago. + */ + startInNanos = nextTick - (periodInNanos * (++count)); + } else { + nextTick = startInNanos + (++count * periodInNanos); + } + lastNowNanos = nowNanos; + + long delay = nextTick - nowNanos; + mas.replace(worker.schedule(this, delay, TimeUnit.NANOSECONDS)); + } + } + }; + first.replace(worker.schedule(recursiveAction, initialDelay, unit)); + return mas; + } + +} diff --git a/src/main/java/rx/schedulers/TestScheduler.java b/src/main/java/rx/schedulers/TestScheduler.java index f540569b19..89717914c9 100644 --- a/src/main/java/rx/schedulers/TestScheduler.java +++ b/src/main/java/rx/schedulers/TestScheduler.java @@ -23,6 +23,8 @@ import rx.Scheduler; import rx.Subscription; import rx.functions.Action0; +import rx.internal.schedulers.SchedulePeriodicHelper; +import rx.internal.schedulers.SchedulePeriodicHelper.NowNanoSupplier; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -130,7 +132,7 @@ public Worker createWorker() { return new InnerTestScheduler(); } - final class InnerTestScheduler extends Worker { + final class InnerTestScheduler extends Worker implements NowNanoSupplier { private final BooleanSubscription s = new BooleanSubscription(); @@ -172,10 +174,21 @@ public void call() { }); } + @Override + public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) { + return SchedulePeriodicHelper.schedulePeriodically(this, + action, initialDelay, period, unit, this); + } + @Override public long now() { return TestScheduler.this.now(); } + + @Override + public long nowNanos() { + return TestScheduler.this.time; + } } diff --git a/src/test/java/rx/SchedulerWorkerTest.java b/src/test/java/rx/SchedulerWorkerTest.java index 16d0c65553..159d3d34a7 100644 --- a/src/test/java/rx/SchedulerWorkerTest.java +++ b/src/test/java/rx/SchedulerWorkerTest.java @@ -23,6 +23,7 @@ import org.junit.Test; import rx.functions.Action0; +import rx.internal.schedulers.SchedulePeriodicHelper; import rx.schedulers.Schedulers; public class SchedulerWorkerTest { @@ -85,7 +86,7 @@ public void call() { Thread.sleep(150); - s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS); + s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(SchedulePeriodicHelper.CLOCK_DRIFT_TOLERANCE_NANOS); Thread.sleep(400); @@ -127,7 +128,7 @@ public void call() { Thread.sleep(150); - s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS); + s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(SchedulePeriodicHelper.CLOCK_DRIFT_TOLERANCE_NANOS); Thread.sleep(400); diff --git a/src/test/java/rx/schedulers/TestSchedulerTest.java b/src/test/java/rx/schedulers/TestSchedulerTest.java index ba95fd1df9..18e3e46913 100644 --- a/src/test/java/rx/schedulers/TestSchedulerTest.java +++ b/src/test/java/rx/schedulers/TestSchedulerTest.java @@ -17,25 +17,18 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; +import org.mockito.*; -import rx.Observable; +import rx.*; import rx.Observable.OnSubscribe; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.functions.Func1; +import rx.functions.*; +import rx.observers.TestSubscriber; public class TestSchedulerTest { @@ -222,4 +215,24 @@ public void call() { inner.unsubscribe(); } } + + @Test + public void resolution() { + for (final TimeUnit unit : TimeUnit.values()) { + TestScheduler scheduler = new TestScheduler(); + TestSubscriber testSubscriber = new TestSubscriber(); + + Observable.interval(30, unit, scheduler) + .map(new Func1() { + @Override + public String call(Long v) { + return v + "-" + unit; + } + }) + .subscribe(testSubscriber); + scheduler.advanceTimeTo(60, unit); + + testSubscriber.assertValues("0-" + unit, "1-" + unit); + } + } }