From 970a6092c39010fab65c09004c379ec8db0df10a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 8 Aug 2016 15:04:35 +0200 Subject: [PATCH] 1.x: Schedulers.from() to call RxJavaHooks.onScheduleAction --- .../schedulers/ExecutorScheduler.java | 9 +++- .../schedulers/ExecutorSchedulerTest.java | 49 +++++++++++++++++-- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java index b4bcf19d7f..aa188fc96c 100644 --- a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java @@ -64,6 +64,9 @@ public Subscription schedule(Action0 action) { if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } + + action = RxJavaHooks.onScheduledAction(action); + ScheduledAction ea = new ScheduledAction(action, tasks); tasks.add(ea); queue.offer(ea); @@ -111,7 +114,7 @@ public void run() { } @Override - public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { + public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { if (delayTime <= 0) { return schedule(action); } @@ -119,6 +122,8 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit return Subscriptions.unsubscribed(); } + final Action0 decorated = RxJavaHooks.onScheduledAction(action); + final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription(); final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); mas.set(first); @@ -137,7 +142,7 @@ public void call() { return; } // schedule the real action untimed - Subscription s2 = schedule(action); + Subscription s2 = schedule(decorated); mas.set(s2); // unless the worker is unsubscribed, we should get a new ScheduledAction if (s2.getClass() == ScheduledAction.class) { diff --git a/src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java b/src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java index f80a32b041..a4f419f044 100644 --- a/src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java @@ -19,15 +19,16 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; + +import org.junit.*; + import rx.*; import rx.Scheduler.Worker; import rx.functions.*; import rx.internal.schedulers.ExecutorScheduler.ExecutorSchedulerWorker; import rx.internal.util.RxThreadFactory; -import rx.schedulers.AbstractSchedulerConcurrencyTests; -import rx.schedulers.SchedulerTests; -import rx.schedulers.Schedulers; +import rx.plugins.RxJavaHooks; +import rx.schedulers.*; public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -208,4 +209,44 @@ public void call() { assertFalse(w.tasks.hasSubscriptions()); } + + @Test + public void actionHookCalled() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + final int[] call = { 0 }; + + RxJavaHooks.setOnScheduleAction(new Func1() { + @Override + public Action0 call(Action0 t) { + call[0]++; + return t; + } + }); + + Scheduler s = Schedulers.from(exec); + + Worker w = s.createWorker(); + + final CountDownLatch cdl = new CountDownLatch(1); + + try { + w.schedule(new Action0() { + @Override + public void call() { + cdl.countDown(); + } + }); + + Assert.assertTrue("Action timed out", cdl.await(5, TimeUnit.SECONDS)); + } finally { + w.unsubscribe(); + } + + Assert.assertEquals("Hook not called!", 1, call[0]); + } finally { + RxJavaHooks.reset(); + exec.shutdown(); + } + } }