Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 3 additions & 55 deletions src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

import rx.annotations.Experimental;
import rx.functions.*;
import rx.internal.schedulers.SchedulerWhen;
import rx.internal.subscriptions.SequentialSubscription;
import rx.internal.schedulers.*;
import rx.schedulers.Schedulers;

/**
Expand All @@ -44,18 +43,6 @@ public abstract class Scheduler {
* : Without virtual extension methods even additive changes are breaking and thus severely impede library
* maintenance.
*/

/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
*/
static final long CLOCK_DRIFT_TOLERANCE_NANOS;
static {
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx.scheduler.drift-tolerance", 15));
}

/**
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
* <p>
Expand Down Expand Up @@ -121,47 +108,8 @@ public abstract static class Worker implements Subscription {
* @return a subscription to be able to prevent or cancel the execution of the action
*/
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);

final SequentialSubscription first = new SequentialSubscription();
final SequentialSubscription mas = new SequentialSubscription(first);

final Action0 recursiveAction = new Action0() {
long count;
long lastNowNanos = firstNowNanos;
long startInNanos = firstStartInNanos;
@Override
public void call() {
action.call();

if (!mas.isUnsubscribed()) {

long nextTick;

long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
nextTick = nowNanos + periodInNanos;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanos = nextTick - (periodInNanos * (++count));
} else {
nextTick = startInNanos + (++count * periodInNanos);
}
lastNowNanos = nowNanos;

long delay = nextTick - nowNanos;
mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
};
first.replace(schedule(recursiveAction, initialDelay, unit));
return mas;
return SchedulePeriodicHelper.schedulePeriodically(this, action,
initialDelay, period, unit, null);
}

/**
Expand Down
102 changes: 102 additions & 0 deletions src/main/java/rx/internal/schedulers/SchedulePeriodicHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.schedulers;

import java.util.concurrent.TimeUnit;

import rx.Scheduler.Worker;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.subscriptions.SequentialSubscription;

/**
* Utility method for scheduling tasks periodically (at a fixed rate) by using Worker.schedule(Action0, long, TimeUnit).
*/
public final class SchedulePeriodicHelper {

/** Utility class. */
private SchedulePeriodicHelper() {
throw new IllegalStateException("No instances!");
}

/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
*/
public static final long CLOCK_DRIFT_TOLERANCE_NANOS;
static {
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx.scheduler.drift-tolerance", 15));
}

/**
* Return the current time in nanoseconds.
*/
public interface NowNanoSupplier {
long nowNanos();
}

public static Subscription schedulePeriodically(
final Worker worker,
final Action0 action,
long initialDelay, long period, TimeUnit unit,
final NowNanoSupplier nowNanoSupplier) {
final long periodInNanos = unit.toNanos(period);
final long firstNowNanos = nowNanoSupplier != null ? nowNanoSupplier.nowNanos() : TimeUnit.MILLISECONDS.toNanos(worker.now());
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);

final SequentialSubscription first = new SequentialSubscription();
final SequentialSubscription mas = new SequentialSubscription(first);

final Action0 recursiveAction = new Action0() {
long count;
long lastNowNanos = firstNowNanos;
long startInNanos = firstStartInNanos;
@Override
public void call() {
action.call();

if (!mas.isUnsubscribed()) {

long nextTick;

long nowNanos = nowNanoSupplier != null ? nowNanoSupplier.nowNanos() : TimeUnit.MILLISECONDS.toNanos(worker.now());
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
nextTick = nowNanos + periodInNanos;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanos = nextTick - (periodInNanos * (++count));
} else {
nextTick = startInNanos + (++count * periodInNanos);
}
lastNowNanos = nowNanos;

long delay = nextTick - nowNanos;
mas.replace(worker.schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
};
first.replace(worker.schedule(recursiveAction, initialDelay, unit));
return mas;
}

}
15 changes: 14 additions & 1 deletion src/main/java/rx/schedulers/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.SchedulePeriodicHelper;
import rx.internal.schedulers.SchedulePeriodicHelper.NowNanoSupplier;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;

Expand Down Expand Up @@ -130,7 +132,7 @@ public Worker createWorker() {
return new InnerTestScheduler();
}

final class InnerTestScheduler extends Worker {
final class InnerTestScheduler extends Worker implements NowNanoSupplier {

private final BooleanSubscription s = new BooleanSubscription();

Expand Down Expand Up @@ -172,10 +174,21 @@ public void call() {
});
}

@Override
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
return SchedulePeriodicHelper.schedulePeriodically(this,
action, initialDelay, period, unit, this);
}

@Override
public long now() {
return TestScheduler.this.now();
}

@Override
public long nowNanos() {
return TestScheduler.this.time;
}

}

Expand Down
5 changes: 3 additions & 2 deletions src/test/java/rx/SchedulerWorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.junit.Test;

import rx.functions.Action0;
import rx.internal.schedulers.SchedulePeriodicHelper;
import rx.schedulers.Schedulers;

public class SchedulerWorkerTest {
Expand Down Expand Up @@ -85,7 +86,7 @@ public void call() {

Thread.sleep(150);

s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(SchedulePeriodicHelper.CLOCK_DRIFT_TOLERANCE_NANOS);

Thread.sleep(400);

Expand Down Expand Up @@ -127,7 +128,7 @@ public void call() {

Thread.sleep(150);

s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(SchedulePeriodicHelper.CLOCK_DRIFT_TOLERANCE_NANOS);

Thread.sleep(400);

Expand Down
37 changes: 25 additions & 12 deletions src/test/java/rx/schedulers/TestSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,18 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.*;

import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.*;
import rx.observers.TestSubscriber;

public class TestSchedulerTest {

Expand Down Expand Up @@ -222,4 +215,24 @@ public void call() {
inner.unsubscribe();
}
}

@Test
public void resolution() {
for (final TimeUnit unit : TimeUnit.values()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Observable.interval(30, unit, scheduler)
.map(new Func1<Long, String>() {
@Override
public String call(Long v) {
return v + "-" + unit;
}
})
.subscribe(testSubscriber);
scheduler.advanceTimeTo(60, unit);

testSubscriber.assertValues("0-" + unit, "1-" + unit);
}
}
}