Skip to content
Closed
12 changes: 12 additions & 0 deletions live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ internal data class ObjectMapEntry(
*/
val timeserial: String? = null,

/**
* A timestamp from the [timeserial] field. Only present if [tombstone] is `true`
* Spec: OME2d
*/
val serialTimestamp: Long? = null,

/**
* The data that represents the value of the map entry.
* Spec: OME2c
Expand Down Expand Up @@ -325,6 +331,12 @@ internal data class ObjectMessage(
*/
val serial: String? = null,

/**
* A timestamp from the [serial] field.
* Spec: OM2j
*/
val serialTimestamp: Long? = null,

/**
* An opaque string used as a key to update the map of serial values on an object.
* Spec: OM2i
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
/**
* @spec RTO5 - Sync objects data pool for collecting sync messages
*/
private val syncObjectsDataPool = mutableMapOf<String, ObjectState>()
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
private var currentSyncId: String? = null
/**
* @spec RTO7 - Buffered object operations during sync
Expand Down Expand Up @@ -130,19 +130,20 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, LiveObjectUpdate>>()

// RTO5c1
for ((objectId, objectState) in syncObjectsDataPool) {
for ((objectId, objectMessage) in syncObjectsDataPool) {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
receivedObjectIds.add(objectId)
val existingObject = liveObjects.objectsPool.get(objectId)

// RTO5c1a
if (existingObject != null) {
// Update existing object
val update = existingObject.applyObjectSync(objectState) // RTO5c1a1
val update = existingObject.applyObjectSync(objectMessage) // RTO5c1a1
existingObjectUpdates.add(Pair(existingObject, update))
} else { // RTO5c1b
// RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool
val newObject = createObjectFromState(objectState)
newObject.applyObjectSync(objectState)
newObject.applyObjectSync(objectMessage)
liveObjects.objectsPool.set(objectId, newObject)
}
}
Expand Down Expand Up @@ -201,7 +202,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje

val objectState: ObjectState = objectMessage.objectState
if (objectState.counter != null || objectState.map != null) {
syncObjectsDataPool[objectState.objectId] = objectState
syncObjectsDataPool[objectState.objectId] = objectMessage
} else {
// RTO5c1b1c - object state must contain either counter or map data
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
if (operation != null) fieldCount++
if (objectState != null) fieldCount++
if (serial != null) fieldCount++
if (serialTimestamp != null) fieldCount++
if (siteCode != null) fieldCount++

packer.packMapHeader(fieldCount)
Expand Down Expand Up @@ -81,6 +82,11 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
packer.packString(serial)
}

if (serialTimestamp != null) {
packer.packString("serialTimestamp")
packer.packLong(serialTimestamp)
}

if (siteCode != null) {
packer.packString("siteCode")
packer.packString(siteCode)
Expand All @@ -106,6 +112,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
var operation: ObjectOperation? = null
var objectState: ObjectState? = null
var serial: String? = null
var serialTimestamp: Long? = null
var siteCode: String? = null

for (i in 0 until fieldCount) {
Expand All @@ -126,6 +133,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
"operation" -> operation = readObjectOperation(unpacker)
"object" -> objectState = readObjectState(unpacker)
"serial" -> serial = unpacker.unpackString()
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
"siteCode" -> siteCode = unpacker.unpackString()
else -> unpacker.skipValue()
}
Expand All @@ -140,6 +148,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
operation = operation,
objectState = objectState,
serial = serial,
serialTimestamp = serialTimestamp,
siteCode = siteCode
)
}
Expand Down Expand Up @@ -557,6 +566,7 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {

if (tombstone != null) fieldCount++
if (timeserial != null) fieldCount++
if (serialTimestamp != null) fieldCount++
if (data != null) fieldCount++

packer.packMapHeader(fieldCount)
Expand All @@ -571,6 +581,11 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {
packer.packString(timeserial)
}

if (serialTimestamp != null) {
packer.packString("serialTimestamp")
packer.packLong(serialTimestamp)
}

if (data != null) {
packer.packString("data")
data.writeMsgpack(packer)
Expand All @@ -585,6 +600,7 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {

var tombstone: Boolean? = null
var timeserial: String? = null
var serialTimestamp: Long? = null
var data: ObjectData? = null

for (i in 0 until fieldCount) {
Expand All @@ -599,12 +615,13 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {
when (fieldName) {
"tombstone" -> tombstone = unpacker.unpackBoolean()
"timeserial" -> timeserial = unpacker.unpackString()
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
"data" -> data = readObjectData(unpacker)
else -> unpacker.skipValue()
}
}

return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, data = data)
return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, serialTimestamp = serialTimestamp, data = data)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ internal abstract class BaseLiveObject(
*
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
*/
internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate {
internal fun applyObjectSync(objectMessage: ObjectMessage): LiveObjectUpdate {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
validate(objectState)
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
Expand All @@ -61,7 +62,7 @@ internal abstract class BaseLiveObject(
}
return noOpCounterUpdate
}
return applyObjectState(objectState) // RTLM6, RTLC6
return applyObjectState(objectState, objectMessage) // RTLM6, RTLC6
}

/**
Expand Down Expand Up @@ -122,11 +123,14 @@ internal abstract class BaseLiveObject(
/**
* Marks the object as tombstoned.
*/
internal fun tombstone(): LiveObjectUpdate {
internal fun tombstone(serialTimestamp: Long?): LiveObjectUpdate {
if (serialTimestamp == null) {
Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead")
}
isTombstoned = true
tombstonedAt = System.currentTimeMillis()
tombstonedAt = serialTimestamp?: System.currentTimeMillis()
val update = clearData()
// TODO: Emit lifecycle events
// TODO: Emit BaseLiveObject lifecycle events
return update
}

Expand All @@ -153,7 +157,7 @@ internal abstract class BaseLiveObject(
* @return A map describing the changes made to the object's data
*
*/
abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate
abstract fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveObjectUpdate

/**
* Applies an operation to this live object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ internal class DefaultLiveCounter private constructor(

override fun validate(state: ObjectState) = liveCounterManager.validate(state)

override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate {
return liveCounterManager.applyState(objectState)
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate {
return liveCounterManager.applyState(objectState, message.serialTimestamp)
}

override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
liveCounterManager.applyOperation(operation)
liveCounterManager.applyOperation(operation, message.serialTimestamp)
}

override fun clearData(): LiveCounterUpdate {
return LiveCounterUpdate(data.get()).apply { data.set(0.0) }
return liveCounterManager.calculateUpdateFromDataDiff(data.get(), 0.0).apply { data.set(0.0) }
}

override fun notifyUpdated(update: LiveObjectUpdate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
/**
* @spec RTLC6 - Overrides counter data with state from sync
*/
internal fun applyState(objectState: ObjectState): LiveCounterUpdate {
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveCounterUpdate {
val previousData = liveCounter.data.get()

if (objectState.tombstone) {
liveCounter.tombstone()
liveCounter.tombstone(serialTimestamp)
} else {
// override data for this object with data from the object state
liveCounter.createOperationIsMerged = false // RTLC6b
Expand All @@ -33,13 +33,13 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
}
}

return LiveCounterUpdate(liveCounter.data.get() - previousData)
return calculateUpdateFromDataDiff(previousData, liveCounter.data.get())
}

/**
* @spec RTLC7 - Applies operations to LiveCounter
*/
internal fun applyOperation(operation: ObjectOperation) {
internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) {
val update = when (operation.action) {
ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1
ObjectOperationAction.CounterInc -> {
Expand All @@ -49,7 +49,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}")
}
}
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone()
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp)
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
}

Expand Down Expand Up @@ -85,6 +85,10 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
return LiveCounterUpdate(amount)
}

