Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions src/main/java/rx/internal/operators/OnSubscribeJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,36 @@ public void call(Subscriber<? super R> t1) {
}

/** Manage the left and right sources. */
final class ResultSink {
final class ResultSink extends HashMap<Integer,TLeft> {
//HashMap aspect of `this` refers to the `leftMap`

private static final long serialVersionUID = 3491669543549085380L;

final CompositeSubscription group;
final Subscriber<? super R> subscriber;
final Object guard = new Object();
/** Guarded by guard. */
/** Guarded by this. */
boolean leftDone;
/** Guarded by guard. */
/** Guarded by this. */
int leftId;
/** Guarded by guard. */
final Map<Integer, TLeft> leftMap;
/** Guarded by guard. */
/** Guarded by this. */
boolean rightDone;
/** Guarded by guard. */
/** Guarded by this. */
int rightId;
/** Guarded by guard. */
/** Guarded by this. */
final Map<Integer, TRight> rightMap;

public ResultSink(Subscriber<? super R> subscriber) {
super();
this.subscriber = subscriber;
this.group = new CompositeSubscription();
this.leftMap = new HashMap<Integer, TLeft>();
//`leftMap` is `this`
this.rightMap = new HashMap<Integer, TRight>();
}

HashMap<Integer, TLeft> leftMap() {
return this;
}

public void run() {
subscriber.add(group);

Expand All @@ -103,8 +109,8 @@ final class LeftSubscriber extends Subscriber<TLeft> {

protected void expire(int id, Subscription resource) {
boolean complete = false;
synchronized (guard) {
if (leftMap.remove(id) != null && leftMap.isEmpty() && leftDone) {
synchronized (ResultSink.this) {
if (leftMap().remove(id) != null && leftMap().isEmpty() && leftDone) {
complete = true;
}
}
Expand All @@ -121,9 +127,9 @@ public void onNext(TLeft args) {
int id;
int highRightId;

synchronized (guard) {
synchronized (ResultSink.this) {
id = leftId++;
leftMap.put(id, args);
leftMap().put(id, args);
highRightId = rightId;
}

Expand All @@ -137,7 +143,7 @@ public void onNext(TLeft args) {
duration.unsafeSubscribe(d1);

List<TRight> rightValues = new ArrayList<TRight>();
synchronized (guard) {
synchronized (ResultSink.this) {
for (Map.Entry<Integer, TRight> entry : rightMap.entrySet()) {
if (entry.getKey() < highRightId) {
rightValues.add(entry.getValue());
Expand All @@ -162,9 +168,9 @@ public void onError(Throwable e) {
@Override
public void onCompleted() {
boolean complete = false;
synchronized (guard) {
synchronized (ResultSink.this) {
leftDone = true;
if (rightDone || leftMap.isEmpty()) {
if (rightDone || leftMap().isEmpty()) {
complete = true;
}
}
Expand Down Expand Up @@ -211,7 +217,7 @@ final class RightSubscriber extends Subscriber<TRight> {

void expire(int id, Subscription resource) {
boolean complete = false;
synchronized (guard) {
synchronized (ResultSink.this) {
if (rightMap.remove(id) != null && rightMap.isEmpty() && rightDone) {
complete = true;
}
Expand All @@ -228,7 +234,7 @@ void expire(int id, Subscription resource) {
public void onNext(TRight args) {
int id;
int highLeftId;
synchronized (guard) {
synchronized (ResultSink.this) {
id = rightId++;
rightMap.put(id, args);
highLeftId = leftId;
Expand All @@ -247,8 +253,8 @@ public void onNext(TRight args) {


List<TLeft> leftValues = new ArrayList<TLeft>();
synchronized (guard) {
for (Map.Entry<Integer, TLeft> entry : leftMap.entrySet()) {
synchronized (ResultSink.this) {
for (Map.Entry<Integer, TLeft> entry : leftMap().entrySet()) {
if (entry.getKey() < highLeftId) {
leftValues.add(entry.getValue());
}
Expand All @@ -274,7 +280,7 @@ public void onError(Throwable e) {
@Override
public void onCompleted() {
boolean complete = false;
synchronized (guard) {
synchronized (ResultSink.this) {
rightDone = true;
if (leftDone || rightMap.isEmpty()) {
complete = true;
Expand Down