diff --git a/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java b/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java index e677fb9bc3..ddab514cc9 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java +++ b/src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java @@ -65,25 +65,27 @@ public void call(Subscriber child) { } /** Manages sub-observers and subscriptions. */ - final class ResultManager implements Subscription { + final class ResultManager extends HashMap>implements Subscription { + // HashMap aspect of `this` refers to `leftMap` + + private static final long serialVersionUID = -3035156013812425335L; + final RefCountSubscription cancel; final Subscriber subscriber; final CompositeSubscription group; - final Object guard = new Object(); - /** Guarded by guard. */ + /** Guarded by this. */ int leftIds; - /** Guarded by guard. */ + /** Guarded by this. */ int rightIds; - /** Guarded by guard. */ - final Map> leftMap = new HashMap>(); // NOPMD - /** Guarded by guard. */ + /** Guarded by this. */ final Map rightMap = new HashMap(); // NOPMD - /** Guarded by guard. */ + /** Guarded by this. */ boolean leftDone; - /** Guarded by guard. */ + /** Guarded by this. */ boolean rightDone; public ResultManager(Subscriber subscriber) { + super(); this.subscriber = subscriber; this.group = new CompositeSubscription(); this.cancel = new RefCountSubscription(group); @@ -110,15 +112,20 @@ public void unsubscribe() { public boolean isUnsubscribed() { return cancel.isUnsubscribed(); } + + Map> leftMap() { + return this; + } + /** * Notify everyone and cleanup. * @param e the exception */ void errorAll(Throwable e) { List> list; - synchronized (guard) { - list = new ArrayList>(leftMap.values()); - leftMap.clear(); + synchronized (ResultManager.this) { + list = new ArrayList>(leftMap().values()); + leftMap().clear(); rightMap.clear(); } for (Observer o : list) { @@ -132,8 +139,8 @@ void errorAll(Throwable e) { * @param e the exception */ void errorMain(Throwable e) { - synchronized (guard) { - leftMap.clear(); + synchronized (ResultManager.this) { + leftMap().clear(); rightMap.clear(); } subscriber.onError(e); @@ -158,9 +165,9 @@ public void onNext(T1 args) { Subject subj = PublishSubject.create(); Observer subjSerial = new SerializedObserver(subj); - synchronized (guard) { + synchronized (ResultManager.this) { id = leftIds++; - leftMap.put(id, subjSerial); + leftMap().put(id, subjSerial); } Observable window = Observable.create(new WindowObservableFunc(subj, cancel)); @@ -174,7 +181,7 @@ public void onNext(T1 args) { R result = resultSelector.call(args, window); List rightMapValues; - synchronized (guard) { + synchronized (ResultManager.this) { rightMapValues = new ArrayList(rightMap.values()); } @@ -192,11 +199,11 @@ public void onNext(T1 args) { @Override public void onCompleted() { List> list = null; - synchronized (guard) { + synchronized (ResultManager.this) { leftDone = true; if (rightDone) { - list = new ArrayList>(leftMap.values()); - leftMap.clear(); + list = new ArrayList>(leftMap().values()); + leftMap().clear(); rightMap.clear(); } } @@ -216,7 +223,7 @@ final class RightObserver extends Subscriber { public void onNext(T2 args) { try { int id; - synchronized (guard) { + synchronized (ResultManager.this) { id = rightIds++; rightMap.put(id, args); } @@ -228,8 +235,8 @@ public void onNext(T2 args) { duration.unsafeSubscribe(d2); List> list; - synchronized (guard) { - list = new ArrayList>(leftMap.values()); + synchronized (ResultManager.this) { + list = new ArrayList>(leftMap().values()); } for (Observer o : list) { o.onNext(args); @@ -242,11 +249,11 @@ public void onNext(T2 args) { @Override public void onCompleted() { List> list = null; - synchronized (guard) { + synchronized (ResultManager.this) { rightDone = true; if (leftDone) { - list = new ArrayList>(leftMap.values()); - leftMap.clear(); + list = new ArrayList>(leftMap().values()); + leftMap().clear(); rightMap.clear(); } } @@ -273,8 +280,8 @@ public void onCompleted() { if (once) { once = false; Observer gr; - synchronized (guard) { - gr = leftMap.remove(id); + synchronized (ResultManager.this) { + gr = leftMap().remove(id); } if (gr != null) { gr.onCompleted(); @@ -306,7 +313,7 @@ public RightDurationObserver(int id) { public void onCompleted() { if (once) { once = false; - synchronized (guard) { + synchronized (ResultManager.this) { rightMap.remove(id); } group.remove(this);