From 29d785472dbb161e6c456e84435a11c2a130abd6 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 23 May 2025 17:43:34 +0530 Subject: [PATCH 1/4] [ECO-5375] Refactored LiveObjects plugin to handle channelSerial --- .../ably/lib/objects/LiveObjectsAdapter.java | 36 +++++++++++++++++++ .../io/ably/lib/realtime/AblyRealtime.java | 7 ++-- .../ably/lib/transport/ConnectionManager.java | 8 +---- .../io/ably/lib/objects/DefaultLiveObjects.kt | 16 ++++++++- .../lib/objects/DefaultLiveObjectsPlugin.kt | 27 +++----------- .../kotlin/io/ably/lib/objects/Helpers.kt | 24 +++++++++++++ 6 files changed, 85 insertions(+), 33 deletions(-) create mode 100644 lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java create mode 100644 live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java new file mode 100644 index 000000000..0a3c68de1 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -0,0 +1,36 @@ +package io.ably.lib.objects; + +import io.ably.lib.plugins.PluginConnectionAdapter; +import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.CompletionListener; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.util.Log; +import org.jetbrains.annotations.NotNull; + +public interface LiveObjectsAdapter extends PluginConnectionAdapter { + void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial); + + class Adapter implements LiveObjectsAdapter { + private final AblyRealtime ably; + private static final String TAG = LiveObjectsAdapter.class.getName(); + + public Adapter(@NotNull AblyRealtime ably) { + this.ably = ably; + } + + @Override + public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) { + if (ably.channels.containsKey(channelName)) { + ably.channels.get(channelName).properties.channelSerial = channelSerial; + } else { + Log.e(TAG, "setChannelSerial(): channel not found: " + channelName); + } + } + + @Override + public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { + ably.connection.connectionManager.send(msg, true, listener); + } + } +} diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 92e0fdbd8..3cfe16bd1 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -6,8 +6,8 @@ import java.util.List; import java.util.Map; +import io.ably.lib.objects.LiveObjectsAdapter; import io.ably.lib.objects.LiveObjectsPlugin; -import io.ably.lib.plugins.PluginConnectionAdapter; import io.ably.lib.rest.AblyRest; import io.ably.lib.rest.Auth; import io.ably.lib.transport.ConnectionManager; @@ -187,9 +187,10 @@ public interface Channels extends ReadOnlyMap { private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() { try { Class liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin"); + LiveObjectsAdapter adapter = new LiveObjectsAdapter.Adapter(this); return (LiveObjectsPlugin) liveObjectsImplementation - .getDeclaredConstructor(PluginConnectionAdapter.class) - .newInstance(this.connection.connectionManager); + .getDeclaredConstructor(LiveObjectsAdapter.class) + .newInstance(adapter); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e); diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index f6f20e9eb..ffe6e36f1 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -15,7 +15,6 @@ import io.ably.lib.debug.DebugOptions.RawProtocolListener; import io.ably.lib.http.HttpHelpers; import io.ably.lib.objects.LiveObjectsPlugin; -import io.ably.lib.plugins.PluginConnectionAdapter; import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.CompletionListener; @@ -37,7 +36,7 @@ import io.ably.lib.util.PlatformAgentProvider; import io.ably.lib.util.ReconnectionStrategy; -public class ConnectionManager implements ConnectListener, PluginConnectionAdapter { +public class ConnectionManager implements ConnectListener { final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); /************************************************************** @@ -1687,11 +1686,6 @@ public QueuedMessage(ProtocolMessage msg, CompletionListener listener) { } } - @Override - public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { - this.send(msg, true, listener); - } - public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException { State state; synchronized(this) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt index 2fc70a2a9..ea88c5e99 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -1,8 +1,12 @@ package io.ably.lib.objects import io.ably.lib.types.Callback +import io.ably.lib.types.ProtocolMessage +import io.ably.lib.util.Log + +internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects { + private val tag = DefaultLiveObjects::class.simpleName -internal class DefaultLiveObjects(private val channelName: String): LiveObjects { override fun getRoot(): LiveMap { TODO("Not yet implemented") } @@ -43,6 +47,16 @@ internal class DefaultLiveObjects(private val channelName: String): LiveObjects TODO("Not yet implemented") } + fun handle(msg: ProtocolMessage) { + // RTL15b + msg.channelSerial?.let { + if (msg.action === ProtocolMessage.Action.`object`) { + Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}") + adapter.setChannelSerial(channelName, msg.channelSerial) + } + } + } + fun dispose() { // Dispose of any resources associated with this LiveObjects instance // For example, close any open connections or clean up references diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt index 277d4df31..e31002a89 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt @@ -1,36 +1,19 @@ package io.ably.lib.objects -import io.ably.lib.plugins.PluginConnectionAdapter -import io.ably.lib.realtime.CompletionListener -import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage -import kotlinx.coroutines.CompletableDeferred import java.util.concurrent.ConcurrentHashMap -public class DefaultLiveObjectsPlugin(private val pluginConnectionAdapter: PluginConnectionAdapter) : LiveObjectsPlugin { +public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) : LiveObjectsPlugin { private val liveObjects = ConcurrentHashMap() override fun getInstance(channelName: String): LiveObjects { - return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName) } + return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName, adapter) } } - public suspend fun send(message: ProtocolMessage) { - val deferred = CompletableDeferred() - pluginConnectionAdapter.send(message, object : CompletionListener { - override fun onSuccess() { - deferred.complete(Unit) - } - - override fun onError(reason: ErrorInfo) { - deferred.completeExceptionally(Exception(reason.message)) - } - }) - deferred.await() - } - - override fun handle(message: ProtocolMessage) { - TODO("Not yet implemented") + override fun handle(msg: ProtocolMessage) { + val channelName = msg.channel + liveObjects[channelName]?.handle(msg) } override fun dispose(channelName: String) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt new file mode 100644 index 000000000..f5259808f --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -0,0 +1,24 @@ +package io.ably.lib.objects + +import io.ably.lib.realtime.CompletionListener +import io.ably.lib.types.ErrorInfo +import io.ably.lib.types.ProtocolMessage +import kotlinx.coroutines.CompletableDeferred + +internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) { + val deferred = CompletableDeferred() + try { + this.send(message, object : CompletionListener { + override fun onSuccess() { + deferred.complete(Unit) + } + + override fun onError(reason: ErrorInfo) { + deferred.completeExceptionally(Exception(reason.message)) + } + }) + } catch (e: Exception) { + deferred.completeExceptionally(e) + } + deferred.await() +} From 375328e0fac7b670714e68fb2c25d6caea4bb32c Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 27 May 2025 17:09:55 +0530 Subject: [PATCH 2/4] [ECO-5375] Created ObjectMessage.kt, declared data classes as per spec --- .../java/io/ably/lib/objects/LiveMap.java | 8 +- .../ably/lib/objects/LiveObjectsAdapter.java | 1 + .../kotlin/io/ably/lib/objects/Helpers.kt | 7 + .../io/ably/lib/objects/ObjectMessage.kt | 317 ++++++++++++++++++ 4 files changed, 329 insertions(+), 4 deletions(-) create mode 100644 live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt diff --git a/lib/src/main/java/io/ably/lib/objects/LiveMap.java b/lib/src/main/java/io/ably/lib/objects/LiveMap.java index 7ba4433f9..7a964dc90 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveMap.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveMap.java @@ -18,11 +18,11 @@ public interface LiveMap { /** * Retrieves the value associated with the specified key. - * If this map object is tombstoned (deleted), `undefined` is returned. - * If no entry is associated with the specified key, `undefined` is returned. - * If map entry is tombstoned (deleted), `undefined` is returned. + * If this map object is tombstoned (deleted), null is returned. + * If no entry is associated with the specified key, null is returned. + * If map entry is tombstoned (deleted), null is returned. * If the value associated with the provided key is an objectId string of another LiveObject, a reference to that LiveObject - * is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned. + * is returned, provided it exists in the local pool and is not tombstoned. Otherwise, null is returned. * If the value is not an objectId, then that value is returned. * * @param keyName the key whose associated value is to be returned. diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java index 0a3c68de1..9ee842dd4 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -30,6 +30,7 @@ public void setChannelSerial(@NotNull String channelName, @NotNull String channe @Override public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { + // Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment ably.connection.connectionManager.send(msg, true, listener); } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index f5259808f..85a4d25fa 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -22,3 +22,10 @@ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) { } deferred.await() } + +internal enum class MessageFormat(private val value: String) { + MSGPACK("msgpack"), + JSON("json"); + + override fun toString(): String = value +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt new file mode 100644 index 000000000..2c2d825f6 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt @@ -0,0 +1,317 @@ +package io.ably.lib.objects + +import java.nio.ByteBuffer + +/** + * An enum class representing the different actions that can be performed on an object. + * Spec: OOP2 + */ +internal enum class ObjectOperationAction(val code: Int) { + MAP_CREATE(0), + MAP_SET(1), + MAP_REMOVE(2), + COUNTER_CREATE(3), + COUNTER_INC(4), + OBJECT_DELETE(5); +} + +/** + * An enum class representing the conflict-resolution semantics used by a Map object. + * Spec: MAP2 + */ +internal enum class MapSemantics(val code: Int) { + LWW(0); +} + +/** + * An ObjectData represents a value in an object on a channel. + * Spec: OD1 + */ +internal data class ObjectData( + /** + * A reference to another object, used to support composable object structures. + * Spec: OD2a + */ + val objectId: String? = null, + + /** + * Can be set by the client to indicate that value in `string` or `bytes` field have an encoding. + * Spec: OD2b + */ + val encoding: String? = null, + + /** + * String, number, boolean or binary - a concrete value of the object + * Spec: OD2c + */ + val value: Any? = null, +) + +/** + * A MapOp describes an operation to be applied to a Map object. + * Spec: MOP1 + */ +internal data class MapOp( + /** + * The key of the map entry to which the operation should be applied. + * Spec: MOP2a + */ + val key: String, + + /** + * The data that the map entry should contain if the operation is a MAP_SET operation. + * Spec: MOP2b + */ + val data: ObjectData? = null +) + +/** + * A CounterOp describes an operation to be applied to a Counter object. + * Spec: COP1 + */ +internal data class CounterOp( + /** + * The data value that should be added to the counter + * Spec: COP2a + */ + val amount: Double +) + +/** + * A MapEntry represents the value at a given key in a Map object. + * Spec: ME1 + */ +internal data class MapEntry( + /** + * Indicates whether the map entry has been removed. + * Spec: ME2a + */ + val tombstone: Boolean? = null, + + /** + * The serial value of the last operation that was applied to the map entry. + * It is optional in a MAP_CREATE operation and might be missing, in which case the client should use a nullish value for it + * and treat it as the "earliest possible" serial for comparison purposes. + * Spec: ME2b + */ + val timeserial: String? = null, + + /** + * The data that represents the value of the map entry. + * Spec: ME2c + */ + val data: ObjectData? = null +) + +/** + * An ObjectMap object represents a map of key-value pairs. + * Spec: MAP1 + */ +internal data class ObjectMap( + /** + * The conflict-resolution semantics used by the map object. + * Spec: MAP3a + */ + val semantics: MapSemantics? = null, + + /** + * The map entries, indexed by key. + * Spec: MAP3b + */ + val entries: Map? = null +) + +/** + * An ObjectCounter object represents an incrementable and decrementable value + * Spec: CNT1 + */ +internal data class ObjectCounter( + /** + * The value of the counter + * Spec: CNT2a + */ + val count: Double? = null +) + +/** + * An ObjectOperation describes an operation to be applied to an object on a channel. + * Spec: OOP1 + */ +internal data class ObjectOperation( + /** + * Defines the operation to be applied to the object. + * Spec: OOP3a + */ + val action: ObjectOperationAction, + + /** + * The object ID of the object on a channel to which the operation should be applied. + * Spec: OOP3b + */ + val objectId: String, + + /** + * The payload for the operation if it is an operation on a Map object type. + * Spec: OOP3c + */ + val mapOp: MapOp? = null, + + /** + * The payload for the operation if it is an operation on a Counter object type. + * Spec: OOP3d + */ + val counterOp: CounterOp? = null, + + /** + * The payload for the operation if the operation is MAP_CREATE. + * Defines the initial value for the Map object. + * Spec: OOP3e + */ + val map: ObjectMap? = null, + + /** + * The payload for the operation if the operation is COUNTER_CREATE. + * Defines the initial value for the Counter object. + * Spec: OOP3f + */ + val counter: ObjectCounter? = null, + + /** + * The nonce, must be present on create operations. This is the random part + * that has been hashed with the type and initial value to create the object ID. + * Spec: OOP3g + */ + val nonce: String? = null, + + /** + * The initial value bytes for the object. These bytes should be used along with the nonce + * and timestamp to create the object ID. Frontdoor will use this to verify the object ID. + * After verification the bytes will be decoded into the Map or Counter objects and + * the initialValue, nonce, and initialValueEncoding will be removed. + * Spec: OOP3h + */ + val initialValue: ByteBuffer? = null, + + /** The initial value encoding defines how the initialValue should be interpreted. + * Spec: OOP3i + */ + val initialValueEncoding: MessageFormat? = null +) + +/** + * An ObjectState describes the instantaneous state of an object on a channel. + * Spec: OST1 + */ +internal data class ObjectState( + /** + * The identifier of the object. + * Spec: OST2a + */ + val objectId: String, + + /** + * A map of serials keyed by a {@link ObjectMessage.siteCode}, + * representing the last operations applied to this object + * Spec: OST2b + */ + val siteTimeserials: Map, + + /** + * True if the object has been tombstoned. + * Spec: OST2c + */ + val tombstone: Boolean, + + /** + * The operation that created the object. + * Can be missing if create operation for the object is not known at this point. + * Spec: OST2d + */ + val createOp: ObjectOperation? = null, + + /** + * The data that represents the result of applying all operations to a Map object + * excluding the initial value from the create operation if it is a Map object type. + * Spec: OST2e + */ + val map: ObjectMap? = null, + + /** + * The data that represents the result of applying all operations to a Counter object + * excluding the initial value from the create operation if it is a Counter object type. + * Spec: OST2f + */ + val counter: ObjectCounter? = null +) + +/** + * An @ObjectMessage@ represents an individual object message to be sent or received via the Ably Realtime service. + * Spec: OM1 + */ +internal data class ObjectMessage( + /** + * unique ID for this object message. This attribute is always populated for object messages received over REST. + * For object messages received over Realtime, if the object message does not contain an @id@, + * it should be set to @protocolMsgId:index@, where @protocolMsgId@ is the id of the @ProtocolMessage@ encapsulating it, + * and @index@ is the index of the object message inside the @state@ array of the @ProtocolMessage@ + * Spec: OM2a + */ + val id: String? = null, + + /** + * time in milliseconds since epoch. If an object message received from Ably does not contain a @timestamp@, + * it should be set to the @timestamp@ of the encapsulating @ProtocolMessage@ + * Spec: OM2e + */ + val timestamp: Long? = null, + + /** + * Spec: OM2b + */ + val clientId: String? = null, + + /** + * If an object message received from Ably does not contain a @connectionId@, + * it should be set to the @connectionId@ of the encapsulating @ProtocolMessage@ + * Spec: OM2c + */ + val connectionId: String? = null, + + /** + * JSON-encodable object, used to contain any arbitrary key value pairs which may also contain other primitive JSON types, + * JSON-encodable objects or JSON-encodable arrays. The @extras@ field is provided to contain message metadata and/or + * ancillary payloads in support of specific functionality. For 3.1 no specific functionality is specified for + * @extras@ in object messages. Unless otherwise specified, the client library should not attempt to do any filtering + * or validation of the @extras@ field itself, but should treat it opaquely, encoding it and passing it to realtime unaltered + * Spec: OM2d + */ + val extras: Any? = null, + + /** + * Describes an operation to be applied to an object. + * Mutually exclusive with the `object` field. This field is only set on object messages if the `action` field of the + * `ProtocolMessage` encapsulating it is `OBJECT`. + * Spec: OM2f + */ + val operation: ObjectOperation? = null, + + /** + * Describes the instantaneous state of an object. + * Mutually exclusive with the `operation` field. This field is only set on object messages if the `action` field of + * the `ProtocolMessage` encapsulating it is `OBJECT_SYNC`. + * Spec: OM2g + */ + val `object`: ObjectState? = null, + + /** + * An opaque string that uniquely identifies this object message. + * Spec: OM2h + */ + val serial: String? = null, + + /** + * An opaque string used as a key to update the map of serial values on an object. + * Spec: OM2i + */ + val siteCode: String? = null +) From 431cfe7b1cdcc701bf193df6f3f7f6ab2ea73166 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 28 May 2025 22:01:03 +0530 Subject: [PATCH 3/4] [ECO-5375] Updated code as per review comments 1. Updated enum ObjectOperationAction with PascalCase values 2. Created separate file for adapter that extends LiveObjectsAdapter 3. Added custom Binary type to handle ByteArray values --- .../java/io/ably/lib/objects/Adapter.java | 32 +++++++++++++ .../ably/lib/objects/LiveObjectsAdapter.java | 46 ++++++++----------- .../ably/lib/objects/LiveObjectsPlugin.java | 18 +++++++- .../lib/plugins/PluginConnectionAdapter.java | 25 ---------- .../io/ably/lib/plugins/PluginInstance.java | 25 ---------- .../io/ably/lib/realtime/AblyRealtime.java | 3 +- .../kotlin/io/ably/lib/objects/Helpers.kt | 18 ++++++-- .../io/ably/lib/objects/ObjectMessage.kt | 20 ++++---- 8 files changed, 92 insertions(+), 95 deletions(-) create mode 100644 lib/src/main/java/io/ably/lib/objects/Adapter.java delete mode 100644 lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java delete mode 100644 lib/src/main/java/io/ably/lib/plugins/PluginInstance.java diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java new file mode 100644 index 000000000..a9e00beeb --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -0,0 +1,32 @@ +package io.ably.lib.objects; + +import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.CompletionListener; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.util.Log; +import org.jetbrains.annotations.NotNull; + +public class Adapter implements LiveObjectsAdapter { + private final AblyRealtime ably; + private static final String TAG = LiveObjectsAdapter.class.getName(); + + public Adapter(@NotNull AblyRealtime ably) { + this.ably = ably; + } + + @Override + public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) { + if (ably.channels.containsKey(channelName)) { + ably.channels.get(channelName).properties.channelSerial = channelSerial; + } else { + Log.e(TAG, "setChannelSerial(): channel not found: " + channelName); + } + } + + @Override + public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { + // Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment + ably.connection.connectionManager.send(msg, true, listener); + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java index 9ee842dd4..c6040c1b0 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -1,37 +1,27 @@ package io.ably.lib.objects; -import io.ably.lib.plugins.PluginConnectionAdapter; -import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.CompletionListener; import io.ably.lib.types.AblyException; import io.ably.lib.types.ProtocolMessage; -import io.ably.lib.util.Log; import org.jetbrains.annotations.NotNull; -public interface LiveObjectsAdapter extends PluginConnectionAdapter { - void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial); - - class Adapter implements LiveObjectsAdapter { - private final AblyRealtime ably; - private static final String TAG = LiveObjectsAdapter.class.getName(); - - public Adapter(@NotNull AblyRealtime ably) { - this.ably = ably; - } +public interface LiveObjectsAdapter { + /** + * Sends a protocol message to its intended recipient. + * This method transmits a protocol message, allowing for queuing events if necessary, + * and notifies the provided listener upon the success or failure of the send operation. + * + * @param msg the protocol message to send. + * @param listener a listener to be notified of the success or failure of the send operation. + * @throws AblyException if an error occurs during the send operation. + */ + void send(ProtocolMessage msg, CompletionListener listener) throws AblyException; - @Override - public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) { - if (ably.channels.containsKey(channelName)) { - ably.channels.get(channelName).properties.channelSerial = channelSerial; - } else { - Log.e(TAG, "setChannelSerial(): channel not found: " + channelName); - } - } - - @Override - public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { - // Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment - ably.connection.connectionManager.send(msg, true, listener); - } - } + /** + * Sets the channel serial for a specific channel. + * @param channelName the name of the channel for which to set the serial + * @param channelSerial the serial to set for the channel + */ + void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial); } + diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java index cad3e9f59..171a90347 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java @@ -1,6 +1,6 @@ package io.ably.lib.objects; -import io.ably.lib.plugins.PluginInstance; +import io.ably.lib.types.ProtocolMessage; import org.jetbrains.annotations.NotNull; /** @@ -8,7 +8,7 @@ * live data objects in a real-time environment. It allows for the retrieval, disposal, and * management of LiveObjects instances associated with specific channel names. */ -public interface LiveObjectsPlugin extends PluginInstance { +public interface LiveObjectsPlugin { /** * Retrieves an instance of LiveObjects associated with the specified channel name. @@ -21,6 +21,15 @@ public interface LiveObjectsPlugin extends PluginInstance { @NotNull LiveObjects getInstance(@NotNull String channelName); + /** + * Handles a protocol message. + * This method is invoked whenever a protocol message is received, allowing the implementation + * to process the message and take appropriate actions. + * + * @param message the protocol message to handle. + */ + void handle(@NotNull ProtocolMessage message); + /** * Disposes of the LiveObjects instance associated with the specified channel name. * This method removes the LiveObjects instance for the given channel, releasing any @@ -29,4 +38,9 @@ public interface LiveObjectsPlugin extends PluginInstance { * @param channelName the name of the channel whose LiveObjects instance is to be removed. */ void dispose(@NotNull String channelName); + + /** + * Disposes of the plugin instance and all underlying resources. + */ + void dispose(); } diff --git a/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java b/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java deleted file mode 100644 index 5283d2120..000000000 --- a/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java +++ /dev/null @@ -1,25 +0,0 @@ -package io.ably.lib.plugins; - -import io.ably.lib.realtime.CompletionListener; -import io.ably.lib.types.AblyException; -import io.ably.lib.types.ProtocolMessage; - -/** - * The PluginConnectionAdapter interface defines a contract for managing real-time communication - * between plugins and the Ably Realtime system. Implementations of this interface are responsible - * for sending protocol messages to their intended recipients, optionally queuing events, and - * notifying listeners of the operation's outcome. - */ -public interface PluginConnectionAdapter { - - /** - * Sends a protocol message to its intended recipient. - * This method transmits a protocol message, allowing for queuing events if necessary, - * and notifies the provided listener upon the success or failure of the send operation. - * - * @param msg the protocol message to send. - * @param listener a listener to be notified of the success or failure of the send operation. - * @throws AblyException if an error occurs during the send operation. - */ - void send(ProtocolMessage msg, CompletionListener listener) throws AblyException; -} diff --git a/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java b/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java deleted file mode 100644 index 23055f901..000000000 --- a/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java +++ /dev/null @@ -1,25 +0,0 @@ -package io.ably.lib.plugins; - -import io.ably.lib.types.ProtocolMessage; -import org.jetbrains.annotations.NotNull; - -/** - * The ProtocolMessageHandler interface defines a contract for handling protocol messages. - * Implementations of this interface are responsible for processing incoming protocol messages - * and performing the necessary actions based on the message content. - */ -public interface PluginInstance { - /** - * Handles a protocol message. - * This method is invoked whenever a protocol message is received, allowing the implementation - * to process the message and take appropriate actions. - * - * @param message the protocol message to handle. - */ - void handle(@NotNull ProtocolMessage message); - - /** - * Disposes of the plugin instance and all underlying resources. - */ - void dispose(); -} diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 3cfe16bd1..a933a7f62 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; +import io.ably.lib.objects.Adapter; import io.ably.lib.objects.LiveObjectsAdapter; import io.ably.lib.objects.LiveObjectsPlugin; import io.ably.lib.rest.AblyRest; @@ -187,7 +188,7 @@ public interface Channels extends ReadOnlyMap { private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() { try { Class liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin"); - LiveObjectsAdapter adapter = new LiveObjectsAdapter.Adapter(this); + LiveObjectsAdapter adapter = new Adapter(this); return (LiveObjectsPlugin) liveObjectsImplementation .getDeclaredConstructor(LiveObjectsAdapter.class) .newInstance(adapter); diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 85a4d25fa..63501106b 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -23,9 +23,21 @@ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) { deferred.await() } -internal enum class MessageFormat(private val value: String) { - MSGPACK("msgpack"), - JSON("json"); +internal enum class ProtocolMessageFormat(private val value: String) { + Msgpack("msgpack"), + Json("json"); override fun toString(): String = value } + +internal class Binary(val data: ByteArray?) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is Binary) return false + return data?.contentEquals(other.data) == true + } + + override fun hashCode(): Int { + return data?.contentHashCode() ?: 0 + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt index 2c2d825f6..5bb75582e 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt @@ -1,18 +1,16 @@ package io.ably.lib.objects -import java.nio.ByteBuffer - /** * An enum class representing the different actions that can be performed on an object. * Spec: OOP2 */ internal enum class ObjectOperationAction(val code: Int) { - MAP_CREATE(0), - MAP_SET(1), - MAP_REMOVE(2), - COUNTER_CREATE(3), - COUNTER_INC(4), - OBJECT_DELETE(5); + MapCreate(0), + MapSet(1), + MapRemove(2), + CounterCreate(3), + CounterInc(4), + ObjectDelete(5); } /** @@ -190,12 +188,12 @@ internal data class ObjectOperation( * the initialValue, nonce, and initialValueEncoding will be removed. * Spec: OOP3h */ - val initialValue: ByteBuffer? = null, + val initialValue: Binary? = null, /** The initial value encoding defines how the initialValue should be interpreted. * Spec: OOP3i */ - val initialValueEncoding: MessageFormat? = null + val initialValueEncoding: ProtocolMessageFormat? = null ) /** @@ -301,7 +299,7 @@ internal data class ObjectMessage( * the `ProtocolMessage` encapsulating it is `OBJECT_SYNC`. * Spec: OM2g */ - val `object`: ObjectState? = null, + val objectState: ObjectState? = null, /** * An opaque string that uniquely identifies this object message. From 4dfb1a0b9abc62e5f5d17a2358d1ce39a020153b Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 5 Jun 2025 11:14:34 +0530 Subject: [PATCH 4/4] [ECO-5375] Updated liveObjectsAdapter sendAsync method to use suspendCancellableCoroutine --- .../main/kotlin/io/ably/lib/objects/Helpers.kt | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 63501106b..9a6606f5c 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -1,26 +1,27 @@ package io.ably.lib.objects import io.ably.lib.realtime.CompletionListener +import io.ably.lib.types.AblyException import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage -import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException -internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) { - val deferred = CompletableDeferred() +internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation -> try { this.send(message, object : CompletionListener { override fun onSuccess() { - deferred.complete(Unit) + continuation.resume(Unit) } override fun onError(reason: ErrorInfo) { - deferred.completeExceptionally(Exception(reason.message)) + continuation.resumeWithException(AblyException.fromErrorInfo(reason)) } }) } catch (e: Exception) { - deferred.completeExceptionally(e) + continuation.resumeWithException(e) } - deferred.await() } internal enum class ProtocolMessageFormat(private val value: String) {