Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0d2b9d3
Multiple Filter Subjects Review
scottf Sep 26, 2023
fb848a8
trying to get this to build on github
scottf Sep 26, 2023
4b80d51
trying to get this to build on github
scottf Sep 26, 2023
ba8f53d
trying to get this to build on github
scottf Sep 27, 2023
433e619
trying to get this to build on github
scottf Sep 27, 2023
b183513
trying to get this to build on github
scottf Sep 27, 2023
e9ba750
trying to get this to build on github
scottf Sep 27, 2023
89b656f
against server 83cc80a - trying to get this to build on github
scottf Sep 27, 2023
4368f7b
against server 83cc80a - trying to get this to build on github
scottf Sep 27, 2023
333194c
against server 83cc80a - trying to get this to build on github
scottf Sep 27, 2023
d89cce5
against server 83cc80a - trying to get this to build on github (2)
scottf Sep 27, 2023
b81897b
against server 83cc80a - trying to get this to build on github (3)
scottf Sep 27, 2023
3df9164
against server 83cc80a - trying to get this to build on github (4)
scottf Sep 27, 2023
a54ea32
giving up. going against 2.10.1 so it will build
scottf Sep 27, 2023
b51bdfc
giving up. going against 2.10.1 so it will build
scottf Sep 27, 2023
db77d97
giving up. going against 2.10.1 so it will build
scottf Sep 27, 2023
92249a8
going back to main branch for builds
scottf Sep 27, 2023
72db379
clarity for consumerFilterSubjectsAreEquivalent
scottf Sep 28, 2023
8c1e294
clarity for consumerFilterSubjectsAreEquivalent
scottf Sep 28, 2023
325132d
clarity for consumerFilterSubjectsAreEquivalent
scottf Sep 28, 2023
0342118
Merge branch 'main' into multiple-subject-filters-review
scottf Sep 28, 2023
4c344d6
fix consumerFilterSubjectsAreEquivalent usage after merge
scottf Sep 28, 2023
129bdb7
properly validate reject subject "foo.>.bar"
scottf Sep 28, 2023
03f9c10
remove debug
scottf Sep 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 49 additions & 48 deletions README.md

Large diffs are not rendered by default.

30 changes: 21 additions & 9 deletions src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,25 @@ protected SubscribeOptions(Builder builder, boolean isPull,

stream = validateStreamName(builder.stream, bind); // required when bind mode

// read the names and do basic validation
String temp = validateConsumerName(
validateMustMatchIfBothSupplied(builder.name, builder.cc == null ? null : builder.cc.getName(), JsSoNameMismatch),
false);
String durable = validateDurable(
validateMustMatchIfBothSupplied(builder.durable, builder.cc == null ? null : builder.cc.getDurable(), JsSoDurableMismatch),
false);
// read the consumer names and do basic validation
// A1. validate name input
String temp = validateMustMatchIfBothSupplied(
builder.name,
builder.cc == null ? null : builder.cc.getName(),
JsSoNameMismatch);
// B1. Must be a valid consumer name if supplied
temp = validateConsumerName(temp, false);

// A2. validate durable input
String durable = validateMustMatchIfBothSupplied(
builder.durable,
builder.cc == null ? null : builder.cc.getDurable(),
JsSoDurableMismatch);

// B2. Must be a valid consumer name if supplied
durable = validateDurable(durable, false);

// C. name must match durable if both supplied
name = validateMustMatchIfBothSupplied(temp, durable, JsConsumerNameDurableMismatch);

if (bind && name == null) {
Expand Down Expand Up @@ -105,7 +117,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
.ackPolicy(AckPolicy.None)
.maxDeliver(1)
.ackWait(Duration.ofHours(22))
.name(temp)
.name(name)
.memStorage(true)
.numReplicas(1);

Expand All @@ -119,7 +131,7 @@ protected SubscribeOptions(Builder builder, boolean isPull,
.durable(durable)
.deliverSubject(deliverSubject)
.deliverGroup(deliverGroup)
.name(temp)
.name(name)
.build();
}
}
Expand Down
107 changes: 50 additions & 57 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final String name;
protected final String deliverSubject;
protected final String deliverGroup;
protected final List<String> filterSubjects;
protected final String sampleFrequency;
protected final ZonedDateTime startTime;
protected final Duration ackWait;
Expand All @@ -86,6 +85,7 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final Boolean memStorage;
protected final List<Duration> backoff;
protected final Map<String, String> metadata;
protected final List<String> filterSubjects;

protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.deliverPolicy = cc.deliverPolicy;
Expand All @@ -96,7 +96,6 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.name = cc.name;
this.deliverSubject = cc.deliverSubject;
this.deliverGroup = cc.deliverGroup;
this.filterSubjects = cc.filterSubjects;
this.sampleFrequency = cc.sampleFrequency;
this.startTime = cc.startTime;
this.ackWait = cc.ackWait;
Expand All @@ -116,6 +115,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.memStorage = cc.memStorage;
this.backoff = cc.backoff == null ? null : new ArrayList<>(cc.backoff);
this.metadata = cc.metadata == null ? null : new HashMap<>(cc.metadata);
this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not released. I released that I need this to be null and not an empty list because it's the only way to determine if the user sets data in the list, which they could set empty, and this is used for comparison against a server version of the config during subscribe to ensure there are no mismatches.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get it. You're setting this.filterSubjects to null only if cc.filterSubjects is null, so for empty list it would still be an empty list no? The only change here is that you make a copy of cc.filterSubjects.

Copy link
Contributor Author

@scottf scottf Sep 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.filterSubjects is final and must be set with something, even if it's null

The reason I copy it is because I always copy list input to constructors because people have reported bugs because they re-used a list or structure. Probably not necessary, just paranoid.

}

