Skip to content

Commit eaac51a

Browse files
committed
fix: fix flaky test testNonBlockingWithMultipleMessages
1 parent 20361c7 commit eaac51a

1 file changed

Lines changed: 95 additions & 90 deletions

File tree

tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java

Lines changed: 95 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,143 +1274,148 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio
12741274
@Test
12751275
@Timeout(value = 1, unit = TimeUnit.MINUTES)
12761276
public void testNonBlockingWithMultipleMessages() throws Exception {
1277-
// 1. Send first non-blocking message to create task in WORKING state
1278-
Message message1 = Message.builder(MESSAGE)
1279-
.taskId("multi-event-test")
1277+
String multiEventTaskId = "multi-event-test";
1278+
try {
1279+
// 1. Send first non-blocking message to create task in WORKING state
1280+
Message message1 = Message.builder(MESSAGE)
1281+
.taskId(multiEventTaskId)
12801282
.contextId("test-context")
12811283
.parts(new TextPart("First request"))
12821284
.build();
12831285

1284-
AtomicReference<String> taskIdRef = new AtomicReference<>();
1285-
CountDownLatch firstTaskLatch = new CountDownLatch(1);
1286+
AtomicReference<String> taskIdRef = new AtomicReference<>();
1287+
CountDownLatch firstTaskLatch = new CountDownLatch(1);
12861288

1287-
BiConsumer<ClientEvent, AgentCard> firstMessageConsumer = (event, agentCard) -> {
1288-
if (event instanceof TaskEvent te) {
1289-
taskIdRef.set(te.getTask().id());
1290-
firstTaskLatch.countDown();
1291-
} else if (event instanceof TaskUpdateEvent tue && tue.getUpdateEvent() instanceof TaskStatusUpdateEvent status) {
1292-
taskIdRef.set(status.taskId());
1293-
firstTaskLatch.countDown();
1294-
}
1295-
};
1289+
BiConsumer<ClientEvent, AgentCard> firstMessageConsumer = (event, agentCard) -> {
1290+
if (event instanceof TaskEvent te) {
1291+
taskIdRef.set(te.getTask().id());
1292+
firstTaskLatch.countDown();
1293+
} else if (event instanceof TaskUpdateEvent tue && tue.getUpdateEvent() instanceof TaskStatusUpdateEvent status) {
1294+
taskIdRef.set(status.taskId());
1295+
firstTaskLatch.countDown();
1296+
}
1297+
};
12961298

1297-
// Non-blocking message creates task in WORKING state and returns immediately
1298-
// Queue stays open because task is not in final state
1299-
getPollingClient().sendMessage(message1, List.of(firstMessageConsumer), null);
1300-
1301-
assertTrue(firstTaskLatch.await(10, TimeUnit.SECONDS));
1302-
String taskId = taskIdRef.get();
1303-
assertNotNull(taskId);
1304-
assertEquals("multi-event-test", taskId);
1305-
1306-
// 2. Resubscribe to task (queue should still be open)
1307-
CountDownLatch resubEventLatch = new CountDownLatch(2); // artifact-2 + completion
1308-
List<io.a2a.spec.UpdateEvent> resubReceivedEvents = new CopyOnWriteArrayList<>();
1309-
AtomicBoolean resubUnexpectedEvent = new AtomicBoolean(false);
1310-
AtomicReference<Throwable> resubErrorRef = new AtomicReference<>();
1311-
1312-
BiConsumer<ClientEvent, AgentCard> resubConsumer = (event, agentCard) -> {
1313-
if (event instanceof TaskUpdateEvent tue) {
1314-
resubReceivedEvents.add(tue.getUpdateEvent());
1315-
resubEventLatch.countDown();
1316-
} else {
1317-
resubUnexpectedEvent.set(true);
1318-
}
1319-
};
1299+
// Non-blocking message creates task in WORKING state and returns immediately
1300+
// Queue stays open because task is not in final state
1301+
getPollingClient().sendMessage(message1, List.of(firstMessageConsumer), null);
13201302

1321-
Consumer<Throwable> resubErrorHandler = error -> {
1322-
if (!isStreamClosedError(error)) {
1323-
resubErrorRef.set(error);
1324-
}
1325-
};
1303+
assertTrue(firstTaskLatch.await(10, TimeUnit.SECONDS));
1304+
String taskId = taskIdRef.get();
1305+
assertNotNull(taskId);
1306+
assertEquals(multiEventTaskId, taskId);
1307+
1308+
// 2. Resubscribe to task (queue should still be open)
1309+
CountDownLatch resubEventLatch = new CountDownLatch(2); // artifact-2 + completion
1310+
List<io.a2a.spec.UpdateEvent> resubReceivedEvents = new CopyOnWriteArrayList<>();
1311+
AtomicBoolean resubUnexpectedEvent = new AtomicBoolean(false);
1312+
AtomicReference<Throwable> resubErrorRef = new AtomicReference<>();
1313+
1314+
BiConsumer<ClientEvent, AgentCard> resubConsumer = (event, agentCard) -> {
1315+
if (event instanceof TaskUpdateEvent tue) {
1316+
resubReceivedEvents.add(tue.getUpdateEvent());
1317+
resubEventLatch.countDown();
1318+
} else {
1319+
resubUnexpectedEvent.set(true);
1320+
}
1321+
};
13261322

1327-
// Wait for subscription to be active
1328-
CountDownLatch subscriptionLatch = new CountDownLatch(1);
1329-
awaitStreamingSubscription()
1323+
Consumer<Throwable> resubErrorHandler = error -> {
1324+
if (!isStreamClosedError(error)) {
1325+
resubErrorRef.set(error);
1326+
}
1327+
};
1328+
1329+
// Wait for subscription to be active
1330+
CountDownLatch subscriptionLatch = new CountDownLatch(1);
1331+
awaitStreamingSubscription()
13301332
.whenComplete((unused, throwable) -> subscriptionLatch.countDown());
13311333

1332-
getClient().resubscribe(new TaskIdParams(taskId),
1334+
getClient().resubscribe(new TaskIdParams(taskId),
13331335
List.of(resubConsumer),
13341336
resubErrorHandler);
13351337

1336-
assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS));
1338+
assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS));
13371339

1338-
// 3. Send second streaming message to same taskId
1339-
Message message2 = Message.builder(MESSAGE)
1340-
.taskId("multi-event-test") // Same taskId
1340+
// 3. Send second streaming message to same taskId
1341+
Message message2 = Message.builder(MESSAGE)
1342+
.taskId(multiEventTaskId) // Same taskId
13411343
.contextId("test-context")
13421344
.parts(new TextPart("Second request"))
13431345
.build();
13441346

1345-
CountDownLatch streamEventLatch = new CountDownLatch(2); // artifact-2 + completion
1346-
List<io.a2a.spec.UpdateEvent> streamReceivedEvents = new CopyOnWriteArrayList<>();
1347-
AtomicBoolean streamUnexpectedEvent = new AtomicBoolean(false);
1347+
CountDownLatch streamEventLatch = new CountDownLatch(2); // artifact-2 + completion
1348+
List<io.a2a.spec.UpdateEvent> streamReceivedEvents = new CopyOnWriteArrayList<>();
1349+
AtomicBoolean streamUnexpectedEvent = new AtomicBoolean(false);
13481350

1349-
BiConsumer<ClientEvent, AgentCard> streamConsumer = (event, agentCard) -> {
1350-
if (event instanceof TaskUpdateEvent tue) {
1351-
streamReceivedEvents.add(tue.getUpdateEvent());
1352-
streamEventLatch.countDown();
1353-
} else {
1354-
streamUnexpectedEvent.set(true);
1355-
}
1356-
};
1351+
BiConsumer<ClientEvent, AgentCard> streamConsumer = (event, agentCard) -> {
1352+
if (event instanceof TaskUpdateEvent tue) {
1353+
streamReceivedEvents.add(tue.getUpdateEvent());
1354+
streamEventLatch.countDown();
1355+
} else {
1356+
streamUnexpectedEvent.set(true);
1357+
}
1358+
};
13571359

1358-
// Streaming message adds artifact-2 and completes task
1359-
getClient().sendMessage(message2, List.of(streamConsumer), null);
1360+
// Streaming message adds artifact-2 and completes task
1361+
getClient().sendMessage(message2, List.of(streamConsumer), null);
13601362

1361-
// 4. Verify both consumers received artifact-2 and completion
1362-
assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS));
1363-
assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS));
1363+
// 4. Verify both consumers received artifact-2 and completion
1364+
assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS));
1365+
assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS));
13641366

