diff --git a/src/main/java/rx/internal/operators/OnSubscribeJoin.java b/src/main/java/rx/internal/operators/OnSubscribeJoin.java index f93437c5d0..d5d7f08f3b 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeJoin.java +++ b/src/main/java/rx/internal/operators/OnSubscribeJoin.java @@ -61,30 +61,36 @@ public void call(Subscriber t1) { } /** Manage the left and right sources. */ - final class ResultSink { + final class ResultSink extends HashMap { + //HashMap aspect of `this` refers to the `leftMap` + + private static final long serialVersionUID = 3491669543549085380L; + final CompositeSubscription group; final Subscriber subscriber; - final Object guard = new Object(); - /** Guarded by guard. */ + /** Guarded by this. */ boolean leftDone; - /** Guarded by guard. */ + /** Guarded by this. */ int leftId; - /** Guarded by guard. */ - final Map leftMap; - /** Guarded by guard. */ + /** Guarded by this. */ boolean rightDone; - /** Guarded by guard. */ + /** Guarded by this. */ int rightId; - /** Guarded by guard. */ + /** Guarded by this. */ final Map rightMap; public ResultSink(Subscriber subscriber) { + super(); this.subscriber = subscriber; this.group = new CompositeSubscription(); - this.leftMap = new HashMap(); + //`leftMap` is `this` this.rightMap = new HashMap(); } + HashMap leftMap() { + return this; + } + public void run() { subscriber.add(group); @@ -103,8 +109,8 @@ final class LeftSubscriber extends Subscriber { protected void expire(int id, Subscription resource) { boolean complete = false; - synchronized (guard) { - if (leftMap.remove(id) != null && leftMap.isEmpty() && leftDone) { + synchronized (ResultSink.this) { + if (leftMap().remove(id) != null && leftMap().isEmpty() && leftDone) { complete = true; } } @@ -121,9 +127,9 @@ public void onNext(TLeft args) { int id; int highRightId; - synchronized (guard) { + synchronized (ResultSink.this) { id = leftId++; - leftMap.put(id, args); + leftMap().put(id, args); highRightId = rightId; } @@ -137,7 +143,7 @@ public void onNext(TLeft args) { duration.unsafeSubscribe(d1); List rightValues = new ArrayList(); - synchronized (guard) { + synchronized (ResultSink.this) { for (Map.Entry entry : rightMap.entrySet()) { if (entry.getKey() < highRightId) { rightValues.add(entry.getValue()); @@ -162,9 +168,9 @@ public void onError(Throwable e) { @Override public void onCompleted() { boolean complete = false; - synchronized (guard) { + synchronized (ResultSink.this) { leftDone = true; - if (rightDone || leftMap.isEmpty()) { + if (rightDone || leftMap().isEmpty()) { complete = true; } } @@ -211,7 +217,7 @@ final class RightSubscriber extends Subscriber { void expire(int id, Subscription resource) { boolean complete = false; - synchronized (guard) { + synchronized (ResultSink.this) { if (rightMap.remove(id) != null && rightMap.isEmpty() && rightDone) { complete = true; } @@ -228,7 +234,7 @@ void expire(int id, Subscription resource) { public void onNext(TRight args) { int id; int highLeftId; - synchronized (guard) { + synchronized (ResultSink.this) { id = rightId++; rightMap.put(id, args); highLeftId = leftId; @@ -247,8 +253,8 @@ public void onNext(TRight args) { List leftValues = new ArrayList(); - synchronized (guard) { - for (Map.Entry entry : leftMap.entrySet()) { + synchronized (ResultSink.this) { + for (Map.Entry entry : leftMap().entrySet()) { if (entry.getKey() < highLeftId) { leftValues.add(entry.getValue()); } @@ -274,7 +280,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { boolean complete = false; - synchronized (guard) { + synchronized (ResultSink.this) { rightDone = true; if (leftDone || rightMap.isEmpty()) { complete = true;