ConsumerConfiguration(JsonValue v) {
Expand All @@ -128,15 +128,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
name = readString(v, NAME);
deliverSubject = readString(v, DELIVER_SUBJECT);
deliverGroup = readString(v, DELIVER_GROUP);
String tempFs = readString(v, FILTER_SUBJECT);
if (tempFs != null) {
filterSubjects = Collections.singletonList(tempFs);
}
else {
filterSubjects = readStringList(v, FILTER_SUBJECTS);
}
sampleFrequency = readString(v, SAMPLE_FREQ);

startTime = readDate(v, OPT_START_TIME);
ackWait = readNanos(v, ACK_WAIT);
idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
Expand All @@ -158,6 +150,14 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {

backoff = readNanosList(v, BACKOFF, true);
metadata = readStringStringMap(v, METADATA);

String tempFs = emptyAsNull(readString(v, FILTER_SUBJECT));
if (tempFs == null) {
filterSubjects = readOptionalStringList(v, FILTER_SUBJECTS);
}
else {
filterSubjects = Collections.singletonList(tempFs);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this down a bit in the code since it is more than a one liner

}

// For the builder
Expand All @@ -172,7 +172,6 @@ protected ConsumerConfiguration(Builder b)
this.name = b.name;
this.startTime = b.startTime;
this.ackWait = b.ackWait;
this.filterSubjects = b.filterSubjects;
this.sampleFrequency = b.sampleFrequency;
this.deliverSubject = b.deliverSubject;
this.deliverGroup = b.deliverGroup;
Expand All @@ -195,6 +194,7 @@ protected ConsumerConfiguration(Builder b)

this.backoff = b.backoff;
this.metadata = b.metadata;
this.filterSubjects = b.filterSubjects;
}

/**
Expand All @@ -216,12 +216,6 @@ public String toJson() {
JsonUtils.addFieldAsNanos(sb, ACK_WAIT, ackWait);
JsonUtils.addFieldWhenGtZero(sb, MAX_DELIVER, maxDeliver);
JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
if (filterSubjects.size() > 1) {
JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
}
else if (filterSubjects.size() == 1) {
JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
}
JsonUtils.addField(sb, REPLAY_POLICY, GetOrDefault(replayPolicy).toString());
JsonUtils.addField(sb, SAMPLE_FREQ, sampleFrequency);
JsonUtils.addFieldWhenGtZero(sb, RATE_LIMIT_BPS, rateLimit);
Expand All @@ -237,6 +231,14 @@ else if (filterSubjects.size() == 1) {
JsonUtils.addField(sb, NUM_REPLICAS, numReplicas);
JsonUtils.addField(sb, MEM_STORAGE, memStorage);
JsonUtils.addField(sb, METADATA, metadata);
if (filterSubjects != null) {
if (filterSubjects.size() > 1) {
JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
}
else if (filterSubjects.size() == 1) {
JsonUtils.addField(sb, FILTER_SUBJECT, filterSubjects.get(0));
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved down, no change

return endJson(sb).toString();
}

Expand Down Expand Up @@ -329,21 +331,31 @@ public long getMaxDeliver() {
}

/**
* Gets the first filter subject of this consumer configuration. Will be null if there are none.
* Gets the filter subject of this consumer configuration.
* With the introduction of multiple filter subjects, this method will
* return null if there are not exactly one filter subjects
* @return the first filter subject.
*/
public String getFilterSubject() {
return filterSubjects.isEmpty() ? null : filterSubjects.get(0);
return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0);
}

/**
* Gets the filter subjects as a list. May be empty, but won't be null
* Gets the filter subjects as a list. May be null, otherwise won't be empty
* @return the list
*/
public List<String> getFilterSubjects() {
return filterSubjects;
}

/**
* Whether there are multiple filter subjects for this consumer configuration.
* @return true if there are multiple filter subjects
*/
public boolean hasMultipleFilterSubjects() {
return filterSubjects != null && filterSubjects.size() > 1;
}

