Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ internal data class ObjectMapEntry(
*/
val timeserial: String? = null,

/**
* A timestamp from the [timeserial] field. Only present if [tombstone] is `true`
* Spec: OME2d
*/
val serialTimestamp: Long? = null,

/**
* The data that represents the value of the map entry.
* Spec: OME2c
Expand Down Expand Up @@ -325,6 +331,12 @@ internal data class ObjectMessage(
*/
val serial: String? = null,

/**
* A timestamp from the [serial] field.
* Spec: OM2j
*/
val serialTimestamp: Long? = null,

/**
* An opaque string used as a key to update the map of serial values on an object.
* Spec: OM2i
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
/**
* @spec RTO5 - Sync objects data pool for collecting sync messages
*/
private val syncObjectsDataPool = mutableMapOf<String, ObjectState>()
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
private var currentSyncId: String? = null
/**
* @spec RTO7 - Buffered object operations during sync
Expand Down Expand Up @@ -130,19 +130,20 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje
val existingObjectUpdates = mutableListOf<Pair<BaseLiveObject, LiveObjectUpdate>>()

// RTO5c1
for ((objectId, objectState) in syncObjectsDataPool) {
for ((objectId, objectMessage) in syncObjectsDataPool) {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
receivedObjectIds.add(objectId)
val existingObject = liveObjects.objectsPool.get(objectId)

// RTO5c1a
if (existingObject != null) {
// Update existing object
val update = existingObject.applyObjectSync(objectState) // RTO5c1a1
val update = existingObject.applyObjectSync(objectMessage) // RTO5c1a1
existingObjectUpdates.add(Pair(existingObject, update))
} else { // RTO5c1b
// RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool
val newObject = createObjectFromState(objectState)
newObject.applyObjectSync(objectState)
newObject.applyObjectSync(objectMessage)
liveObjects.objectsPool.set(objectId, newObject)
}
}
Expand Down Expand Up @@ -201,7 +202,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje

val objectState: ObjectState = objectMessage.objectState
if (objectState.counter != null || objectState.map != null) {
syncObjectsDataPool[objectState.objectId] = objectState
syncObjectsDataPool[objectState.objectId] = objectMessage
} else {
// RTO5c1b1c - object state must contain either counter or map data
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
if (operation != null) fieldCount++
if (objectState != null) fieldCount++
if (serial != null) fieldCount++
if (serialTimestamp != null) fieldCount++
if (siteCode != null) fieldCount++

packer.packMapHeader(fieldCount)
Expand Down Expand Up @@ -81,6 +82,11 @@ internal fun ObjectMessage.writeMsgpack(packer: MessagePacker) {
packer.packString(serial)
}

if (serialTimestamp != null) {
packer.packString("serialTimestamp")
packer.packLong(serialTimestamp)
}

if (siteCode != null) {
packer.packString("siteCode")
packer.packString(siteCode)
Expand All @@ -106,6 +112,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
var operation: ObjectOperation? = null
var objectState: ObjectState? = null
var serial: String? = null
var serialTimestamp: Long? = null
var siteCode: String? = null

for (i in 0 until fieldCount) {
Expand All @@ -126,6 +133,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
"operation" -> operation = readObjectOperation(unpacker)
"object" -> objectState = readObjectState(unpacker)
"serial" -> serial = unpacker.unpackString()
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
"siteCode" -> siteCode = unpacker.unpackString()
else -> unpacker.skipValue()
}
Expand All @@ -140,6 +148,7 @@ internal fun readObjectMessage(unpacker: MessageUnpacker): ObjectMessage {
operation = operation,
objectState = objectState,
serial = serial,
serialTimestamp = serialTimestamp,
siteCode = siteCode
)
}
Expand Down Expand Up @@ -557,6 +566,7 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {

if (tombstone != null) fieldCount++
if (timeserial != null) fieldCount++
if (serialTimestamp != null) fieldCount++
if (data != null) fieldCount++

packer.packMapHeader(fieldCount)
Expand All @@ -571,6 +581,11 @@ private fun ObjectMapEntry.writeMsgpack(packer: MessagePacker) {
packer.packString(timeserial)
}

if (serialTimestamp != null) {
packer.packString("serialTimestamp")
packer.packLong(serialTimestamp)
}

if (data != null) {
packer.packString("data")
data.writeMsgpack(packer)
Expand All @@ -585,6 +600,7 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {

var tombstone: Boolean? = null
var timeserial: String? = null
var serialTimestamp: Long? = null
var data: ObjectData? = null

for (i in 0 until fieldCount) {
Expand All @@ -599,12 +615,13 @@ private fun readObjectMapEntry(unpacker: MessageUnpacker): ObjectMapEntry {
when (fieldName) {
"tombstone" -> tombstone = unpacker.unpackBoolean()
"timeserial" -> timeserial = unpacker.unpackString()
"serialTimestamp" -> serialTimestamp = unpacker.unpackLong()
"data" -> data = readObjectData(unpacker)
else -> unpacker.skipValue()
}
}

return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, data = data)
return ObjectMapEntry(tombstone = tombstone, timeserial = timeserial, serialTimestamp = serialTimestamp, data = data)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ internal abstract class BaseLiveObject(
*
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
*/
internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate {
internal fun applyObjectSync(objectMessage: ObjectMessage): LiveObjectUpdate {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
validate(objectState)
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
Expand All @@ -61,7 +62,7 @@ internal abstract class BaseLiveObject(
}
return noOpCounterUpdate
}
return applyObjectState(objectState) // RTLM6, RTLC6
return applyObjectState(objectState, objectMessage) // RTLM6, RTLC6
}

/**
Expand Down Expand Up @@ -122,11 +123,14 @@ internal abstract class BaseLiveObject(
/**
* Marks the object as tombstoned.
*/
internal fun tombstone(): LiveObjectUpdate {
internal fun tombstone(serialTimestamp: Long?): LiveObjectUpdate {
if (serialTimestamp == null) {
Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead")
}
isTombstoned = true
tombstonedAt = System.currentTimeMillis()
tombstonedAt = serialTimestamp?: System.currentTimeMillis()
val update = clearData()
// TODO: Emit lifecycle events
// TODO: Emit BaseLiveObject lifecycle events
return update
}

Expand All @@ -153,7 +157,7 @@ internal abstract class BaseLiveObject(
* @return A map describing the changes made to the object's data
*
*/
abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate
abstract fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveObjectUpdate

/**
* Applies an operation to this live object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ internal class DefaultLiveCounter private constructor(

override fun validate(state: ObjectState) = liveCounterManager.validate(state)

override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate {
return liveCounterManager.applyState(objectState)
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate {
return liveCounterManager.applyState(objectState, message.serialTimestamp)
}

override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
liveCounterManager.applyOperation(operation)
liveCounterManager.applyOperation(operation, message.serialTimestamp)
}

override fun clearData(): LiveCounterUpdate {
return LiveCounterUpdate(data.get()).apply { data.set(0.0) }
return liveCounterManager.calculateUpdateFromDataDiff(data.get(), 0.0).apply { data.set(0.0) }
}

override fun notifyUpdated(update: LiveObjectUpdate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
/**
* @spec RTLC6 - Overrides counter data with state from sync
*/
internal fun applyState(objectState: ObjectState): LiveCounterUpdate {
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveCounterUpdate {
val previousData = liveCounter.data.get()

if (objectState.tombstone) {
liveCounter.tombstone()
liveCounter.tombstone(serialTimestamp)
} else {
// override data for this object with data from the object state
liveCounter.createOperationIsMerged = false // RTLC6b
Expand All @@ -33,13 +33,13 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
}
}

return LiveCounterUpdate(liveCounter.data.get() - previousData)
return calculateUpdateFromDataDiff(previousData, liveCounter.data.get())
}

/**
* @spec RTLC7 - Applies operations to LiveCounter
*/
internal fun applyOperation(operation: ObjectOperation) {
internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) {
val update = when (operation.action) {
ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1
ObjectOperationAction.CounterInc -> {
Expand All @@ -49,7 +49,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}")
}
}
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone()
ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp)
else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
}

Expand Down Expand Up @@ -85,6 +85,10 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
return LiveCounterUpdate(amount)
}

