Skip to content

Java SKD - Duplicate messages on rewind after 1.2.34 #1050

@leni-kirilov

Description

@leni-kirilov

With the help of Mike from Ably Support , it turns out there is a regression since 1.2.34 (I caught it with 1.2.43)

An integration test of mine sends 3 messages and reads them with rewind.
It started receiving them 6 times...

My code is roughly the following:

assertRewindOrHistory("?rewind=10", 3, 3, testInfo);
private void assertRewindOrHistory(String rewindString, int existingMessagesCount, int expectedMessageCount, TestInfo testInfo) throws Exception {

    //GIVEN a channel with some messages exist
    String testSpecificChannel = HARDCODED_PUBLIC_CHANNEL_NAME + testInfo.getDisplayName() + System.currentTimeMillis();
    List<String> testMessages = List.of("message 1", "message 2", "message 3");
    ablyService.publishMessage(ablyProperties.rootApiKey(), testInfo.getDisplayName(), testSpecificChannel, testMessages);
    TimeUnit.SECONDS.sleep(1);

    //AND a customer is waiting to consume data via Websocket using a temp auth token
    var customerToken = ablyAuthService.getAuthTokenForPublicConsumption(testInfo.getDisplayName(), testSpecificChannel);
    var customerAbly = ablyService.connect(customerToken);

    //WHEN registering a consumer and rewind x messages in the past
    Consumer<Message> messageConsumer = (message) -> {
        Assertions.assertNotNull(message.data, "This is the message: " + message);
        actualMessageCount.incrementAndGet();
    };

    ablyService.consumeMessage(customerAbly, rewindString + testSpecificChannel, messageConsumer);

    //THEN expect all past messages to be read
    assertAsync(expectedMessageCount, actualMessageCount, "Reading different messages count from " + testSpecificChannel);
}
// AblyService.java
public void consumeMessage(AblyRealtime ably, String channelName, Consumer<Message> assertion) {
    logger.info("Consuming messages from channel " + channelName);

    connectAnd(ably, (AblyRealtime ably2) -> {
        Channel channel = ably2.channels.get(channelName);

        try {
            logger.info("Subscribed to the channel" + channel.name);
            channel.subscribe(assertion::accept);
        } catch (AblyException e) { throw new RuntimeException(e); }
    });
}
private void connectAnd(AblyRealtime ably, Consumer<AblyRealtime> consumer) {
    ably.connection.on(ConnectionState.connected, state -> {
        switch (state.current) {
            case connected: {
                consumer.accept(ably);
                break;
            }
            default: {throw new RuntimeException(state.toString()); }
        }
    });
}

Later I applied this : https://faqs.ably.com/why-am-i-seeing-every-message-multiple-times but it didn't help.

Results logs:

2024-12-03 10:32:52,216 [WebSocketConnectReadThread-63] INFO c.s.s.s.a.AblyService Message sent at + 2024-12-03T08:32:52.216150600
2024-12-03 10:32:52,217 [WebSocketConnectReadThread-63] INFO c.s.s.s.a.AblyService Message sent at + 2024-12-03T08:32:52.217138
2024-12-03 10:32:52,217 [WebSocketConnectReadThread-63] INFO c.s.s.s.a.AblyService Message sent at + 2024-12-03T08:32:52.217138

these are just message 0, message 1 and message 2

2024-12-03 10:32:52,499 [Test worker] INFO c.s.a.s.AblyAuthService To request auth token with: mAbfO consumeOldChannelDataWithRewindNumber(TestInfo)
2024-12-03 10:32:52,686 [Test worker] INFO c.s.s.s.a.AblyService Consuming messages from channel [?rewind=10]output-fixture-1consumeOldChannelDataWithRewindNumber(TestInfo)1733214771457
2024-12-03 10:32:52,687 [Test worker] INFO c.s.s.s.a.AblyService Subscribed to the channel[?rewind=10]output-fixture-1consumeOldChannelDataWithRewindNumber(TestInfo)1733214771457

creating a connection

2024-12-03 10:32:53,203 [WebSocketConnectReadThread-71] INFO c.s.i.POC2_CustomerConsumesExposedDataFromAblyIntegrationTests Consuming a message...{Message clientId=consumeOldChannelDataWithRewindNumber(TestInfo)_test connectionId=nGShaWn24v data=message 0 id=nGShaWn24v:0:0 name=integration_test_key}
2024-12-03 10:32:53,207 [WebSocketConnectReadThread-71] INFO c.s.i.POC2_CustomerConsumesExposedDataFromAblyIntegrationTests Consuming a message...{Message clientId=consumeOldChannelDataWithRewindNumber(TestInfo)_test connectionId=nGShaWn24v data=message 1 id=nGShaWn24v:1:0 name=integration_test_key}
2024-12-03 10:32:53,208 [WebSocketConnectReadThread-71] INFO c.s.i.POC2_CustomerConsumesExposedDataFromAblyIntegrationTests Consuming a message...{Message clientId=consumeOldChannelDataWithRewindNumber(TestInfo)_test connectionId=nGShaWn24v data=message 2 id=nGShaWn24v:2:0 name=integration_test_key}
2024-12-03 10:32:53,208 [WebSocketConnectReadThread-71] INFO c.s.i.POC2_CustomerConsumesExposedDataFromAblyIntegrationTests Consuming a message...{Message clientId=consumeOldChannelDataWithRewindNumber(TestInfo)_test connectionId=nGShaWn24v data=message 0 id=nGShaWn24v:0:0 name=integration_test_key}
2024-12-03 10:32:53,209 [WebSocketConnectReadThread-71] INFO c.s.i.POC2_CustomerConsumesExposedDataFromAblyIntegrationTests Consuming a message...{Message clientId=consumeOldChannelDataWithRewindNumber(TestInfo)_test connectionId=nGShaWn24v data=message 1 id=nGShaWn24v:1:0 name=integration_test_key}
2024-12-03 10:32:53,209 [WebSocketConnectReadThread-71] INFO c.s.i.POC2_CustomerConsumesExposedDataFromAblyIntegrationTests Consuming a message...{Message clientId=consumeOldChannelDataWithRewindNumber(TestInfo)_test connectionId=nGShaWn24v data=message 2 id=nGShaWn24v:2:0 name=integration_test_key}

Might be related to : #1018

┆Issue is synchronized with this Jira Task by Unito

Metadata

Metadata

Assignees

Labels

bugSomething isn't working. It's clear that this does need to be fixed.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions