diff --git a/src/main/java/rx/internal/operators/OperatorZip.java b/src/main/java/rx/internal/operators/OperatorZip.java index 91fc05f09f..6f1280b3c3 100644 --- a/src/main/java/rx/internal/operators/OperatorZip.java +++ b/src/main/java/rx/internal/operators/OperatorZip.java @@ -177,7 +177,10 @@ public void request(long n) { } - private static final class Zip extends AtomicLong { + static final class Zip extends AtomicLong { + /** */ + private static final long serialVersionUID = 5995274816189928317L; + final Observer child; private final FuncN zipFunction; private final CompositeSubscription childSubscription = new CompositeSubscription(); @@ -186,7 +189,7 @@ private static final class Zip extends AtomicLong { int emitted = 0; // not volatile/synchronized as accessed inside COUNTER_UPDATER block /* initialized when started in `start` */ - private Object[] observers; + private volatile Object[] subscribers; private AtomicLong requested; public Zip(final Subscriber child, FuncN zipFunction) { @@ -197,16 +200,18 @@ public Zip(final Subscriber child, FuncN zipFunction) { @SuppressWarnings("unchecked") public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) { - observers = new Object[os.length]; - this.requested = requested; + final Object[] subscribers = new Object[os.length]; for (int i = 0; i < os.length; i++) { InnerSubscriber io = new InnerSubscriber(); - observers[i] = io; + subscribers[i] = io; childSubscription.add(io); } - + + this.requested = requested; + this.subscribers = subscribers; // full memory barrier: release all above + for (int i = 0; i < os.length; i++) { - os[i].unsafeSubscribe((InnerSubscriber) observers[i]); + os[i].unsafeSubscribe((InnerSubscriber) subscribers[i]); } } @@ -219,13 +224,13 @@ public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requ */ @SuppressWarnings("unchecked") void tick() { - final Object[] observers = this.observers; - if (observers == null) { + final Object[] subscribers = this.subscribers; + if (subscribers == null) { // nothing yet to do (initial request from Producer) return; } if (getAndIncrement() == 0) { - final int length = observers.length; + final int length = subscribers.length; final Observer child = this.child; final AtomicLong requested = this.requested; do { @@ -234,7 +239,7 @@ void tick() { final Object[] vs = new Object[length]; boolean allHaveValues = true; for (int i = 0; i < length; i++) { - RxRingBuffer buffer = ((InnerSubscriber) observers[i]).items; + RxRingBuffer buffer = ((InnerSubscriber) subscribers[i]).items; Object n = buffer.peek(); if (n == null) { @@ -265,7 +270,7 @@ void tick() { return; } // now remove them - for (Object obj : observers) { + for (Object obj : subscribers) { RxRingBuffer buffer = ((InnerSubscriber) obj).items; buffer.poll(); // eagerly check if the next item on this queue is an onComplete @@ -278,7 +283,7 @@ void tick() { } } if (emitted > THRESHOLD) { - for (Object obj : observers) { + for (Object obj : subscribers) { ((InnerSubscriber) obj).requestMore(emitted); } emitted = 0;