diff --git a/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java b/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java index 135888d310..be1a5bc6c6 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java +++ b/src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java @@ -29,11 +29,13 @@ * * @param the value type of the chain */ -public final class OnSubscribeAutoConnect implements OnSubscribe { - final ConnectableObservable source; +@SuppressWarnings("serial") +public final class OnSubscribeAutoConnect extends AtomicInteger implements OnSubscribe { + // AtomicInteger aspect of `this` represents the number of clients + + final ConnectableObservable source; final int numberOfSubscribers; final Action1 connection; - final AtomicInteger clients; public OnSubscribeAutoConnect(ConnectableObservable source, int numberOfSubscribers, @@ -44,12 +46,12 @@ public OnSubscribeAutoConnect(ConnectableObservable source, this.source = source; this.numberOfSubscribers = numberOfSubscribers; this.connection = connection; - this.clients = new AtomicInteger(); } @Override public void call(Subscriber child) { source.unsafeSubscribe(Subscribers.wrap(child)); - if (clients.incrementAndGet() == numberOfSubscribers) { + //this.get() represents the number of clients + if (this.incrementAndGet() == numberOfSubscribers) { source.connect(connection); } }