diff --git a/src/main/java/rx/schedulers/ExecutorScheduler.java b/src/main/java/rx/schedulers/ExecutorScheduler.java index 8e5c9bf22e..70f217e49b 100644 --- a/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -96,11 +96,20 @@ public Subscription schedule(Action0 action) { @Override public void run() { do { + if (tasks.isUnsubscribed()) { + queue.clear(); + return; + } + ScheduledAction sa = queue.poll(); + if (sa == null) { + return; + } + if (!sa.isUnsubscribed()) { sa.run(); } - } while (wip.decrementAndGet() > 0); + } while (wip.decrementAndGet() != 0); } @Override @@ -170,6 +179,7 @@ public boolean isUnsubscribed() { @Override public void unsubscribe() { tasks.unsubscribe(); + queue.clear(); } } diff --git a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java index ed4e03213d..0777208cab 100644 --- a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java @@ -18,9 +18,11 @@ import static org.junit.Assert.*; import java.lang.management.*; +import java.util.Queue; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; import org.junit.Test; import rx.*; @@ -275,4 +277,32 @@ public void call() { assertFalse(w.tasks.hasSubscriptions()); } + + @Test + public void workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace() { + Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(1)); + for (int i = 0; i< 1000; i++) { + Worker worker = scheduler.createWorker(); + final Queue q = new ConcurrentLinkedQueue(); + Action0 action1 = new Action0() { + + @Override + public void call() { + q.add(1); + }}; + Action0 action2 = new Action0() { + + @Override + public void call() { + q.add(2); + }}; + worker.schedule(action1); + worker.schedule(action2); + worker.unsubscribe(); + if (q.size()==1 && q.poll() == 2) { + //expect a queue of 1,2 or 1. If queue is just 2 then we have a problem! + Assert.fail("wrong order on loop " + i); + } + } + } }