1365-
assertFalse(resubUnexpectedEvent.get());
1366-
assertFalse(streamUnexpectedEvent.get());
1367-
assertNull(resubErrorRef.get());
1367+
assertFalse(resubUnexpectedEvent.get());
1368+
assertFalse(streamUnexpectedEvent.get());
1369+
assertNull(resubErrorRef.get());
13681370

1369-
// Both should have received 2 events: artifact-2 and completion
1370-
assertEquals(2, resubReceivedEvents.size());
1371-
assertEquals(2, streamReceivedEvents.size());
1371+
// Both should have received 2 events: artifact-2 and completion
1372+
assertEquals(2, resubReceivedEvents.size());
1373+
assertEquals(2, streamReceivedEvents.size());
13721374

1373-
// Verify resubscription events
1374-
long resubArtifactCount = resubReceivedEvents.stream()
1375+
// Verify resubscription events
1376+
long resubArtifactCount = resubReceivedEvents.stream()
13751377
.filter(e -> e instanceof TaskArtifactUpdateEvent)
13761378
.count();
1377-
assertEquals(1, resubArtifactCount);
1379+
assertEquals(1, resubArtifactCount);
13781380

1379-
long resubCompletionCount = resubReceivedEvents.stream()
1381+
long resubCompletionCount = resubReceivedEvents.stream()
13801382
.filter(e -> e instanceof TaskStatusUpdateEvent)
13811383
.filter(e -> ((TaskStatusUpdateEvent) e).isFinal())
13821384
.count();
1383-
assertEquals(1, resubCompletionCount);
1385+
assertEquals(1, resubCompletionCount);
13841386

1385-
// Verify streaming events
1386-
long streamArtifactCount = streamReceivedEvents.stream()
1387+
// Verify streaming events
1388+
long streamArtifactCount = streamReceivedEvents.stream()
13871389
.filter(e -> e instanceof TaskArtifactUpdateEvent)
13881390
.count();
1389-
assertEquals(1, streamArtifactCount);
1391+
assertEquals(1, streamArtifactCount);
13901392

1391-
long streamCompletionCount = streamReceivedEvents.stream()
1393+
long streamCompletionCount = streamReceivedEvents.stream()
13921394
.filter(e -> e instanceof TaskStatusUpdateEvent)
13931395
.filter(e -> ((TaskStatusUpdateEvent) e).isFinal())
13941396
.count();
1395-
assertEquals(1, streamCompletionCount);
1397+
assertEquals(1, streamCompletionCount);
13961398

1397-
// Verify artifact-2 details from resubscription
1398-
TaskArtifactUpdateEvent resubArtifact = (TaskArtifactUpdateEvent) resubReceivedEvents.stream()
1399+
// Verify artifact-2 details from resubscription
1400+
TaskArtifactUpdateEvent resubArtifact = (TaskArtifactUpdateEvent) resubReceivedEvents.stream()
13991401
.filter(e -> e instanceof TaskArtifactUpdateEvent)
14001402
.findFirst()
14011403
.orElseThrow();
1402-
assertEquals("artifact-2", resubArtifact.artifact().artifactId());
1403-
assertEquals("Second message artifact",
1404+
assertEquals("artifact-2", resubArtifact.artifact().artifactId());
1405+
assertEquals("Second message artifact",
14041406
((TextPart) resubArtifact.artifact().parts().get(0)).text());
14051407

1406-
// Verify artifact-2 details from streaming
1407-
TaskArtifactUpdateEvent streamArtifact = (TaskArtifactUpdateEvent) streamReceivedEvents.stream()
1408+
// Verify artifact-2 details from streaming
1409+
TaskArtifactUpdateEvent streamArtifact = (TaskArtifactUpdateEvent) streamReceivedEvents.stream()
14081410
.filter(e -> e instanceof TaskArtifactUpdateEvent)
14091411
.findFirst()
14101412
.orElseThrow();
1411-
assertEquals("artifact-2", streamArtifact.artifact().artifactId());
1412-
assertEquals("Second message artifact",
1413+
assertEquals("artifact-2", streamArtifact.artifact().artifactId());
1414+
assertEquals("Second message artifact",
14131415
((TextPart) streamArtifact.artifact().parts().get(0)).text());
1416+
} finally {
1417+
deleteTaskInTaskStore(multiEventTaskId);
1418+
}
14141419
}
14151420

14161421
@Test

0 commit comments

Comments
 (0)