From 091560b1183328133bcaac8ed91f679896abb31d Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 24 Jul 2025 18:51:17 +0530 Subject: [PATCH 1/5] [ECO-5447] Updated ObjectMessage with server provided serialTimestamp 1. Updated MsgpackSerialization accordingly. 2. Updated LiveMapManagerTests, added few more tests related to changes --- .../io/ably/lib/objects/ObjectMessage.kt | 12 + .../serialization/MsgpackSerialization.kt | 19 +- .../objects/type/livemap/DefaultLiveMap.kt | 2 +- .../objects/type/livemap/LiveMapManager.kt | 25 +- .../unit/type/livemap/LiveMapManagerTest.kt | 219 ++++++++++++++++-- 5 files changed, 250 insertions(+), 27 deletions(-) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt index 04677eb38..d3321a85f 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt @@ -125,6 +125,12 @@ internal data class ObjectMapEntry( */ val timeserial: String? = null, + /** + * A timestamp from the [timeserial] field. Only present if [tombstone] is `true` + * Spec: OME2d + */ + val serialTimestamp: Long? = null, + /** * The data that represents the value of the map entry. * Spec: OME2c @@ -336,6 +342,12 @@ internal data class ObjectMessage( */ val serial: String? = null, + /** + * A timestamp from the [serial] field. + * Spec: OM2j + */ + val serialTimestamp: Long? = null, + /** * An opaque string used as a key to update the map of serial values on an object. * Spec: OM2i diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt index a659fbbdd..0b221cf40 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt @@ -38,6 +38,7 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) { if (operation != null) fieldCount++ if (objectState != null) fieldCount++ if (serial != null) fieldCount++ + if (serialTimestamp != null) fieldCount++ if (siteCode != null) fieldCount++ packer.packMapHeader(fieldCount) @@ -82,6 +83,11 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) { packer.packString(serial) } + if (serialTimestamp != null) { + packer.packString("serialTimestamp") + packer.packLong(serialTimestamp) + } + if (siteCode != null) { packer.packString("siteCode") packer.packString(siteCode) @@ -107,6 +113,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage { var operation: ObjectOperation? = null var objectState: ObjectState? = null var serial: String? = null + var serialTimestamp: Long? = null var siteCode: String? = null for (i in 0 until fieldCount) { @@ -127,6 +134,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage { "operation" -> operation = readObjectOperation(unpacker) "object" -> objectState = readObjectState(unpacker) "serial" -> serial = unpacker.unpackString() + "serialTimestamp" -> serialTimestamp = unpacker.unpackLong() "siteCode" -> siteCode = unpacker.unpackString() else -> unpacker.skipValue() } @@ -141,6 +149,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage { operation = operation, objectState = objectState, serial = serial, + serialTimestamp = serialTimestamp, siteCode = siteCode ) } @@ -558,6 +567,7 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) { if (tombstone != null) fieldCount++ if (timeserial != null) fieldCount++ + if (serialTimestamp != null) fieldCount++ if (data != null) fieldCount++ packer.packMapHeader(fieldCount) @@ -572,6 +582,11 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) { packer.packString(timeserial) } + if (serialTimestamp != null) { + packer.packString("serialTimestamp") + packer.packLong(serialTimestamp) + } + if (data != null) { packer.packString("data") data.writeMsgpack(packer) @@ -586,6 +601,7 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry { var tombstone: Boolean? = null var timeserial: String? = null + var serialTimestamp: Long? = null var data: ObjectData? = null for (i in 0 until fieldCount) { @@ -600,12 +616,13 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry { when (fieldName) { "tombstone" -> tombstone = unpacker.unpackBoolean() "timeserial" -> timeserial = unpacker.unpackString() + "serialTimestamp" -> serialTimestamp = unpacker.unpackLong() "data" -> data = readObjectData(unpacker) else -> unpacker.skipValue() } } - return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, data = data) + return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, serialTimestamp = serialTimestamp, data = data) } /** diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 1ad361df3..c8cbb8c97 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -124,7 +124,7 @@ internal class DefaultLiveMap private constructor( } override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) { - liveMapManager.applyOperation(operation, message.serial) + liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp) } override fun clearData(): LiveMapUpdate { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index a081455cc..3a7bf4c95 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -33,7 +33,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang objectState.map?.entries?.forEach { (key, entry) -> liveMap.data[key] = LiveMapEntry( isTombstoned = entry.tombstone ?: false, - tombstonedAt = if (entry.tombstone == true) System.currentTimeMillis() else null, + tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp ?: System.currentTimeMillis() else null, timeserial = entry.timeserial, data = entry.data ) @@ -51,19 +51,19 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang /** * @spec RTLM15 - Applies operations to LiveMap */ - internal fun applyOperation(operation: ObjectOperation, messageTimeserial: String?) { + internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) { val update = when (operation.action) { ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1 ObjectOperationAction.MapSet -> { if (operation.mapOp != null) { - applyMapSet(operation.mapOp, messageTimeserial) // RTLM15d2 + applyMapSet(operation.mapOp, serial) // RTLM15d2 } else { throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}") } } ObjectOperationAction.MapRemove -> { if (operation.mapOp != null) { - applyMapRemove(operation.mapOp, messageTimeserial) // RTLM15d3 + applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3 } else { throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}") } @@ -132,7 +132,6 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang // RTLM7a2 - Replace existing entry with new one instead of mutating liveMap.data[mapOp.key] = LiveMapEntry( isTombstoned = false, // RTLM7a2c - tombstonedAt = null, timeserial = timeSerial, // RTLM7a2b data = mapOp.data // RTLM7a2a ) @@ -154,6 +153,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang private fun applyMapRemove( mapOp: ObjectMapOp, // RTLM8c1 timeSerial: String?, // RTLM8c2 + timeStamp: Long?, // RTLM8c3 ): LiveMapUpdate { val existingEntry = liveMap.data[mapOp.key] @@ -168,11 +168,20 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang return noOpMapUpdate } + val tombstonedAt = if (timeStamp != null) timeStamp else { + Log.w( + tag, + "No timestamp provided for MAP_REMOVE op on key=\"${mapOp.key}\"; using current time as tombstone time; " + + "objectId=${objectId}" + ) + System.currentTimeMillis() + } + if (existingEntry != null) { // RTLM8a2 - Replace existing entry with new one instead of mutating liveMap.data[mapOp.key] = LiveMapEntry( isTombstoned = true, // RTLM8a2c - tombstonedAt = System.currentTimeMillis(), + tombstonedAt = tombstonedAt, timeserial = timeSerial, // RTLM8a2b data = null // RTLM8a2a ) @@ -180,7 +189,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang // RTLM8b, RTLM8b1 liveMap.data[mapOp.key] = LiveMapEntry( isTombstoned = true, // RTLM8b2 - tombstonedAt = System.currentTimeMillis(), + tombstonedAt = tombstonedAt, timeserial = timeSerial ) } @@ -224,7 +233,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang val opTimeserial = entry.timeserial val update = if (entry.tombstone == true) { // RTLM17a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op - applyMapRemove(ObjectMapOp(key), opTimeserial) + applyMapRemove(ObjectMapOp(key), opTimeserial, entry.serialTimestamp) } else { // RTLM17a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op applyMapSet(ObjectMapOp(key, entry.data), opTimeserial) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index bc25a7c0e..7255d5c21 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -184,6 +184,109 @@ class LiveMapManagerTest { assertEquals(expectedUpdate, update.update) } + @Test + fun `(RTLM6, RTLM6c) DefaultLiveMap should handle tombstoned entries with serialTimestamp in state`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // Set initial data + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "1", + data = ObjectData(value = ObjectValue("oldValue")) + ) + + val expectedTimestamp = 1234567890L + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectMap( + semantics = MapSemantics.LWW, + entries = mapOf( + "key1" to ObjectMapEntry( + data = ObjectData(value = ObjectValue("newValue")), + timeserial = "serial1", + tombstone = true, + serialTimestamp = expectedTimestamp + ), + "key2" to ObjectMapEntry( + data = ObjectData(value = ObjectValue("value2")), + timeserial = "serial2" + ) + ) + ), + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = false, + ) + + val update = liveMapManager.applyState(objectState) + + assertFalse(liveMap.createOperationIsMerged) // RTLM6b + assertEquals(2, liveMap.data.size) // RTLM6c + assertTrue(liveMap.data["key1"]?.isTombstoned == true) // Should be tombstoned + assertEquals(expectedTimestamp, liveMap.data["key1"]?.tombstonedAt) // Should use provided serialTimestamp + assertEquals("value2", liveMap.data["key2"]?.data?.value?.value) // RTLM6c + + // Assert on update field - should show that key1 was removed (tombstoned) + val expectedUpdate = mapOf( + "key1" to LiveMapUpdate.Change.REMOVED, // key1 was tombstoned + "key2" to LiveMapUpdate.Change.UPDATED // key2 was added + ) + assertEquals(expectedUpdate, update.update) + } + + @Test + fun `(RTLM6, RTLM6c) DefaultLiveMap should handle tombstoned entries without serialTimestamp in state`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // Set initial data + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "1", + data = ObjectData(value = ObjectValue("oldValue")) + ) + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectMap( + semantics = MapSemantics.LWW, + entries = mapOf( + "key1" to ObjectMapEntry( + data = ObjectData(value = ObjectValue("newValue")), + timeserial = "serial1", + tombstone = true, + serialTimestamp = null // No timestamp provided + ), + "key2" to ObjectMapEntry( + data = ObjectData(value = ObjectValue("value2")), + timeserial = "serial2" + ) + ) + ), + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = false, + ) + + val beforeOperation = System.currentTimeMillis() + val update = liveMapManager.applyState(objectState) + val afterOperation = System.currentTimeMillis() + + assertFalse(liveMap.createOperationIsMerged) // RTLM6b + assertEquals(2, liveMap.data.size) // RTLM6c + assertTrue(liveMap.data["key1"]?.isTombstoned == true) // Should be tombstoned + assertNotNull(liveMap.data["key1"]?.tombstonedAt) // Should have timestamp + assertTrue(liveMap.data["key1"]?.tombstonedAt!! >= beforeOperation) // Should be after operation start + assertTrue(liveMap.data["key1"]?.tombstonedAt!! <= afterOperation) // Should be before operation end + assertEquals("value2", liveMap.data["key2"]?.data?.value?.value) // RTLM6c + + // Assert on update field - should show that key1 was removed (tombstoned) + val expectedUpdate = mapOf( + "key1" to LiveMapUpdate.Change.REMOVED, // key1 was tombstoned + "key2" to LiveMapUpdate.Change.UPDATED // key2 was added + ) + assertEquals(expectedUpdate, update.update) + } + @Test fun `(RTLM15, RTLM15d1, RTLM16) LiveMapManager should apply map create operation`() { @@ -209,7 +312,7 @@ class LiveMapManagerTest { ) // RTLM15d1 - Apply map create operation - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals(2, liveMap.data.size) // Should have both entries assertEquals("value1", liveMap.data["key1"]?.data?.value?.value) // Should have value1 @@ -217,6 +320,55 @@ class LiveMapManagerTest { assertTrue(liveMap.createOperationIsMerged) // Should be marked as merged } + @Test + fun `(RTLM16, RTLM16d, RTLM17) LiveMapManager should merge initial data from create operation with tombstoned entries`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // Set initial data + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial1", + data = ObjectData(value = ObjectValue("existingValue")) + ) + + val expectedTimestamp = 1234567890L + val operation = ObjectOperation( + action = ObjectOperationAction.MapCreate, + objectId = "map:testMap@1", + map = ObjectMap( + semantics = MapSemantics.LWW, + entries = mapOf( + "key1" to ObjectMapEntry( + data = ObjectData(value = ObjectValue("createValue")), + timeserial = "serial2", + tombstone = true, + serialTimestamp = expectedTimestamp + ), + "key2" to ObjectMapEntry( + data = ObjectData(value = ObjectValue("newValue")), + timeserial = "serial3" + ), + "key3" to ObjectMapEntry( + data = null, + timeserial = "serial4", + tombstone = true + ) + ) + ) + ) + + // RTLM16d - Merge initial data from create operation + liveMapManager.applyOperation(operation, "serial1", null) + + assertEquals(3, liveMap.data.size) // Should have all entries + assertTrue(liveMap.data["key1"]?.isTombstoned == true) // RTLM17a2 - Should be tombstoned + assertEquals(expectedTimestamp, liveMap.data["key1"]?.tombstonedAt) // Should use provided serialTimestamp + assertEquals("newValue", liveMap.data["key2"]?.data?.value?.value) // RTLM17a1 - Should be added + assertTrue(liveMap.data["key3"]?.isTombstoned == true) // RTLM17a2 - Should be tombstoned + assertTrue(liveMap.createOperationIsMerged) // RTLM17b - Should be marked as merged + } + @Test fun `(RTLM15, RTLM15d2, RTLM7) LiveMapManager should apply map set operation`() { val liveMap = getDefaultLiveMapWithMockedDeps() @@ -239,7 +391,7 @@ class LiveMapManagerTest { ) // RTLM15d2 - Apply map set operation - liveMapManager.applyOperation(operation, "serial2") + liveMapManager.applyOperation(operation, "serial2", null) assertEquals("newValue", liveMap.data["key1"]?.data?.value?.value) // RTLM7a2a assertEquals("serial2", liveMap.data["key1"]?.timeserial) // RTLM7a2b @@ -264,12 +416,45 @@ class LiveMapManagerTest { mapOp = ObjectMapOp(key = "key1") ) - // RTLM15d3 - Apply map remove operation - liveMapManager.applyOperation(operation, "serial2") + val expectedTimestamp = 1234567890L + // RTLM15d3 - Apply map remove operation with provided timestamp + liveMapManager.applyOperation(operation, "serial2", expectedTimestamp) + + assertNull(liveMap.data["key1"]?.data) // RTLM8a2a + assertEquals("serial2", liveMap.data["key1"]?.timeserial) // RTLM8a2b + assertTrue(liveMap.data["key1"]?.isTombstoned == true) // RTLM8a2c + assertEquals(expectedTimestamp, liveMap.data["key1"]?.tombstonedAt) // RTLM8c3 - Should use provided timestamp + } + + @Test + fun `(RTLM8, RTLM8c3) LiveMapManager should use current time when no timestamp provided for map remove operation`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // Set initial data + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial1", + data = ObjectData(value = ObjectValue("value1")) + ) + + val operation = ObjectOperation( + action = ObjectOperationAction.MapRemove, + objectId = "map:testMap@1", + mapOp = ObjectMapOp(key = "key1") + ) + + val beforeOperation = System.currentTimeMillis() + // RTLM8c3 - Apply map remove operation without timestamp (should use current time) + liveMapManager.applyOperation(operation, "serial2", null) + val afterOperation = System.currentTimeMillis() assertNull(liveMap.data["key1"]?.data) // RTLM8a2a assertEquals("serial2", liveMap.data["key1"]?.timeserial) // RTLM8a2b assertTrue(liveMap.data["key1"]?.isTombstoned == true) // RTLM8a2c + assertNotNull(liveMap.data["key1"]?.tombstonedAt) // Should have timestamp + assertTrue(liveMap.data["key1"]?.tombstonedAt!! >= beforeOperation) // Should be after operation start + assertTrue(liveMap.data["key1"]?.tombstonedAt!! <= afterOperation) // Should be before operation end } @Test @@ -285,7 +470,7 @@ class LiveMapManagerTest { // RTLM15d4 - Should throw error for unsupported action val exception = assertFailsWith { - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) } val errorInfo = exception.errorInfo @@ -317,7 +502,7 @@ class LiveMapManagerTest { ) // RTLM16b - Should skip if already merged - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals(0, liveMap.data.size) // Should not change (still empty) assertTrue(liveMap.createOperationIsMerged) // Should remain merged @@ -361,7 +546,7 @@ class LiveMapManagerTest { ) // RTLM16d - Merge initial data from create operation - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals(3, liveMap.data.size) // Should have all entries assertEquals("createValue", liveMap.data["key1"]?.data?.value?.value) // RTLM17a1 - Should be updated @@ -385,7 +570,7 @@ class LiveMapManagerTest { ) // RTLM7b - Create new entry - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals(1, liveMap.data.size) // Should have one entry assertEquals("newValue", liveMap.data["newKey"]?.data?.value?.value) // RTLM7b1 @@ -415,7 +600,7 @@ class LiveMapManagerTest { ) // RTLM7a - Should skip operation with lower serial - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals("existingValue", liveMap.data["key1"]?.data?.value?.value) // Should not change assertEquals("serial2", liveMap.data["key1"]?.timeserial) // Should keep original serial @@ -433,7 +618,7 @@ class LiveMapManagerTest { ) // RTLM8b - Create tombstoned entry for non-existing key - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals(1, liveMap.data.size) // Should have one entry assertNull(liveMap.data["nonExistingKey"]?.data) // RTLM8b1 @@ -460,7 +645,7 @@ class LiveMapManagerTest { ) // RTLM8a - Should skip operation with lower serial - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals("existingValue", liveMap.data["key1"]?.data?.value?.value) // Should not change assertEquals("serial2", liveMap.data["key1"]?.timeserial) // Should keep original serial @@ -489,7 +674,7 @@ class LiveMapManagerTest { ) // RTLM9b - Both null serials should be treated as equal - liveMapManager.applyOperation(operation, null) + liveMapManager.applyOperation(operation, null, null) assertEquals("existingValue", liveMap.data["key1"]?.data?.value?.value) // Should not change } @@ -516,7 +701,7 @@ class LiveMapManagerTest { ) // RTLM9d - Operation serial is greater than missing entry serial - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals("newValue", liveMap.data["key1"]?.data?.value?.value) // Should be updated assertEquals("serial1", liveMap.data["key1"]?.timeserial) // Should have new serial @@ -544,7 +729,7 @@ class LiveMapManagerTest { ) // RTLM9c - Missing operation serial is lower than existing entry serial - liveMapManager.applyOperation(operation, null) + liveMapManager.applyOperation(operation, null, null) assertEquals("existingValue", liveMap.data["key1"]?.data?.value?.value) // Should not change assertEquals("serial1", liveMap.data["key1"]?.timeserial) // Should keep original serial @@ -572,7 +757,7 @@ class LiveMapManagerTest { ) // RTLM9e - Higher serial should be applied - liveMapManager.applyOperation(operation, "serial2") + liveMapManager.applyOperation(operation, "serial2", null) assertEquals("newValue", liveMap.data["key1"]?.data?.value?.value) // Should be updated assertEquals("serial2", liveMap.data["key1"]?.timeserial) // Should have new serial @@ -600,7 +785,7 @@ class LiveMapManagerTest { ) // RTLM9e - Lower serial should be skipped - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) assertEquals("existingValue", liveMap.data["key1"]?.data?.value?.value) // Should not change assertEquals("serial2", liveMap.data["key1"]?.timeserial) // Should keep original serial @@ -621,7 +806,7 @@ class LiveMapManagerTest { ) val exception = assertFailsWith { - liveMapManager.applyOperation(operation, "serial1") + liveMapManager.applyOperation(operation, "serial1", null) } val errorInfo = exception.errorInfo From 5045f8e9c746bc990a2c71e36c55465915bc1362 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 25 Jul 2025 16:22:29 +0530 Subject: [PATCH 2/5] [ECO-5447] Updated BaseLiveObject#tombstone method to accept serialTimestamp 1. Updated LiveMap and LiveCounter impl to accept serialTimestamp from ObjectMessage 2. Updated unit tests for both LiveMap and LiveCounter --- .../io/ably/lib/objects/ObjectsManager.kt | 11 +-- .../ably/lib/objects/type/BaseLiveObject.kt | 16 ++-- .../type/livecounter/DefaultLiveCounter.kt | 6 +- .../type/livecounter/LiveCounterManager.kt | 8 +- .../objects/type/livemap/DefaultLiveMap.kt | 4 +- .../objects/type/livemap/LiveMapManager.kt | 6 +- .../io/ably/lib/objects/unit/TestHelpers.kt | 3 + .../unit/objects/ObjectsManagerTest.kt | 6 +- .../livecounter/DefaultLiveCounterTest.kt | 10 ++- .../livecounter/LiveCounterManagerTest.kt | 78 ++++++++++++++--- .../unit/type/livemap/DefaultLiveMapTest.kt | 10 ++- .../unit/type/livemap/LiveMapManagerTest.kt | 86 ++++++++++++++++--- 12 files changed, 195 insertions(+), 49 deletions(-) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index da499267f..38d6e77c0 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -15,7 +15,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje /** * @spec RTO5 - Sync objects data pool for collecting sync messages */ - private val syncObjectsDataPool = mutableMapOf() + private val syncObjectsDataPool = mutableMapOf() private var currentSyncId: String? = null /** * @spec RTO7 - Buffered object operations during sync @@ -130,19 +130,20 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje val existingObjectUpdates = mutableListOf>() // RTO5c1 - for ((objectId, objectState) in syncObjectsDataPool) { + for ((objectId, objectMessage) in syncObjectsDataPool) { + val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b receivedObjectIds.add(objectId) val existingObject = liveObjects.objectsPool.get(objectId) // RTO5c1a if (existingObject != null) { // Update existing object - val update = existingObject.applyObjectSync(objectState) // RTO5c1a1 + val update = existingObject.applyObjectSync(objectMessage) // RTO5c1a1 existingObjectUpdates.add(Pair(existingObject, update)) } else { // RTO5c1b // RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool val newObject = createObjectFromState(objectState) - newObject.applyObjectSync(objectState) + newObject.applyObjectSync(objectMessage) liveObjects.objectsPool.set(objectId, newObject) } } @@ -201,7 +202,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje val objectState: ObjectState = objectMessage.objectState if (objectState.counter != null || objectState.map != null) { - syncObjectsDataPool[objectState.objectId] = objectState + syncObjectsDataPool[objectState.objectId] = objectMessage } else { // RTO5c1b1c - object state must contain either counter or map data Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}") diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt index b740b0b8c..4363ebb3b 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt @@ -47,7 +47,8 @@ internal abstract class BaseLiveObject( * * @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter */ - internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate { + internal fun applyObjectSync(objectMessage: ObjectMessage): LiveObjectUpdate { + val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b validate(objectState) // object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation. // should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object. @@ -61,7 +62,7 @@ internal abstract class BaseLiveObject( } return noOpCounterUpdate } - return applyObjectState(objectState) // RTLM6, RTLC6 + return applyObjectState(objectState, objectMessage) // RTLM6, RTLC6 } /** @@ -122,11 +123,14 @@ internal abstract class BaseLiveObject( /** * Marks the object as tombstoned. */ - internal fun tombstone(): LiveObjectUpdate { + internal fun tombstone(serialTimestamp: Long?): LiveObjectUpdate { + if (serialTimestamp == null) { + Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead") + } isTombstoned = true - tombstonedAt = System.currentTimeMillis() + tombstonedAt = serialTimestamp?: System.currentTimeMillis() val update = clearData() - // TODO: Emit lifecycle events + // TODO: Emit BaseLiveObject lifecycle events return update } @@ -153,7 +157,7 @@ internal abstract class BaseLiveObject( * @return A map describing the changes made to the object's data * */ - abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate + abstract fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveObjectUpdate /** * Applies an operation to this live object. diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index f910f785b..c4637c461 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -72,12 +72,12 @@ internal class DefaultLiveCounter private constructor( override fun validate(state: ObjectState) = liveCounterManager.validate(state) - override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate { - return liveCounterManager.applyState(objectState) + override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate { + return liveCounterManager.applyState(objectState, message.serialTimestamp) } override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) { - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, message.serialTimestamp) } override fun clearData(): LiveCounterUpdate { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt index 0988b316d..344130dd5 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt @@ -17,11 +17,11 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): /** * @spec RTLC6 - Overrides counter data with state from sync */ - internal fun applyState(objectState: ObjectState): LiveCounterUpdate { + internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveCounterUpdate { val previousData = liveCounter.data.get() if (objectState.tombstone) { - liveCounter.tombstone() + liveCounter.tombstone(serialTimestamp) } else { // override data for this object with data from the object state liveCounter.createOperationIsMerged = false // RTLC6b @@ -39,7 +39,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): /** * @spec RTLC7 - Applies operations to LiveCounter */ - internal fun applyOperation(operation: ObjectOperation) { + internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) { val update = when (operation.action) { ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1 ObjectOperationAction.CounterInc -> { @@ -49,7 +49,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}") } } - ObjectOperationAction.ObjectDelete -> liveCounter.tombstone() + ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp) else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3 } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index c8cbb8c97..1511f392f 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -119,8 +119,8 @@ internal class DefaultLiveMap private constructor( override fun unsubscribeAll() = liveMapManager.unsubscribeAll() - override fun applyObjectState(objectState: ObjectState): LiveMapUpdate { - return liveMapManager.applyState(objectState) + override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate { + return liveMapManager.applyState(objectState, message.serialTimestamp) } override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index 3a7bf4c95..f0f771190 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -20,11 +20,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang /** * @spec RTLM6 - Overrides object data with state from sync */ - internal fun applyState(objectState: ObjectState): LiveMapUpdate { + internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveMapUpdate { val previousData = liveMap.data.toMap() if (objectState.tombstone) { - liveMap.tombstone() + liveMap.tombstone(serialTimestamp) } else { // override data for this object with data from the object state liveMap.createOperationIsMerged = false // RTLM6b @@ -68,7 +68,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}") } } - ObjectOperationAction.ObjectDelete -> liveMap.tombstone() + ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp) else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt index a7453336f..6db9d5ccb 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -57,6 +57,9 @@ internal fun ObjectsPool.size(): Int { return pool.size } +internal val BaseLiveObject.TombstonedAt: Long? + get() = this.getPrivateField("tombstonedAt") + /** * ====================================== * START - DefaultLiveObjects dep mocks diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt index 2d777f3ff..9f6bcf7e3 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt @@ -80,17 +80,17 @@ class ObjectsManagerTest { val testObject1 = objectsPool.get("map:testObject@1") assertNotNull(testObject1, "map:testObject@1 should exist in pool after sync") verify(exactly = 1) { - testObject1.applyObjectSync(any()) + testObject1.applyObjectSync(any()) } val testObject2 = objectsPool.get("counter:testObject@2") assertNotNull(testObject2, "counter:testObject@2 should exist in pool after sync") verify(exactly = 1) { - testObject2.applyObjectSync(any()) + testObject2.applyObjectSync(any()) } val testObject3 = objectsPool.get("map:testObject@3") assertNotNull(testObject3, "map:testObject@3 should exist in pool after sync") verify(exactly = 1) { - testObject3.applyObjectSync(any()) + testObject3.applyObjectSync(any()) } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt index 49d90da22..76902ef3e 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt @@ -26,7 +26,15 @@ class DefaultLiveCounterTest { siteTimeserials = mapOf("site3" to "serial3", "site4" to "serial4"), tombstone = false, ) - liveCounter.applyObjectSync(objectState) + + val objectMessage = ObjectMessage( + id = "testId", + objectState = objectState, + serial = "serial1", + siteCode = "site1" + ) + + liveCounter.applyObjectSync(objectMessage) assertEquals(mapOf("site3" to "serial3", "site4" to "serial4"), liveCounter.siteTimeserials) // RTLC6a } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt index 48ddd41c2..88bc6b738 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt @@ -2,6 +2,7 @@ package io.ably.lib.objects.unit.type.livecounter import io.ably.lib.objects.* import io.ably.lib.objects.unit.LiveCounterManager +import io.ably.lib.objects.unit.TombstonedAt import io.ably.lib.objects.unit.getDefaultLiveCounterWithMockedDeps import io.ably.lib.types.AblyException import org.junit.Test @@ -24,7 +25,7 @@ class DefaultLiveCounterManagerTest { tombstone = false, ) - val update = liveCounterManager.applyState(objectState) + val update = liveCounterManager.applyState(objectState, null) assertFalse(liveCounter.createOperationIsMerged) // RTLC6b assertEquals(25.0, liveCounter.data.get()) // RTLC6c @@ -55,7 +56,7 @@ class DefaultLiveCounterManagerTest { ) // RTLC6d - Merge initial data from create operation - val update = liveCounterManager.applyState(objectState) + val update = liveCounterManager.applyState(objectState, null) assertEquals(25.0, liveCounter.data.get()) // 15 from state + 10 from create op assertEquals(20.0, update.update.amount) // Total change @@ -75,7 +76,7 @@ class DefaultLiveCounterManagerTest { // RTLC7d3 - Should throw error for unsupported action val exception = assertFailsWith { - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, null) } val errorInfo = exception.errorInfo @@ -96,7 +97,7 @@ class DefaultLiveCounterManagerTest { ) // RTLC7d1 - Apply counter create operation - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, null) assertEquals(20.0, liveCounter.data.get()) // Should be set to counter count assertTrue(liveCounter.createOperationIsMerged) // Should be marked as merged @@ -119,7 +120,7 @@ class DefaultLiveCounterManagerTest { ) // RTLC8b - Should skip if already merged - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, null) assertEquals(4.0, liveCounter.data.get()) // Should not change (still 0) assertTrue(liveCounter.createOperationIsMerged) // Should remain merged @@ -142,7 +143,7 @@ class DefaultLiveCounterManagerTest { ) // RTLC8c - Should apply if not merged - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, null) assertTrue(liveCounter.createOperationIsMerged) // Should be marked as merged assertEquals(30.0, liveCounter.data.get()) // Should be set to counter count @@ -165,7 +166,7 @@ class DefaultLiveCounterManagerTest { // RTLC10a - Should default to 0 // RTLC10b - Mark as merged - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, null) assertEquals(10.0, liveCounter.data.get()) // No change (null defaults to 0) assertTrue(liveCounter.createOperationIsMerged) // RTLC10b @@ -186,7 +187,7 @@ class DefaultLiveCounterManagerTest { ) // RTLC7d2 - Apply counter increment operation - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, null) assertEquals(15.0, liveCounter.data.get()) // RTLC9b - 10 + 5 } @@ -204,7 +205,7 @@ class DefaultLiveCounterManagerTest { // RTLC7d2 - Should throw error for missing payload val exception = assertFailsWith { - liveCounterManager.applyOperation(operation) + liveCounterManager.applyOperation(operation, null) } val errorInfo = exception.errorInfo @@ -229,7 +230,7 @@ class DefaultLiveCounterManagerTest { action = ObjectOperationAction.CounterInc, objectId = "testCounterId", counterOp = counterOp - )) + ), null) assertEquals(17.0, liveCounter.data.get()) // 10 + 7 } @@ -249,8 +250,63 @@ class DefaultLiveCounterManagerTest { action = ObjectOperationAction.CounterInc, objectId = "testCounterId", counterOp = counterOp - )) + ), null) assertEquals(10.0, liveCounter.data.get()) // Should not change (null defaults to 0) } + + @Test + fun `(RTLC6, OM2j) DefaultLiveCounter should handle tombstone with serialTimestamp in state`() { + val liveCounter = getDefaultLiveCounterWithMockedDeps() + val liveCounterManager = liveCounter.LiveCounterManager + + // Set initial data + liveCounter.data.set(10.0) + + val expectedTimestamp = 1234567890L + val objectState = ObjectState( + objectId = "testCounterId", + counter = null, // Null counter for tombstone + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = true, // Object is tombstoned + ) + + val update = liveCounterManager.applyState(objectState, expectedTimestamp) + + assertTrue(liveCounter.isTombstoned) // Should be tombstoned + assertEquals(expectedTimestamp, liveCounter.TombstonedAt) // Should use provided timestamp + assertEquals(0.0, liveCounter.data.get()) // Should be reset after tombstone + + // Assert on update field - should show the change + assertEquals(-10.0, update.update.amount) // Difference from 10.0 to 0.0 + } + + @Test + fun `(RTLC6, OM2j) DefaultLiveCounter should handle tombstone without serialTimestamp in state`() { + val liveCounter = getDefaultLiveCounterWithMockedDeps() + val liveCounterManager = liveCounter.LiveCounterManager + + // Set initial data + liveCounter.data.set(10.0) + + val objectState = ObjectState( + objectId = "testCounterId", + counter = null, // Null counter for tombstone + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = true, // Object is tombstoned + ) + + val beforeOperation = System.currentTimeMillis() + val update = liveCounterManager.applyState(objectState, null) + val afterOperation = System.currentTimeMillis() + + assertTrue(liveCounter.isTombstoned) // Should be tombstoned + assertNotNull(liveCounter.TombstonedAt) // Should have timestamp + assertTrue(liveCounter.TombstonedAt!! >= beforeOperation) // Should be after operation start + assertTrue(liveCounter.TombstonedAt!! <= afterOperation) // Should be before operation end + assertEquals(0.0, liveCounter.data.get()) // Should be reset after tombstone + + // Assert on update field - should show the change + assertEquals(-10.0, update.update.amount) // Difference from 10.0 to 0.0 + } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt index c071f6395..2f31e597e 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt @@ -30,7 +30,15 @@ class DefaultLiveMapTest { semantics = MapSemantics.LWW, ) ) - liveMap.applyObjectSync(objectState) + + val objectMessage = ObjectMessage( + id = "testId", + objectState = objectState, + serial = "serial1", + siteCode = "site1" + ) + + liveMap.applyObjectSync(objectMessage) assertEquals(mapOf("site3" to "serial3", "site4" to "serial4"), liveMap.siteTimeserials) // RTLM6a } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index 7255d5c21..9e5afb3b1 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -5,6 +5,7 @@ import io.ably.lib.objects.type.livemap.LiveMapEntry import io.ably.lib.objects.type.livemap.LiveMapManager import io.ably.lib.objects.type.map.LiveMapUpdate import io.ably.lib.objects.unit.LiveMapManager +import io.ably.lib.objects.unit.TombstonedAt import io.ably.lib.objects.unit.getDefaultLiveMapWithMockedDeps import io.ably.lib.types.AblyException import io.mockk.mockk @@ -47,7 +48,7 @@ class LiveMapManagerTest { tombstone = false, ) - val update = liveMapManager.applyState(objectState) + val update = liveMapManager.applyState(objectState, null) assertFalse(liveMap.createOperationIsMerged) // RTLM6b assertEquals(2, liveMap.data.size) // RTLM6c @@ -84,7 +85,7 @@ class LiveMapManagerTest { tombstone = false, ) - val update = liveMapManager.applyState(objectState) + val update = liveMapManager.applyState(objectState, null) assertFalse(liveMap.createOperationIsMerged) // RTLM6b assertEquals(0, liveMap.data.size) // RTLM6c - should be empty map @@ -113,7 +114,7 @@ class LiveMapManagerTest { tombstone = false, ) - val update = liveMapManager.applyState(objectState) + val update = liveMapManager.applyState(objectState, null) assertFalse(liveMap.createOperationIsMerged) // RTLM6b assertEquals(0, liveMap.data.size) // RTLM6c - should be empty map when map is null @@ -170,7 +171,7 @@ class LiveMapManagerTest { ) // RTLM6d - Merge initial data from create operation - val update = liveMapManager.applyState(objectState) + val update = liveMapManager.applyState(objectState, null) assertEquals(2, liveMap.data.size) // Should have both state and create op entries assertEquals("stateValue", liveMap.data["key1"]?.data?.value?.value) // State value takes precedence @@ -185,7 +186,7 @@ class LiveMapManagerTest { } @Test - fun `(RTLM6, RTLM6c) DefaultLiveMap should handle tombstoned entries with serialTimestamp in state`() { + fun `(RTLM6, RTLM6c, OME2d) DefaultLiveMap should handle tombstoned entries with serialTimestamp in state`() { val liveMap = getDefaultLiveMapWithMockedDeps() val liveMapManager = liveMap.LiveMapManager @@ -218,7 +219,7 @@ class LiveMapManagerTest { tombstone = false, ) - val update = liveMapManager.applyState(objectState) + val update = liveMapManager.applyState(objectState, null) assertFalse(liveMap.createOperationIsMerged) // RTLM6b assertEquals(2, liveMap.data.size) // RTLM6c @@ -235,7 +236,7 @@ class LiveMapManagerTest { } @Test - fun `(RTLM6, RTLM6c) DefaultLiveMap should handle tombstoned entries without serialTimestamp in state`() { + fun `(RTLM6, RTLM6c, OME2d) DefaultLiveMap should handle tombstoned entries without serialTimestamp in state`() { val liveMap = getDefaultLiveMapWithMockedDeps() val liveMapManager = liveMap.LiveMapManager @@ -268,7 +269,7 @@ class LiveMapManagerTest { ) val beforeOperation = System.currentTimeMillis() - val update = liveMapManager.applyState(objectState) + val update = liveMapManager.applyState(objectState, null) val afterOperation = System.currentTimeMillis() assertFalse(liveMap.createOperationIsMerged) // RTLM6b @@ -321,7 +322,7 @@ class LiveMapManagerTest { } @Test - fun `(RTLM16, RTLM16d, RTLM17) LiveMapManager should merge initial data from create operation with tombstoned entries`() { + fun `(RTLM16, RTLM16d, RTLM17, OME2d) LiveMapManager should merge initial data from create operation with tombstoned entries`() { val liveMap = getDefaultLiveMapWithMockedDeps() val liveMapManager = liveMap.LiveMapManager @@ -427,7 +428,7 @@ class LiveMapManagerTest { } @Test - fun `(RTLM8, RTLM8c3) LiveMapManager should use current time when no timestamp provided for map remove operation`() { + fun `(RTLM8, RTLM8c3, OME2d) LiveMapManager should use current time when no timestamp provided for map remove operation`() { val liveMap = getDefaultLiveMapWithMockedDeps() val liveMapManager = liveMap.LiveMapManager @@ -1002,4 +1003,69 @@ class LiveMapManagerTest { val result11 = livemapManager.calculateUpdateFromDataDiff(prevData11, newData11) assertEquals("Should not detect change for same data", emptyMap(), result11.update) } + + @Test + fun `(RTLM6, OM2j) DefaultLiveMap should handle tombstone with serialTimestamp in state`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // Set initial data + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "1", + data = ObjectData(value = ObjectValue("oldValue")) + ) + + val expectedTimestamp = 1234567890L + val objectState = ObjectState( + objectId = "map:testMap@1", + map = null, // Null map for tombstone + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = true, // Object is tombstoned + ) + + val update = liveMapManager.applyState(objectState, expectedTimestamp) + + assertTrue(liveMap.isTombstoned) // Should be tombstoned + assertEquals(expectedTimestamp, liveMap.TombstonedAt) // Should use provided timestamp + assertEquals(0, liveMap.data.size) // Should be empty after tombstone + + // Assert on update field - should show that key1 was removed + val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) + assertEquals(expectedUpdate, update.update) + } + + @Test + fun `(RTLM6, OM2j) DefaultLiveMap should handle tombstone without serialTimestamp in state`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // Set initial data + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "1", + data = ObjectData(value = ObjectValue("oldValue")) + ) + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = null, // Null map for tombstone + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = true, // Object is tombstoned + ) + + val beforeOperation = System.currentTimeMillis() + val update = liveMapManager.applyState(objectState, null) + val afterOperation = System.currentTimeMillis() + + assertTrue(liveMap.isTombstoned) // Should be tombstoned + assertNotNull(liveMap.TombstonedAt) // Should have timestamp + assertTrue(liveMap.TombstonedAt!! >= beforeOperation) // Should be after operation start + assertTrue(liveMap.TombstonedAt!! <= afterOperation) // Should be before operation end + assertEquals(0, liveMap.data.size) // Should be empty after tombstone + + // Assert on update field - should show that key1 was removed + val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) + assertEquals(expectedUpdate, update.update) + } } From f700c58f3d24a051c86544a108cfce2d33bae9c9 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 1 Aug 2025 16:08:59 +0530 Subject: [PATCH 3/5] [ECO-5447] Fixed LiveMapManager tests for tombstone serial --- .../unit/type/livemap/LiveMapManagerTest.kt | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index f8bec2cdf..eb79fa997 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -194,7 +194,7 @@ class LiveMapManagerTest { liveMap.data["key1"] = LiveMapEntry( isTombstoned = false, timeserial = "1", - data = ObjectData(value = ObjectValue("oldValue")) + data = ObjectData(value = ObjectValue.String("oldValue")) ) val expectedTimestamp = 1234567890L @@ -204,13 +204,13 @@ class LiveMapManagerTest { semantics = MapSemantics.LWW, entries = mapOf( "key1" to ObjectMapEntry( - data = ObjectData(value = ObjectValue("newValue")), + data = ObjectData(value = ObjectValue.String("newValue")), timeserial = "serial1", tombstone = true, serialTimestamp = expectedTimestamp ), "key2" to ObjectMapEntry( - data = ObjectData(value = ObjectValue("value2")), + data = ObjectData(value = ObjectValue.String("value2")), timeserial = "serial2" ) ) @@ -244,7 +244,7 @@ class LiveMapManagerTest { liveMap.data["key1"] = LiveMapEntry( isTombstoned = false, timeserial = "1", - data = ObjectData(value = ObjectValue("oldValue")) + data = ObjectData(value = ObjectValue.String("oldValue")) ) val objectState = ObjectState( @@ -253,13 +253,13 @@ class LiveMapManagerTest { semantics = MapSemantics.LWW, entries = mapOf( "key1" to ObjectMapEntry( - data = ObjectData(value = ObjectValue("newValue")), + data = ObjectData(value = ObjectValue.String("newValue")), timeserial = "serial1", tombstone = true, serialTimestamp = null // No timestamp provided ), "key2" to ObjectMapEntry( - data = ObjectData(value = ObjectValue("value2")), + data = ObjectData(value = ObjectValue.String("value2")), timeserial = "serial2" ) ) @@ -330,7 +330,7 @@ class LiveMapManagerTest { liveMap.data["key1"] = LiveMapEntry( isTombstoned = false, timeserial = "serial1", - data = ObjectData(value = ObjectValue("existingValue")) + data = ObjectData(value = ObjectValue.String("existingValue")) ) val expectedTimestamp = 1234567890L @@ -341,13 +341,13 @@ class LiveMapManagerTest { semantics = MapSemantics.LWW, entries = mapOf( "key1" to ObjectMapEntry( - data = ObjectData(value = ObjectValue("createValue")), + data = ObjectData(value = ObjectValue.String("createValue")), timeserial = "serial2", tombstone = true, serialTimestamp = expectedTimestamp ), "key2" to ObjectMapEntry( - data = ObjectData(value = ObjectValue("newValue")), + data = ObjectData(value = ObjectValue.String("newValue")), timeserial = "serial3" ), "key3" to ObjectMapEntry( @@ -436,7 +436,7 @@ class LiveMapManagerTest { liveMap.data["key1"] = LiveMapEntry( isTombstoned = false, timeserial = "serial1", - data = ObjectData(value = ObjectValue("value1")) + data = ObjectData(value = ObjectValue.String("value1")) ) val operation = ObjectOperation( @@ -1013,7 +1013,7 @@ class LiveMapManagerTest { liveMap.data["key1"] = LiveMapEntry( isTombstoned = false, timeserial = "1", - data = ObjectData(value = ObjectValue("oldValue")) + data = ObjectData(value = ObjectValue.String("oldValue")) ) val expectedTimestamp = 1234567890L @@ -1044,7 +1044,7 @@ class LiveMapManagerTest { liveMap.data["key1"] = LiveMapEntry( isTombstoned = false, timeserial = "1", - data = ObjectData(value = ObjectValue("oldValue")) + data = ObjectData(value = ObjectValue.String("oldValue")) ) val objectState = ObjectState( From 97f55f79c53f3e2f515cf0b74d905cf325336690 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 1 Aug 2025 17:38:22 +0530 Subject: [PATCH 4/5] [ECO-5447] Replaced partial testObjectRemovalFromRoot with well simulated testObjectDelete test --- .../integration/DefaultLiveObjectsTest.kt | 41 +++++++++++++++---- .../lib/objects/integration/helpers/Utils.kt | 28 ++++++++++++- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt index 3bea82c92..2a3a08090 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt @@ -6,11 +6,15 @@ import io.ably.lib.objects.* import io.ably.lib.objects.Binary import io.ably.lib.objects.integration.helpers.State import io.ably.lib.objects.integration.helpers.fixtures.initializeRootMap +import io.ably.lib.objects.integration.helpers.simulateObjectDelete import io.ably.lib.objects.integration.setup.IntegrationTest import io.ably.lib.objects.size import io.ably.lib.objects.state.ObjectsStateEvent import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.livecounter.DefaultLiveCounter +import io.ably.lib.objects.type.livemap.DefaultLiveMap import io.ably.lib.objects.type.map.LiveMap +import io.ably.lib.objects.type.map.LiveMapUpdate import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals @@ -155,13 +159,14 @@ class DefaultLiveObjectsTest : IntegrationTest() { } /** - * Spec: RTLO4e - Tests the removal of objects from the root map. * Server runs periodic garbage collection (GC) to remove orphaned objects and will send * OBJECT_DELETE events for objects that are no longer referenced. - * `OBJECT_DELETE` event is not covered in the test and we only check if map entries are removed + * So, we simulate the deletion of an object by sending an object delete ProtocolMessage. + * This does not actually delete the object from the server, only simulates the deletion locally. + * Spec: RTLO4e */ @Test - fun testObjectRemovalFromRoot() = runTest { + fun testObjectDelete() = runTest { val channelName = generateChannelName() // Initialize the root map on the channel with initial data restObjects.initializeRootMap(channelName) @@ -171,19 +176,41 @@ class DefaultLiveObjectsTest : IntegrationTest() { assertEquals(6L, rootMap.size()) // Should have 6 entries initially // Remove the "referencedCounter" from the root map - assertNotNull(rootMap.get("referencedCounter")) // Access to ensure it exists before removal + val refCounter = rootMap.get("referencedCounter") as LiveCounter + assertNotNull(refCounter) + // Subscribe to counter updates to verify removal + val counterUpdates = mutableListOf() + refCounter.subscribe { event -> + counterUpdates.add(event.update.amount) + } - restObjects.removeMapValue(channelName, "root", "referencedCounter") + // Simulate the deletion of the referencedCounter object + channel.objects.simulateObjectDelete(refCounter as DefaultLiveCounter) assertWaiter { rootMap.size() == 5L } // Wait for the removal to complete assertNull(rootMap.get("referencedCounter")) // Should be null after removal + assertEquals(1, counterUpdates.size) // Should have received one update for deletion + assertEquals(20.0, counterUpdates[0]) // The update should indicate the counter was removed // Remove the "referencedMap" from the root map - assertNotNull(rootMap.get("referencedMap")) // Access to ensure it exists before removal + val referencedMap = rootMap.get("referencedMap") as LiveMap + assertNotNull(referencedMap) + // Subscribe to map updates to verify removal + val mapUpdates = mutableListOf>() + referencedMap.subscribe { event -> + mapUpdates.add(event.update) + } - restObjects.removeMapValue(channelName, "root", "referencedMap") + // Simulate the deletion of the referencedMap object + channel.objects.simulateObjectDelete(referencedMap as DefaultLiveMap) assertWaiter { rootMap.size() == 4L } // Wait for the removal to complete assertNull(rootMap.get("referencedMap")) // Should be null after removal + assertEquals(1, mapUpdates.size) // Should have received one update for deletion + + val updatedMap = mapUpdates.first() + assertEquals(1, updatedMap.size) // Should have one change + assertEquals("counterKey", updatedMap.keys.first()) // The change should be for the "counterKey" + assertEquals(LiveMapUpdate.Change.REMOVED, updatedMap.values.first()) // Should indicate removal } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt index f50c60cf9..0e3213748 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt @@ -1,14 +1,40 @@ package io.ably.lib.objects.integration.helpers +import io.ably.lib.objects.* import io.ably.lib.objects.DefaultLiveObjects +import io.ably.lib.objects.ObjectMessage +import io.ably.lib.objects.ObjectOperation +import io.ably.lib.objects.type.BaseLiveObject import io.ably.lib.objects.type.counter.LiveCounter import io.ably.lib.objects.type.map.LiveMap -import io.ably.lib.objects.LiveObjects import io.ably.lib.objects.type.livecounter.DefaultLiveCounter import io.ably.lib.objects.type.livemap.DefaultLiveMap +import io.ably.lib.types.ProtocolMessage internal val LiveMap.ObjectId get() = (this as DefaultLiveMap).objectId internal val LiveCounter.ObjectId get() = (this as DefaultLiveCounter).objectId internal val LiveObjects.State get() = (this as DefaultLiveObjects).state + +/** + * Server runs periodic garbage collection (GC) to remove orphaned objects and will send + * OBJECT_DELETE events for objects that are no longer referenced. + * So, we simulate the deletion of an object by sending a ProtocolMessage. + */ +internal fun LiveObjects.simulateObjectDelete(baseObject: BaseLiveObject) { + val defaultLiveObjects = this as DefaultLiveObjects + val existingSiteCode = baseObject.siteTimeserials.keys.first() + val existingSiteSerial = baseObject.siteTimeserials[existingSiteCode]!! + + val deleteObjectProtoMsg = ProtocolMessage(ProtocolMessage.Action.`object`, channelName) + deleteObjectProtoMsg.state = arrayOf(ObjectMessage( + siteCode = existingSiteCode, + serial = existingSiteSerial + "1", // Increment serial to accept new operation + operation = ObjectOperation( + action = ObjectOperationAction.ObjectDelete, + objectId = baseObject.objectId, + ) + )) + defaultLiveObjects.handle(deleteObjectProtoMsg) +} From 7023ff6ea730c2ef7b27f68ede163f0a4ffa38e8 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 1 Aug 2025 19:15:16 +0530 Subject: [PATCH 5/5] ECO-5447] Added calculateUpdateFromDataDiff method to CounterManager - Added extra assertion for deleting of "valuesMap" --- .../type/livecounter/DefaultLiveCounter.kt | 2 +- .../type/livecounter/LiveCounterManager.kt | 6 ++++- .../integration/DefaultLiveObjectsTest.kt | 25 ++++++++++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index 3e2241e81..73dd63d62 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -80,7 +80,7 @@ internal class DefaultLiveCounter private constructor( } override fun clearData(): LiveCounterUpdate { - return LiveCounterUpdate(data.get()).apply { data.set(0.0) } + return liveCounterManager.calculateUpdateFromDataDiff(data.get(), 0.0).apply { data.set(0.0) } } override fun notifyUpdated(update: LiveObjectUpdate) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt index 344130dd5..667773abf 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt @@ -33,7 +33,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): } } - return LiveCounterUpdate(liveCounter.data.get() - previousData) + return calculateUpdateFromDataDiff(previousData, liveCounter.data.get()) } /** @@ -85,6 +85,10 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): return LiveCounterUpdate(amount) } + internal fun calculateUpdateFromDataDiff(prevData: Double, newData: Double): LiveCounterUpdate { + return LiveCounterUpdate(newData - prevData) + } + /** * @spec RTLC10 - Merges initial data from create operation */ diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt index 2a3a08090..b3a6f5d95 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt @@ -190,7 +190,7 @@ class DefaultLiveObjectsTest : IntegrationTest() { assertWaiter { rootMap.size() == 5L } // Wait for the removal to complete assertNull(rootMap.get("referencedCounter")) // Should be null after removal assertEquals(1, counterUpdates.size) // Should have received one update for deletion - assertEquals(20.0, counterUpdates[0]) // The update should indicate the counter was removed + assertEquals(-20.0, counterUpdates[0]) // The update should indicate counter was removed with value 20 // Remove the "referencedMap" from the root map val referencedMap = rootMap.get("referencedMap") as LiveMap @@ -212,5 +212,28 @@ class DefaultLiveObjectsTest : IntegrationTest() { assertEquals(1, updatedMap.size) // Should have one change assertEquals("counterKey", updatedMap.keys.first()) // The change should be for the "counterKey" assertEquals(LiveMapUpdate.Change.REMOVED, updatedMap.values.first()) // Should indicate removal + + // Remove the "valuesMap" from the root map + val valuesMap = rootMap.get("valuesMap") as LiveMap + assertNotNull(valuesMap) + // Subscribe to map updates to verify removal + val valuesMapUpdates = mutableListOf>() + valuesMap.subscribe { event -> + valuesMapUpdates.add(event.update) + } + + // Simulate the deletion of the valuesMap object + channel.objects.simulateObjectDelete(valuesMap as DefaultLiveMap) + + assertWaiter { rootMap.size() == 3L } // Wait for the removal to complete + assertNull(rootMap.get("valuesMap")) // Should be null after removal + assertEquals(1, valuesMapUpdates.size) // Should have received one update for deletion + + val updatedValuesMap = valuesMapUpdates.first() + assertEquals(13, updatedValuesMap.size) // Should have 13 changes (one for each entry in valuesMap) + // Verify that all entries in valuesMap were marked as REMOVED + updatedValuesMap.values.forEach { change -> + assertEquals(LiveMapUpdate.Change.REMOVED, change) + } } }