* Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events.
*/
-public class ExecutorScheduler extends Scheduler {
+public class ExecutorScheduler extends Scheduler implements ReentrantSchedulerHelper {
private final Executor executor;
public ExecutorScheduler(Executor executor) {
@@ -47,18 +48,17 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
@Override
public Subscription schedulePeriodically(final T state, final Func2 super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
- final CompositeSubscription subscriptions = new CompositeSubscription();
+ CompositeSubscription subscription = new CompositeSubscription();
+ final ForwardSubscription scheduleSub = new ForwardSubscription();
+ final ForwardSubscription actionSub = new ForwardSubscription();
+ subscription.add(scheduleSub);
+ subscription.add(actionSub);
- ScheduledFuture> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- Subscription s = action.call(ExecutorScheduler.this, state);
- subscriptions.add(s);
- }
- }, initialDelay, period, unit);
+ final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);
- subscriptions.add(Subscriptions.from(f));
- return subscriptions;
+ _scheduler.schedulePeriodically(state, action, initialDelay, period, unit);
+
+ return subscription;
} else {
return super.schedulePeriodically(state, action, initialDelay, period, unit);
@@ -67,81 +67,80 @@ public void run() {
@Override
public Subscription schedule(final T state, final Func2 super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
- final DiscardableAction discardableAction = new DiscardableAction(state, action);
- final Scheduler _scheduler = this;
+ CompositeSubscription subscription = new CompositeSubscription();
+ final ForwardSubscription scheduleSub = new ForwardSubscription();
+ final ForwardSubscription actionSub = new ForwardSubscription();
+ subscription.add(scheduleSub);
+ subscription.add(actionSub);
+
+ final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);
+
+ _scheduler.schedule(state, action, delayTime, unit);
+
+ return subscription;
+ }
+
+ @Override
+ public Subscription schedule(T state, Func2 super Scheduler, ? super T, ? extends Subscription> action) {
// all subscriptions that may need to be unsubscribed
- final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
+ CompositeSubscription subscription = new CompositeSubscription();
+ final ForwardSubscription scheduleSub = new ForwardSubscription();
+ final ForwardSubscription actionSub = new ForwardSubscription();
+ subscription.add(scheduleSub);
+ subscription.add(actionSub);
+
+ final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);
+
+ _scheduler.schedule(state, action);
+ return subscription;
+ }
+
+ @Override
+ public void scheduleTask(Runnable r, ForwardSubscription out, long delayTime, TimeUnit unit) {
+ Subscription before = out.getSubscription();
if (executor instanceof ScheduledExecutorService) {
// we are a ScheduledExecutorService so can do proper scheduling
- ScheduledFuture> f = ((ScheduledExecutorService) executor).schedule(new Runnable() {
- @Override
- public void run() {
- // when the delay has passed we now do the work on the actual scheduler
- Subscription s = discardableAction.call(_scheduler);
- // add the subscription to the CompositeSubscription so it is unsubscribed
- subscription.add(s);
- }
- }, delayTime, unit);
+ ScheduledFuture> f = ((ScheduledExecutorService) executor).schedule(r, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
- subscription.add(Subscriptions.from(f));
+ out.compareExchange(before, Subscriptions.from(f));
} else {
// we are not a ScheduledExecutorService so can't directly schedule
if (delayTime == 0) {
// no delay so put on the thread-pool right now
- Subscription s = schedule(state, action);
- // add the subscription to the CompositeSubscription so it is unsubscribed
- subscription.add(s);
+ scheduleTask(r, out);
} else {
// there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
// to handle the scheduling and once it's ready then execute on this Executor
- ScheduledFuture> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
-
- @Override
- public void run() {
- // now execute on the real Executor (by using the other overload that schedules for immediate execution)
- Subscription s = _scheduler.schedule(state, action);
- // add the subscription to the CompositeSubscription so it is unsubscribed
- subscription.add(s);
- }
- }, delayTime, unit);
+ ScheduledFuture> f = GenericScheduledExecutorService.getInstance().schedule(r, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
- subscription.add(Subscriptions.from(f));
+ out.compareExchange(before, Subscriptions.from(f));
}
}
- return subscription;
}
-
+
@Override
- public Subscription schedule(T state, Func2 super Scheduler, ? super T, ? extends Subscription> action) {
- final DiscardableAction discardableAction = new DiscardableAction(state, action);
- final Scheduler _scheduler = this;
- // all subscriptions that may need to be unsubscribed
- final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
-
- // work to be done on a thread
- Runnable r = new Runnable() {
- @Override
- public void run() {
- Subscription s = discardableAction.call(_scheduler);
- // add the subscription to the CompositeSubscription so it is unsubscribed
- subscription.add(s);
- }
- };
-
+ public void scheduleTask(Runnable r, ForwardSubscription out) {
+ Subscription before = out.getSubscription();
// submit for immediate execution
if (executor instanceof ExecutorService) {
// we are an ExecutorService so get a Future back that supports unsubscribe
Future> f = ((ExecutorService) executor).submit(r);
// add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
- subscription.add(Subscriptions.from(f));
+ out.compareExchange(before, Subscriptions.from(f));
} else {
// we are the lowest common denominator so can't unsubscribe once we execute
executor.execute(r);
+ out.compareExchange(before, Subscriptions.empty());
}
+ }
- return subscription;
+ @Override
+ public void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit) {
+ Subscription before = out.getSubscription();
+ ScheduledFuture> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit);
+ out.compareExchange(before, Subscriptions.from(f));
}
-
+
}
diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java
new file mode 100644
index 0000000000..950d517585
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java
@@ -0,0 +1,151 @@
+ /**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.schedulers;
+
+import java.util.concurrent.TimeUnit;
+import rx.Scheduler;
+import rx.Subscription;
+import rx.subscriptions.CompositeSubscription;
+import rx.subscriptions.ForwardSubscription;
+import rx.subscriptions.SerialSubscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Func1;
+import rx.util.functions.Func2;
+
+/**
+ * Do not re-enter the main scheduler's schedule() method as it will
+ * unnecessarily chain the subscriptions of every invocation.
+ */
+public final class ReentrantScheduler extends Scheduler {
+ final ReentrantSchedulerHelper scheduler;
+ final ForwardSubscription scheduleSub;
+ final ForwardSubscription actionSub;
+ final CompositeSubscription composite;
+
+ public ReentrantScheduler(
+ ReentrantSchedulerHelper scheduler,
+ ForwardSubscription scheduleSub,
+ ForwardSubscription actionSub,
+ CompositeSubscription composite) {
+ this.scheduler = scheduler;
+ this.scheduleSub = scheduleSub;
+ this.actionSub = actionSub;
+ this.composite = composite;
+ }
+
+ @Override
+ public Subscription schedule(T state, Func2 super Scheduler, ? super T, ? extends Subscription> action) {
+ if (composite.isUnsubscribed()) {
+ // don't bother scheduling a task which wouldn't run anyway
+ return Subscriptions.empty();
+ }
+ Subscription before = actionSub.getSubscription();
+ final DiscardableAction discardableAction = new DiscardableAction(state, action);
+
+ actionSub.compareExchange(before, discardableAction);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ Subscription sbefore = actionSub.getSubscription();
+ Subscription s = discardableAction.call(ReentrantScheduler.this);
+ actionSub.compareExchange(sbefore, s);
+ }
+ };
+
+ scheduler.scheduleTask(r, scheduleSub);
+
+ return composite;
+ }
+
+ @Override
+ public Subscription schedule(T state, Func2 super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
+ if (composite.isUnsubscribed()) {
+ // don't bother scheduling a task which wouldn't run anyway
+ return Subscriptions.empty();
+ }
+
+ Subscription before = actionSub.getSubscription();
+ final DiscardableAction discardableAction = new DiscardableAction(state, action);
+ actionSub.compareExchange(before, discardableAction);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ Subscription sbefore = actionSub.getSubscription();
+ Subscription s = discardableAction.call(ReentrantScheduler.this);
+ actionSub.compareExchange(sbefore, s);
+ }
+ };
+ scheduler.scheduleTask(r, scheduleSub, delayTime, unit);
+
+ return composite;
+ }
+
+ @Override
+ public Subscription schedulePeriodically(T state, Func2 super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
+ if (composite.isUnsubscribed()) {
+ // don't bother scheduling a task which wouldn't run anyway
+ return Subscriptions.empty();
+ }
+
+ Subscription before = actionSub.getSubscription();
+ final PeriodicAction periodicAction = new PeriodicAction(state, action);
+ actionSub.compareExchange(before, periodicAction);
+
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ Subscription sbefore = actionSub.getSubscription();
+ Subscription s = periodicAction.call(ReentrantScheduler.this);
+ actionSub.compareExchange(sbefore, s);
+ }
+ };
+ scheduler.scheduleTask(r, scheduleSub, initialDelay, period, unit);
+
+ return composite;
+ }
+ /**
+ * An action that calls the underlying function in a periodic environment.
+ * @param the state value type
+ */
+ private static final class PeriodicAction implements Subscription, Func1 {
+ final T state;
+ final Func2 super Scheduler, ? super T, ? extends Subscription> underlying;
+ final SerialSubscription ssub;
+
+ public PeriodicAction(T state, Func2 super Scheduler, ? super T, ? extends Subscription> underlying) {
+ this.state = state;
+ this.underlying = underlying;
+ this.ssub = new SerialSubscription();
+ }
+
+ @Override
+ public Subscription call(Scheduler scheduler) {
+ if (!ssub.isUnsubscribed()) {
+ Subscription s = underlying.call(scheduler, state);
+ ssub.setSubscription(s);
+ return ssub;
+ }
+ return Subscriptions.empty();
+ }
+
+ @Override
+ public void unsubscribe() {
+ ssub.unsubscribe();
+ }
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java b/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java
new file mode 100644
index 0000000000..717d60581e
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/schedulers/ReentrantSchedulerHelper.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.schedulers;
+
+import java.util.concurrent.TimeUnit;
+import rx.subscriptions.ForwardSubscription;
+
+/**
+ * Simple scheduler API used by the ReentrantScheduler to
+ * communicate with the actual scheduler implementation.
+ */
+public interface ReentrantSchedulerHelper {
+ /**
+ * Schedule a task to be run immediately and update the subscription
+ * describing the schedule.
+ * @param r the task to run immediately
+ * @param out the subscription holding the current schedule subscription
+ */
+ void scheduleTask(Runnable r, ForwardSubscription out);
+
+ /**
+ * Schedule a task to be run after the delay time and update the subscription
+ * describing the schedule.
+ * @param r the task to schedule
+ * @param out the subscription holding the current schedule subscription
+ * @param delayTime the time to delay the execution
+ * @param unit the time unit
+ */
+ void scheduleTask(Runnable r, ForwardSubscription out, long delayTime, TimeUnit unit);
+
+ /**
+ * Schedule a task to be run after the delay time and after
+ * each period, then update the subscription describing the schedule.
+ * @param r the task to schedule
+ * @param out the subscription holding the current schedule subscription
+ * @param initialDelay the initial delay of the schedule
+ * @param period the between period of the schedule
+ * @param unit the time unit
+ */
+ void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit);
+}
diff --git a/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java
new file mode 100644
index 0000000000..7a64b260de
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/subscriptions/ForwardSubscription.java
@@ -0,0 +1,162 @@
+ /**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subscriptions;
+
+import java.util.concurrent.atomic.AtomicReference;
+import rx.Subscription;
+
+/**
+ * A subscription that holds another subscription and
+ * allows swapping it in compare-and-swap style and does
+ * not unsubscribe any replaced values by default.
+ *
+ * Overloads are provided to perform the unsubscription on
+ * the old value if required.
+ */
+public class ForwardSubscription implements Subscription {
+ /** The atomic reference. */
+ final AtomicReference reference = new AtomicReference();
+ /** The unsubscription sentinel. */
+ private static final Subscription UNSUBSCRIBE_SENTINEL = new Subscription() {
+ @Override
+ public void unsubscribe() {
+ }
+ };
+ /**
+ * Creates an empty ForwardSubscription.
+ */
+ public ForwardSubscription() {
+
+ }
+ /**
+ * Creates a ForwardSubscription with the initial subscription.
+ * @param initial the initial subscription
+ */
+ public ForwardSubscription(Subscription initial) {
+ reference.set(initial);
+ }
+ /**
+ * Returns true if this subscription has been unsubscribed.
+ * @return true if this subscription has been unsubscribed
+ */
+ public boolean isUnsubscribed() {
+ return reference.get() == UNSUBSCRIBE_SENTINEL;
+ }
+ /**
+ * Returns the current maintained subscription.
+ * @return the current maintained subscription
+ */
+ public Subscription getSubscription() {
+ Subscription s = reference.get();
+ if (s == UNSUBSCRIBE_SENTINEL) {
+ return Subscriptions.empty();
+ }
+ return s;
+ }
+ /**
+ * Atomically replace the current subscription but
+ * don't unsubscribe the old value.
+ * @param newValue the new subscription to set
+ */
+ public void setSubscription(Subscription newValue) {
+ setSubscription(newValue, false);
+ }
+ /**
+ * Atomically replace the current subscription and
+ * unsubscribe the old value id required.
+ * @param newValue the new subscription to set
+ */
+ public void setSubscription(Subscription newValue, boolean unsubscribeOld) {
+ Subscription s = replace(newValue);
+ if (unsubscribeOld && s != null) {
+ s.unsubscribe();
+ }
+ }
+ /**
+ * Atomically replace a new subscription and return the old one.
+ *
+ * If this subscription is unsubscribed, the newValue subscription
+ * is unsubscribed and an empty subscription is returned.
+ * @param newValue the new subscription
+ * @return the old subscription or empty if this ForwardSubscription is unsubscribed
+ */
+ public Subscription replace(Subscription newValue) {
+ do {
+ Subscription old = reference.get();
+ if (old == UNSUBSCRIBE_SENTINEL) {
+ if (newValue != null) {
+ newValue.unsubscribe();
+ }
+ return Subscriptions.empty();
+ }
+ if (reference.compareAndSet(old, newValue)) {
+ return old;
+ }
+ } while (true);
+ }
+ /**
+ * Atomically change the subscription only if it is the expected value
+ * but don't unsubscribe the old value.
+ * If this subscription is unsubscribed, the newValue is immediately
+ * unsubscribed.
+ * @param expected the expected subscription
+ * @param newValue the new subscription
+ * @return true if successfully replaced, false if this
+ * subscription is unsubscribed or it didn't contain
+ * the expected subscription.
+ */
+ public boolean compareExchange(Subscription expected, Subscription newValue) {
+ return compareExchange(expected, newValue, false);
+ }
+ /**
+ * Atomically change the subscription only if it is the expected value
+ * and unsubscribe the old one if required.
+ * @param expected the expected subscription
+ * @param newValue the new subscription
+ * @param unsubscribeOld indicates to unsubscribe the old subscription if the exchange succeeded.
+ * @return true if successfully replaced, false if this
+ * subscription is unsubscribed or it didn't contain
+ * the expected subscription.
+ */
+ public boolean compareExchange(Subscription expected, Subscription newValue, boolean unsubscribeOld) {
+ do {
+ Subscription old = reference.get();
+ if (old == UNSUBSCRIBE_SENTINEL) {
+ if (newValue != null) {
+ newValue.unsubscribe();
+ }
+ return false;
+ }
+ if (old != expected) {
+ return false;
+ }
+ if (reference.compareAndSet(old, newValue)) {
+ if (unsubscribeOld && old != null) {
+ old.unsubscribe();
+ }
+ return true;
+ }
+ } while (true);
+ }
+ @Override
+ public void unsubscribe() {
+ Subscription s = reference.getAndSet(UNSUBSCRIBE_SENTINEL);
+ if (s != null) {
+ s.unsubscribe();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/rxjava-core/src/test/java/rx/SchedulersTest.java b/rxjava-core/src/test/java/rx/SchedulersTest.java
index 62e74f5798..f91dbbc908 100644
--- a/rxjava-core/src/test/java/rx/SchedulersTest.java
+++ b/rxjava-core/src/test/java/rx/SchedulersTest.java
@@ -327,12 +327,13 @@ public void testRecursiveScheduler2() throws InterruptedException {
// use latches instead of Thread.sleep
final CountDownLatch latch = new CountDownLatch(10);
final CountDownLatch completionLatch = new CountDownLatch(1);
+ final BooleanSubscription cancel = new BooleanSubscription();
- Observable obs = Observable.create(new OnSubscribeFunc() {
+ Observable obs = Observable.create(new Observable.OnSubscribeFunc() {
@Override
public Subscription onSubscribe(final Observer super Integer> observer) {
- return Schedulers.threadPoolForComputation().schedule(new BooleanSubscription(), new Func2() {
+ return Schedulers.threadPoolForComputation().schedule(cancel, new Func2() {
@Override
public Subscription call(Scheduler scheduler, BooleanSubscription cancel) {
if (cancel.isUnsubscribed()) {
@@ -378,6 +379,15 @@ public void onNext(Integer args) {
fail("Timed out waiting on onNext latch");
}
+
+ // wait some turn to let the action run
+ Thread.sleep(100);
+
+ cancel.unsubscribe();
+
+ // allow seeing the cancellation
+ Thread.sleep(100);
+
// now unsubscribe and ensure it stops the recursive loop
subscribe.unsubscribe();
System.out.println("unsubscribe");
diff --git a/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java
new file mode 100644
index 0000000000..9bb03d07a7
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/schedulers/ReentrantSchedulerTest.java
@@ -0,0 +1,104 @@
+ /**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.schedulers;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+import rx.util.functions.Func2;
+
+public class ReentrantSchedulerTest {
+ @Test
+ public void testReentrantSchedulerIsProvided() throws InterruptedException {
+ final AtomicReference