internal fun calculateUpdateFromDataDiff(prevData: Double, newData: Double): LiveCounterUpdate {
return LiveCounterUpdate(newData - prevData)
}

/**
* @spec RTLC10 - Merges initial data from create operation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ internal class DefaultLiveMap private constructor(

override fun unsubscribeAll() = liveMapManager.unsubscribeAll()

override fun applyObjectState(objectState: ObjectState): LiveMapUpdate {
return liveMapManager.applyState(objectState)
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate {
return liveMapManager.applyState(objectState, message.serialTimestamp)
}

override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
liveMapManager.applyOperation(operation, message.serial)
liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
}

override fun clearData(): LiveMapUpdate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
/**
* @spec RTLM6 - Overrides object data with state from sync
*/
internal fun applyState(objectState: ObjectState): LiveMapUpdate {
internal fun applyState(objectState: ObjectState, serialTimestamp: Long?): LiveMapUpdate {
val previousData = liveMap.data.toMap()

if (objectState.tombstone) {
liveMap.tombstone()
liveMap.tombstone(serialTimestamp)
} else {
// override data for this object with data from the object state
liveMap.createOperationIsMerged = false // RTLM6b
Expand All @@ -33,7 +33,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
objectState.map?.entries?.forEach { (key, entry) ->
liveMap.data[key] = LiveMapEntry(
isTombstoned = entry.tombstone ?: false,
tombstonedAt = if (entry.tombstone == true) System.currentTimeMillis() else null,
tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp ?: System.currentTimeMillis() else null,
timeserial = entry.timeserial,
data = entry.data
)
Expand All @@ -51,24 +51,24 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
/**
* @spec RTLM15 - Applies operations to LiveMap
*/
internal fun applyOperation(operation: ObjectOperation, messageTimeserial: String?) {
internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) {
val update = when (operation.action) {
ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1
ObjectOperationAction.MapSet -> {
if (operation.mapOp != null) {
applyMapSet(operation.mapOp, messageTimeserial) // RTLM15d2
applyMapSet(operation.mapOp, serial) // RTLM15d2
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
ObjectOperationAction.MapRemove -> {
if (operation.mapOp != null) {
applyMapRemove(operation.mapOp, messageTimeserial) // RTLM15d3
applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
ObjectOperationAction.ObjectDelete -> liveMap.tombstone()
ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp)
else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
}

Expand Down Expand Up @@ -132,7 +132,6 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
// RTLM7a2 - Replace existing entry with new one instead of mutating
liveMap.data[mapOp.key] = LiveMapEntry(
isTombstoned = false, // RTLM7a2c
tombstonedAt = null,
timeserial = timeSerial, // RTLM7a2b
data = mapOp.data // RTLM7a2a
)
Expand All @@ -154,6 +153,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
private fun applyMapRemove(
mapOp: ObjectMapOp, // RTLM8c1
timeSerial: String?, // RTLM8c2
timeStamp: Long?, // RTLM8c3
): LiveMapUpdate {
val existingEntry = liveMap.data[mapOp.key]

Expand All @@ -168,19 +168,28 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
return noOpMapUpdate
}

val tombstonedAt = if (timeStamp != null) timeStamp else {
Log.w(
tag,
"No timestamp provided for MAP_REMOVE op on key=\"${mapOp.key}\"; using current time as tombstone time; " +
"objectId=${objectId}"
)
System.currentTimeMillis()
}

if (existingEntry != null) {
// RTLM8a2 - Replace existing entry with new one instead of mutating
liveMap.data[mapOp.key] = LiveMapEntry(
isTombstoned = true, // RTLM8a2c
tombstonedAt = System.currentTimeMillis(),
tombstonedAt = tombstonedAt,
timeserial = timeSerial, // RTLM8a2b
data = null // RTLM8a2a
)
} else {
// RTLM8b, RTLM8b1
liveMap.data[mapOp.key] = LiveMapEntry(
isTombstoned = true, // RTLM8b2
tombstonedAt = System.currentTimeMillis(),
tombstonedAt = tombstonedAt,
timeserial = timeSerial
)
}
Expand Down Expand Up @@ -224,7 +233,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
val opTimeserial = entry.timeserial
val update = if (entry.tombstone == true) {
// RTLM17a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op
applyMapRemove(ObjectMapOp(key), opTimeserial)
applyMapRemove(ObjectMapOp(key), opTimeserial, entry.serialTimestamp)
} else {
// RTLM17a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op
applyMapSet(ObjectMapOp(key, entry.data), opTimeserial)
Expand Down
Loading
Loading