Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/test/java/io/nats/client/impl/JetStreamPullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ public void testPullRequestOptionsBuilder() {
}

interface ConflictSetup {
JetStreamSubscription setup(JetStreamManagement jsm, JetStream js) throws Exception;
JetStreamSubscription setup(JetStreamManagement jsm, JetStream js, TestHandler handler) throws Exception;
}

private boolean versionIsBefore(Connection nc, String targetVersion) {
Expand All @@ -691,7 +691,7 @@ private void testConflictStatus(String statusText, int type, boolean syncMode, S
createDefaultTestStream(nc);
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();
JetStreamSubscription sub = setup.setup(jsm, js);
JetStreamSubscription sub = setup.setup(jsm, js, handler);
if (type == TYPE_ERROR && syncMode) {
assertThrows(JetStreamStatusException.class, () -> sub.nextMessage(500));
}
Expand Down Expand Up @@ -726,7 +726,7 @@ else if (type == TYPE_WARNING) {
@Test
public void testExceedsMaxWaiting() throws Exception {
PullSubscribeOptions so = ConsumerConfiguration.builder().maxPullWaiting(1).buildPullSubscribeOptions();
testConflictStatus(EXCEEDED_MAX_WAITING, TYPE_WARNING, true, null, (jsm, js) -> {
testConflictStatus(EXCEEDED_MAX_WAITING, TYPE_WARNING, true, null, (jsm, js, handler) -> {
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
sub.pull(1);
sub.pull(1);
Expand All @@ -737,7 +737,7 @@ public void testExceedsMaxWaiting() throws Exception {
@Test
public void testExceedsMaxRequestBatch() throws Exception {
PullSubscribeOptions so = ConsumerConfiguration.builder().maxBatch(1).buildPullSubscribeOptions();
testConflictStatus(EXCEEDED_MAX_REQUEST_BATCH, TYPE_WARNING, true, null, (jsm, js) -> {
testConflictStatus(EXCEEDED_MAX_REQUEST_BATCH, TYPE_WARNING, true, null, (jsm, js, handler) -> {
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
sub.pull(2);
return sub;
Expand All @@ -747,7 +747,7 @@ public void testExceedsMaxRequestBatch() throws Exception {
@Test
public void testMessageSizeExceedsMaxBytes() throws Exception {
PullSubscribeOptions so = ConsumerConfiguration.builder().buildPullSubscribeOptions();
testConflictStatus(MESSAGE_SIZE_EXCEEDS_MAX_BYTES, TYPE_NONE, true, "2.9.0", (jsm, js) -> {
testConflictStatus(MESSAGE_SIZE_EXCEEDS_MAX_BYTES, TYPE_NONE, true, "2.9.0", (jsm, js, handler) -> {
js.publish(SUBJECT, new byte[1000]);
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
sub.pull(PullRequestOptions.builder(1).maxBytes(100).build());
Expand All @@ -758,7 +758,7 @@ public void testMessageSizeExceedsMaxBytes() throws Exception {
@Test
public void testExceedsMaxRequestExpires() throws Exception {
PullSubscribeOptions so = ConsumerConfiguration.builder().maxExpires(1000).buildPullSubscribeOptions();
testConflictStatus(EXCEEDED_MAX_REQUEST_EXPIRES, TYPE_WARNING, true, null, (jsm, js) -> {
testConflictStatus(EXCEEDED_MAX_REQUEST_EXPIRES, TYPE_WARNING, true, null, (jsm, js, handler) -> {
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
sub.pullExpiresIn(1, 2000);
return sub;
Expand All @@ -768,7 +768,7 @@ public void testExceedsMaxRequestExpires() throws Exception {
@Test
public void testConsumerIsPushBased() throws Exception {
PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, durable(1));
testConflictStatus(CONSUMER_IS_PUSH_BASED, TYPE_ERROR, true, null, (jsm, js) -> {
testConflictStatus(CONSUMER_IS_PUSH_BASED, TYPE_ERROR, true, null, (jsm, js, handler) -> {
jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).build());
JetStreamSubscription sub = js.subscribe(null, so);
jsm.deleteConsumer(STREAM, durable(1));
Expand All @@ -781,20 +781,20 @@ public void testConsumerIsPushBased() throws Exception {
@Test
public void testConsumerDeleted() throws Exception {
PullSubscribeOptions so = PullSubscribeOptions.bind(STREAM, durable(1));
testConflictStatus(CONSUMER_DELETED, TYPE_ERROR, true, "2.9.6", (jsm, js) -> {
testConflictStatus(CONSUMER_DELETED, TYPE_ERROR, true, "2.9.6", (jsm, js, handler) -> {
jsm.addOrUpdateConsumer(STREAM, builder().durable(durable(1)).build());
JetStreamSubscription sub = js.subscribe(null, so);
sub.pullExpiresIn(1, 10000);
jsm.deleteConsumer(STREAM, durable(1));
sleep(200);
sleep(1000);
return sub;
});
}

@Test
public void testBadRequest() throws Exception {
PullSubscribeOptions so = ConsumerConfiguration.builder().buildPullSubscribeOptions();
testConflictStatus(BAD_REQUEST, TYPE_ERROR, true, null, (jsm, js) -> {
testConflictStatus(BAD_REQUEST, TYPE_ERROR, true, null, (jsm, js, handler) -> {
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
sub.pull(PullRequestOptions.builder(1).noWait().idleHeartbeat(1).build());
return sub;
Expand All @@ -804,7 +804,7 @@ public void testBadRequest() throws Exception {
@Test
public void testNotFound() throws Exception {
PullSubscribeOptions so = ConsumerConfiguration.builder().buildPullSubscribeOptions();
testConflictStatus(NO_MESSAGES, TYPE_NONE, true, null, (jsm, js) -> {
testConflictStatus(NO_MESSAGES, TYPE_NONE, true, null, (jsm, js, handler) -> {
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
sub.pullNoWait(1);
return sub;
Expand All @@ -814,7 +814,7 @@ public void testNotFound() throws Exception {
@Test
public void testExceedsMaxRequestBytes1stMessage() throws Exception {
PullSubscribeOptions so = ConsumerConfiguration.builder().maxBytes(1).buildPullSubscribeOptions();
testConflictStatus(EXCEEDED_MAX_REQUEST_MAX_BYTES, TYPE_WARNING, true, null, (jsm, js) -> {
testConflictStatus(EXCEEDED_MAX_REQUEST_MAX_BYTES, TYPE_WARNING, true, null, (jsm, js, handler) -> {
JetStreamSubscription sub = js.subscribe(SUBJECT, so);
sub.pull(PullRequestOptions.builder(1).maxBytes(2).build());
return sub;
Expand Down