/**
* Gets the replay policy of this consumer configuration.
* @return the replay policy.
Expand Down Expand Up @@ -635,7 +647,6 @@ public static class Builder {
private String name;
private String deliverSubject;
private String deliverGroup;
private List<String> filterSubjects = new ArrayList<>();
private String sampleFrequency;

private ZonedDateTime startTime;
Expand All @@ -659,6 +670,7 @@ public static class Builder {

private List<Duration> backoff;
private Map<String, String> metadata;
private List<String> filterSubjects;

public Builder() {}

Expand All @@ -673,7 +685,6 @@ public Builder(ConsumerConfiguration cc) {
this.name = cc.name;
this.deliverSubject = cc.deliverSubject;
this.deliverGroup = cc.deliverGroup;
this.filterSubjects = new ArrayList<>(cc.filterSubjects);
this.sampleFrequency = cc.sampleFrequency;

this.startTime = cc.startTime;
Expand Down Expand Up @@ -701,6 +712,9 @@ public Builder(ConsumerConfiguration cc) {
if (cc.metadata != null) {
this.metadata = new HashMap<>(cc.metadata);
}
if (cc.filterSubjects != null) {
this.filterSubjects = new ArrayList<>(cc.filterSubjects);
}
Copy link
Contributor Author

@scottf scottf Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

always make a copy of lists because user's sometimes re-use and modify

}
}

Expand Down Expand Up @@ -848,21 +862,25 @@ public Builder maxDeliver(long maxDeliver) {

/**
* Sets the filter subject of the ConsumerConfiguration.
* Replaces any other filter subjects set in the builder
* @param filterSubject the filter subject
* @return Builder
*/
public Builder filterSubject(String filterSubject) {
this.filterSubjects.clear();
if (!nullOrEmpty(filterSubject)) {
this.filterSubjects.add(filterSubject);
if (nullOrEmpty(filterSubject)) {
this.filterSubjects = null;
}
else {
this.filterSubjects = Collections.singletonList(filterSubject);
}
return this;
}


/**
* Sets the filter subjects of the ConsumerConfiguration.
* @param filterSubjects the array of filter subjects
* Replaces any other filter subjects set in the builder
* @param filterSubjects one or more filter subjects
* @return Builder
*/
public Builder filterSubjects(String... filterSubjects) {
Expand All @@ -871,18 +889,22 @@ public Builder filterSubjects(String... filterSubjects) {

/**
* Sets the filter subjects of the ConsumerConfiguration.
* Replaces any other filter subjects set in the builder
* @param filterSubjects the list of filter subjects
* @return Builder
*/
public Builder filterSubjects(List<String> filterSubjects) {
this.filterSubjects.clear();
this.filterSubjects = new ArrayList<>();
if (filterSubjects != null) {
for (String fs : filterSubjects) {
if (!nullOrEmpty(fs)) {
this.filterSubjects.add(fs);
}
}
}
if (this.filterSubjects.isEmpty()) {
this.filterSubjects = null;
}
return this;
}

Expand Down Expand Up @@ -1255,36 +1277,7 @@ public PullSubscribeOptions buildPullSubscribeOptions(String stream) {

@Override
public String toString() {
return "ConsumerConfiguration{" +
"description='" + description + '\'' +
", durable='" + durable + '\'' +
", name='" + name + '\'' +
", deliverPolicy=" + deliverPolicy +
", deliverSubject='" + deliverSubject + '\'' +
", deliverGroup='" + deliverGroup + '\'' +
", startSeq=" + startSeq +
", startTime=" + startTime +
", ackPolicy=" + ackPolicy +
", ackWait=" + ackWait +
", maxDeliver=" + maxDeliver +
", filterSubjects='" + String.join(",", filterSubjects) + '\'' +
", replayPolicy=" + replayPolicy +
", sampleFrequency='" + sampleFrequency + '\'' +
", rateLimit=" + rateLimit +
", maxAckPending=" + maxAckPending +
", idleHeartbeat=" + idleHeartbeat +
", flowControl=" + flowControl +
", maxPullWaiting=" + maxPullWaiting +
", maxBatch=" + maxBatch +
", maxBytes=" + maxBytes +
", maxExpires=" + maxExpires +
", numReplicas=" + numReplicas +
", headersOnly=" + headersOnly +
", memStorage=" + memStorage +
", inactiveThreshold=" + inactiveThreshold +
", backoff=" + backoff +
", metadata=" + metadata +
'}';
return "ConsumerConfiguration " + toJson();
}

protected static int getOrUnset(Integer val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ public OrderedConsumerConfiguration headersOnly(Boolean headersOnly) {
}

public String getFilterSubject() {
return filterSubjects.get(0);
return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0);
}

public List<String> getFilterSubjects() {
return filterSubjects;
}

public boolean hasMultipleFilterSubjects() {
return filterSubjects != null && filterSubjects.size() > 1;
}

public DeliverPolicy getDeliverPolicy() {
return deliverPolicy;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/api/SourceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.readValue;
import static io.nats.client.support.Validator.listsAreEquivalent;
import static io.nats.client.support.Validator.consumerFilterSubjectsAreEquivalent;

public abstract class SourceBase implements JsonSerializable {
private final String name;
Expand Down Expand Up @@ -194,7 +194,7 @@ public boolean equals(Object o) {
if (!Objects.equals(filterSubject, that.filterSubject))
return false;
if (!Objects.equals(external, that.external)) return false;
return listsAreEquivalent(subjectTransforms, that.subjectTransforms);
return consumerFilterSubjectsAreEquivalent(subjectTransforms, that.subjectTransforms);
}

@Override
Expand Down
Loading