internal fun calculateUpdateFromDataDiff(prevData: Double, newData: Double): LiveCounterUpdate {
return LiveCounterUpdate(newData - prevData)
}

/**
* @spec RTLC10 - Merges initial data from create operation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ internal class DefaultLiveMap private constructor(

override fun unsubscribeAll() = liveMapManager.unsubscribeAll()

override fun applyObjectState(objectState: ObjectState): LiveMapUpdate {
return liveMapManager.applyState(objectState)
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate {
return liveMapManager.applyState(objectState, message.serialTimestamp)
}

override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
liveMapManager.applyOperation(operation, message.serial)
liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
}

override fun clearData(): LiveMapUpdate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
/**
* @spec RTLM6 - Overrides object data with state from sync
*/
internal fun applyState(objectState: ObjectState): LiveMapUpdate {
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveMapUpdate {
val previousData = liveMap.data.toMap()

if (objectState.tombstone) {
liveMap.tombstone()
liveMap.tombstone(serialTimestamp)
} else {
// override data for this object with data from the object state
liveMap.createOperationIsMerged = false // RTLM6b
Expand All @@ -33,7 +33,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
objectState.map?.entries?.forEach { (key, entry) ->
liveMap.data[key] = LiveMapEntry(
isTombstoned = entry.tombstone ?: false,
tombstonedAt = if (entry.tombstone == true) System.currentTimeMillis() else null,
tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp ?: System.currentTimeMillis() else null,
timeserial = entry.timeserial,
data = entry.data
)
Expand All @@ -51,24 +51,24 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
/**
* @spec RTLM15 - Applies operations to LiveMap
*/
internal fun applyOperation(operation: ObjectOperation, messageTimeserial: String?) {
internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) {
val update = when (operation.action) {
ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1
ObjectOperationAction.MapSet -> {
if (operation.mapOp != null) {
applyMapSet(operation.mapOp, messageTimeserial) // RTLM15d2
applyMapSet(operation.mapOp, serial) // RTLM15d2
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
ObjectOperationAction.MapRemove -> {
if (operation.mapOp != null) {
applyMapRemove(operation.mapOp, messageTimeserial) // RTLM15d3
applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
ObjectOperationAction.ObjectDelete -> liveMap.tombstone()
ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp)
else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
}

Expand Down Expand Up @@ -132,7 +132,6 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
// RTLM7a2 - Replace existing entry with new one instead of mutating
liveMap.data[mapOp.key] = LiveMapEntry(
isTombstoned = false, // RTLM7a2c
tombstonedAt = null,
timeserial = timeSerial, // RTLM7a2b
data = mapOp.data // RTLM7a2a
)
Expand All @@ -154,6 +153,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
private fun applyMapRemove(
mapOp: ObjectMapOp, // RTLM8c1
timeSerial: String?, // RTLM8c2
timeStamp: Long?, // RTLM8c3
): LiveMapUpdate {
val existingEntry = liveMap.data[mapOp.key]

Expand All @@ -168,19 +168,28 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
return noOpMapUpdate
}

val tombstonedAt = if (timeStamp != null) timeStamp else {
Log.w(
tag,
"No timestamp provided for MAP_REMOVE op on key=\"${mapOp.key}\"; using current time as tombstone time; " +
"objectId=${objectId}"
)
System.currentTimeMillis()
}

if (existingEntry != null) {
// RTLM8a2 - Replace existing entry with new one instead of mutating
liveMap.data[mapOp.key] = LiveMapEntry(
isTombstoned = true, // RTLM8a2c
tombstonedAt = System.currentTimeMillis(),
tombstonedAt = tombstonedAt,
timeserial = timeSerial, // RTLM8a2b
data = null // RTLM8a2a
)
} else {
// RTLM8b, RTLM8b1
liveMap.data[mapOp.key] = LiveMapEntry(
isTombstoned = true, // RTLM8b2
tombstonedAt = System.currentTimeMillis(),
tombstonedAt = tombstonedAt,
timeserial = timeSerial
)
}
Expand Down Expand Up @@ -224,7 +233,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
val opTimeserial = entry.timeserial
val update = if (entry.tombstone == true) {
// RTLM17a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op
applyMapRemove(ObjectMapOp(key), opTimeserial)
applyMapRemove(ObjectMapOp(key), opTimeserial, entry.serialTimestamp)
} else {
// RTLM17a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op
applyMapSet(ObjectMapOp(key, entry.data), opTimeserial)
Expand Down
Loading
Loading