diff --git a/src/main/java/rx/Scheduler.java b/src/main/java/rx/Scheduler.java index 1487373fcc..ee5f70ec46 100644 --- a/src/main/java/rx/Scheduler.java +++ b/src/main/java/rx/Scheduler.java @@ -18,11 +18,10 @@ import java.util.concurrent.TimeUnit; import rx.annotations.Experimental; -import rx.functions.Action0; -import rx.functions.Func1; +import rx.functions.*; import rx.internal.schedulers.SchedulerWhen; +import rx.internal.subscriptions.SequentialSubscription; import rx.schedulers.Schedulers; -import rx.subscriptions.MultipleAssignmentSubscription; /** * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this @@ -126,7 +125,9 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now()); final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay); - final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + final SequentialSubscription first = new SequentialSubscription(); + final SequentialSubscription mas = new SequentialSubscription(first); + final Action0 recursiveAction = new Action0() { long count; long lastNowNanos = firstNowNanos; @@ -155,14 +156,11 @@ public void call() { lastNowNanos = nowNanos; long delay = nextTick - nowNanos; - mas.set(schedule(this, delay, TimeUnit.NANOSECONDS)); + mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS)); } } }; - MultipleAssignmentSubscription s = new MultipleAssignmentSubscription(); - // Should call `mas.set` before `schedule`, or the new Subscription may replace the old one. - mas.set(s); - s.set(schedule(recursiveAction, initialDelay, unit)); + first.replace(schedule(recursiveAction, initialDelay, unit)); return mas; } diff --git a/src/main/java/rx/internal/subscriptions/SequentialSubscription.java b/src/main/java/rx/internal/subscriptions/SequentialSubscription.java new file mode 100644 index 0000000000..4c65e157e7 --- /dev/null +++ b/src/main/java/rx/internal/subscriptions/SequentialSubscription.java @@ -0,0 +1,189 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.subscriptions; + +import java.util.concurrent.atomic.AtomicReference; + +import rx.Subscription; +import rx.subscriptions.Subscriptions; + +/** + * A container of a Subscription that supports operations of SerialSubscription + * and MultipleAssignmentSubscription via methods (update, replace) and extends + * AtomicReference to reduce allocation count (beware the API leak of AtomicReference!). + * @since 1.1.9 + */ +public final class SequentialSubscription extends AtomicReference implements Subscription { + + /** */ + private static final long serialVersionUID = 995205034283130269L; + + /** + * Create an empty SequentialSubscription. + */ + public SequentialSubscription() { + + } + + /** + * Create a SequentialSubscription with the given initial Subscription. + * @param initial the initial Subscription, may be null + */ + public SequentialSubscription(Subscription initial) { + lazySet(initial); + } + + /** + * Returns the current contained Subscription (may be null). + *

(Remark: named as such because get() is final). + * @return the current contained Subscription (may be null) + */ + public Subscription current() { + Subscription current = super.get(); + if (current == Unsubscribed.INSTANCE) { + return Subscriptions.unsubscribed(); + } + return current; + } + + /** + * Atomically sets the contained Subscription to the provided next value and unsubscribes + * the previous value or unsubscribes the next value if this container is unsubscribed. + *

(Remark: named as such because set() is final). + * @param next the next Subscription to contain, may be null + * @return true if the update succeded, false if the container was unsubscribed + */ + public boolean update(Subscription next) { + for (;;) { + Subscription current = get(); + + if (current == Unsubscribed.INSTANCE) { + if (next != null) { + next.unsubscribe(); + } + return false; + } + + if (compareAndSet(current, next)) { + if (current != null) { + current.unsubscribe(); + } + return true; + } + } + } + + /** + * Atomically replaces the contained Subscription to the provided next value but + * does not unsubscribe the previous value or unsubscribes the next value if this + * container is unsubscribed. + * @param next the next Subscription to contain, may be null + * @return true if the update succeded, false if the container was unsubscribed + */ + public boolean replace(Subscription next) { + for (;;) { + Subscription current = get(); + + if (current == Unsubscribed.INSTANCE) { + if (next != null) { + next.unsubscribe(); + } + return false; + } + + if (compareAndSet(current, next)) { + return true; + } + } + } + + /** + * Atomically tries to set the contained Subscription to the provided next value and unsubscribes + * the previous value or unsubscribes the next value if this container is unsubscribed. + *

+ * Unlike {@link #update(Subscription)}, this doesn't retry if the replace failed + * because a concurrent operation changed the underlying contained object. + * @param next the next Subscription to contain, may be null + * @return true if the update succeded, false if the container was unsubscribed + */ + public boolean updateWeak(Subscription next) { + Subscription current = get(); + if (current == Unsubscribed.INSTANCE) { + if (next != null) { + next.unsubscribe(); + } + return false; + } + if (compareAndSet(current, next)) { + return true; + } + + current = get(); + + if (next != null) { + next.unsubscribe(); + } + return current == Unsubscribed.INSTANCE; + } + + /** + * Atomically tries to replace the contained Subscription to the provided next value but + * does not unsubscribe the previous value or unsubscribes the next value if this container + * is unsubscribed. + *

+ * Unlike {@link #replace(Subscription)}, this doesn't retry if the replace failed + * because a concurrent operation changed the underlying contained object. + * @param next the next Subscription to contain, may be null + * @return true if the update succeded, false if the container was unsubscribed + */ + public boolean replaceWeak(Subscription next) { + Subscription current = get(); + if (current == Unsubscribed.INSTANCE) { + if (next != null) { + next.unsubscribe(); + } + return false; + } + if (compareAndSet(current, next)) { + return true; + } + + current = get(); + if (current == Unsubscribed.INSTANCE) { + if (next != null) { + next.unsubscribe(); + } + return false; + } + return true; + } + + @Override + public void unsubscribe() { + Subscription current = get(); + if (current != Unsubscribed.INSTANCE) { + current = getAndSet(Unsubscribed.INSTANCE); + if (current != null && current != Unsubscribed.INSTANCE) { + current.unsubscribe(); + } + } + } + + @Override + public boolean isUnsubscribed() { + return get() == Unsubscribed.INSTANCE; + } +} diff --git a/src/main/java/rx/internal/subscriptions/Unsubscribed.java b/src/main/java/rx/internal/subscriptions/Unsubscribed.java new file mode 100644 index 0000000000..3bc85c7e9a --- /dev/null +++ b/src/main/java/rx/internal/subscriptions/Unsubscribed.java @@ -0,0 +1,35 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.subscriptions; + +import rx.Subscription; + +/** + * Represents an unsubscribed Subscription via a singleton; don't leak it! + */ +public enum Unsubscribed implements Subscription { + INSTANCE; + + @Override + public boolean isUnsubscribed() { + return true; + } + + @Override + public void unsubscribe() { + // deliberately ignored + } +} \ No newline at end of file diff --git a/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 1418a5684c..ee58c2ea77 100644 --- a/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -15,10 +15,8 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicReference; - -import rx.Observable; -import rx.Subscription; +import rx.*; +import rx.internal.subscriptions.SequentialSubscription; /** * Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop @@ -26,45 +24,16 @@ */ public final class MultipleAssignmentSubscription implements Subscription { - final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); + final SequentialSubscription state = new SequentialSubscription(); - static final class State { - final boolean isUnsubscribed; - final Subscription subscription; - - State(boolean u, Subscription s) { - this.isUnsubscribed = u; - this.subscription = s; - } - - State unsubscribe() { - return new State(true, subscription); - } - - State set(Subscription s) { - return new State(isUnsubscribed, s); - } - - } @Override public boolean isUnsubscribed() { - return state.get().isUnsubscribed; + return state.isUnsubscribed(); } @Override public void unsubscribe() { - State oldState; - State newState; - final AtomicReference localState = this.state; - do { - oldState = localState.get(); - if (oldState.isUnsubscribed) { - return; - } else { - newState = oldState.unsubscribe(); - } - } while (!localState.compareAndSet(oldState, newState)); - oldState.subscription.unsubscribe(); + state.unsubscribe(); } /** @@ -78,18 +47,7 @@ public void set(Subscription s) { if (s == null) { throw new IllegalArgumentException("Subscription can not be null"); } - State oldState; - State newState; - final AtomicReference localState = this.state; - do { - oldState = localState.get(); - if (oldState.isUnsubscribed) { - s.unsubscribe(); - return; - } else { - newState = oldState.set(s); - } - } while (!localState.compareAndSet(oldState, newState)); + state.replace(s); } /** @@ -98,7 +56,6 @@ public void set(Subscription s) { * @return the {@link Subscription} that underlies the {@code MultipleAssignmentSubscription} */ public Subscription get() { - return state.get().subscription; + return state.current(); } - } diff --git a/src/main/java/rx/subscriptions/SerialSubscription.java b/src/main/java/rx/subscriptions/SerialSubscription.java index b06503f761..5b9590ed25 100644 --- a/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/src/main/java/rx/subscriptions/SerialSubscription.java @@ -15,91 +15,48 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicReference; - import rx.Subscription; +import rx.internal.subscriptions.SequentialSubscription; /** * Represents a subscription whose underlying subscription can be swapped for another subscription which causes * the previous underlying subscription to be unsubscribed. */ public final class SerialSubscription implements Subscription { - final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); - - static final class State { - final boolean isUnsubscribed; - final Subscription subscription; - - State(boolean u, Subscription s) { - this.isUnsubscribed = u; - this.subscription = s; - } - - State unsubscribe() { - return new State(true, subscription); - } - - State set(Subscription s) { - return new State(isUnsubscribed, s); - } - - } - + + final SequentialSubscription state = new SequentialSubscription(); + @Override public boolean isUnsubscribed() { - return state.get().isUnsubscribed; + return state.isUnsubscribed(); } @Override public void unsubscribe() { - State oldState; - State newState; - final AtomicReference localState = this.state; - do { - oldState = localState.get(); - if (oldState.isUnsubscribed) { - return; - } else { - newState = oldState.unsubscribe(); - } - } while (!localState.compareAndSet(oldState, newState)); - oldState.subscription.unsubscribe(); + state.unsubscribe(); } /** - * Swaps out the old {@link Subscription} for the specified {@code Subscription}. + * Sets the underlying subscription. If the {@code MultipleAssignmentSubscription} is already unsubscribed, + * setting a new subscription causes the new subscription to also be immediately unsubscribed. * - * @param s - * the new {@code Subscription} to swap in - * @throws IllegalArgumentException - * if {@code s} is {@code null} + * @param s the {@link Subscription} to set + * @throws IllegalArgumentException if {@code s} is {@code null} */ public void set(Subscription s) { if (s == null) { throw new IllegalArgumentException("Subscription can not be null"); } - State oldState; - State newState; - final AtomicReference localState = this.state; - do { - oldState = localState.get(); - if (oldState.isUnsubscribed) { - s.unsubscribe(); - return; - } else { - newState = oldState.set(s); - } - } while (!localState.compareAndSet(oldState, newState)); - oldState.subscription.unsubscribe(); + state.update(s); } /** - * Retrieves the current {@link Subscription} that is being represented by this {@code SerialSubscription}. - * - * @return the current {@link Subscription} that is being represented by this {@code SerialSubscription} + * Gets the underlying subscription. + * + * @return the {@link Subscription} that underlies the {@code MultipleAssignmentSubscription} */ public Subscription get() { - return state.get().subscription; + return state.current(); } } diff --git a/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java b/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java index 358f500ec3..2db843a420 100644 --- a/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java +++ b/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java @@ -15,15 +15,11 @@ */ package rx.subscriptions; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import static rx.subscriptions.Subscriptions.create; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import rx.Subscription; import rx.functions.Action0; @@ -72,6 +68,7 @@ public void subscribingWhenUnsubscribedCausesImmediateUnsubscription() { } @Test + @Ignore("This is prone to leaks") public void testSubscriptionRemainsAfterUnsubscribe() { MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); @@ -80,4 +77,16 @@ public void testSubscriptionRemainsAfterUnsubscribe() { Assert.assertEquals(true, mas.get() == s); } + + @Test + public void subscriptionDoesntRemainAfterUnsubscribe() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + + mas.set(s); + mas.unsubscribe(); + + assertNotEquals(s, mas.get()); + assertSame(mas.get(), Subscriptions.unsubscribed()); + } + } \ No newline at end of file