diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java new file mode 100644 index 000000000..a9e00beeb --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -0,0 +1,32 @@ +package io.ably.lib.objects; + +import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.CompletionListener; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.util.Log; +import org.jetbrains.annotations.NotNull; + +public class Adapter implements LiveObjectsAdapter { + private final AblyRealtime ably; + private static final String TAG = LiveObjectsAdapter.class.getName(); + + public Adapter(@NotNull AblyRealtime ably) { + this.ably = ably; + } + + @Override + public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) { + if (ably.channels.containsKey(channelName)) { + ably.channels.get(channelName).properties.channelSerial = channelSerial; + } else { + Log.e(TAG, "setChannelSerial(): channel not found: " + channelName); + } + } + + @Override + public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { + // Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment + ably.connection.connectionManager.send(msg, true, listener); + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveMap.java b/lib/src/main/java/io/ably/lib/objects/LiveMap.java index 7ba4433f9..7a964dc90 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveMap.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveMap.java @@ -18,11 +18,11 @@ public interface LiveMap { /** * Retrieves the value associated with the specified key. - * If this map object is tombstoned (deleted), `undefined` is returned. - * If no entry is associated with the specified key, `undefined` is returned. - * If map entry is tombstoned (deleted), `undefined` is returned. + * If this map object is tombstoned (deleted), null is returned. + * If no entry is associated with the specified key, null is returned. + * If map entry is tombstoned (deleted), null is returned. * If the value associated with the provided key is an objectId string of another LiveObject, a reference to that LiveObject - * is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned. + * is returned, provided it exists in the local pool and is not tombstoned. Otherwise, null is returned. * If the value is not an objectId, then that value is returned. * * @param keyName the key whose associated value is to be returned. diff --git a/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java similarity index 62% rename from lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java rename to lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java index 5283d2120..c6040c1b0 100644 --- a/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java @@ -1,17 +1,11 @@ -package io.ably.lib.plugins; +package io.ably.lib.objects; import io.ably.lib.realtime.CompletionListener; import io.ably.lib.types.AblyException; import io.ably.lib.types.ProtocolMessage; +import org.jetbrains.annotations.NotNull; -/** - * The PluginConnectionAdapter interface defines a contract for managing real-time communication - * between plugins and the Ably Realtime system. Implementations of this interface are responsible - * for sending protocol messages to their intended recipients, optionally queuing events, and - * notifying listeners of the operation's outcome. - */ -public interface PluginConnectionAdapter { - +public interface LiveObjectsAdapter { /** * Sends a protocol message to its intended recipient. * This method transmits a protocol message, allowing for queuing events if necessary, @@ -22,4 +16,12 @@ public interface PluginConnectionAdapter { * @throws AblyException if an error occurs during the send operation. */ void send(ProtocolMessage msg, CompletionListener listener) throws AblyException; + + /** + * Sets the channel serial for a specific channel. + * @param channelName the name of the channel for which to set the serial + * @param channelSerial the serial to set for the channel + */ + void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial); } + diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java index cad3e9f59..171a90347 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java @@ -1,6 +1,6 @@ package io.ably.lib.objects; -import io.ably.lib.plugins.PluginInstance; +import io.ably.lib.types.ProtocolMessage; import org.jetbrains.annotations.NotNull; /** @@ -8,7 +8,7 @@ * live data objects in a real-time environment. It allows for the retrieval, disposal, and * management of LiveObjects instances associated with specific channel names. */ -public interface LiveObjectsPlugin extends PluginInstance { +public interface LiveObjectsPlugin { /** * Retrieves an instance of LiveObjects associated with the specified channel name. @@ -21,6 +21,15 @@ public interface LiveObjectsPlugin extends PluginInstance { @NotNull LiveObjects getInstance(@NotNull String channelName); + /** + * Handles a protocol message. + * This method is invoked whenever a protocol message is received, allowing the implementation + * to process the message and take appropriate actions. + * + * @param message the protocol message to handle. + */ + void handle(@NotNull ProtocolMessage message); + /** * Disposes of the LiveObjects instance associated with the specified channel name. * This method removes the LiveObjects instance for the given channel, releasing any @@ -29,4 +38,9 @@ public interface LiveObjectsPlugin extends PluginInstance { * @param channelName the name of the channel whose LiveObjects instance is to be removed. */ void dispose(@NotNull String channelName); + + /** + * Disposes of the plugin instance and all underlying resources. + */ + void dispose(); } diff --git a/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java b/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java deleted file mode 100644 index 23055f901..000000000 --- a/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java +++ /dev/null @@ -1,25 +0,0 @@ -package io.ably.lib.plugins; - -import io.ably.lib.types.ProtocolMessage; -import org.jetbrains.annotations.NotNull; - -/** - * The ProtocolMessageHandler interface defines a contract for handling protocol messages. - * Implementations of this interface are responsible for processing incoming protocol messages - * and performing the necessary actions based on the message content. - */ -public interface PluginInstance { - /** - * Handles a protocol message. - * This method is invoked whenever a protocol message is received, allowing the implementation - * to process the message and take appropriate actions. - * - * @param message the protocol message to handle. - */ - void handle(@NotNull ProtocolMessage message); - - /** - * Disposes of the plugin instance and all underlying resources. - */ - void dispose(); -} diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 92e0fdbd8..a933a7f62 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -6,8 +6,9 @@ import java.util.List; import java.util.Map; +import io.ably.lib.objects.Adapter; +import io.ably.lib.objects.LiveObjectsAdapter; import io.ably.lib.objects.LiveObjectsPlugin; -import io.ably.lib.plugins.PluginConnectionAdapter; import io.ably.lib.rest.AblyRest; import io.ably.lib.rest.Auth; import io.ably.lib.transport.ConnectionManager; @@ -187,9 +188,10 @@ public interface Channels extends ReadOnlyMap { private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() { try { Class liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin"); + LiveObjectsAdapter adapter = new Adapter(this); return (LiveObjectsPlugin) liveObjectsImplementation - .getDeclaredConstructor(PluginConnectionAdapter.class) - .newInstance(this.connection.connectionManager); + .getDeclaredConstructor(LiveObjectsAdapter.class) + .newInstance(adapter); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e); diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index f6f20e9eb..ffe6e36f1 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -15,7 +15,6 @@ import io.ably.lib.debug.DebugOptions.RawProtocolListener; import io.ably.lib.http.HttpHelpers; import io.ably.lib.objects.LiveObjectsPlugin; -import io.ably.lib.plugins.PluginConnectionAdapter; import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.CompletionListener; @@ -37,7 +36,7 @@ import io.ably.lib.util.PlatformAgentProvider; import io.ably.lib.util.ReconnectionStrategy; -public class ConnectionManager implements ConnectListener, PluginConnectionAdapter { +public class ConnectionManager implements ConnectListener { final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); /************************************************************** @@ -1687,11 +1686,6 @@ public QueuedMessage(ProtocolMessage msg, CompletionListener listener) { } } - @Override - public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { - this.send(msg, true, listener); - } - public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException { State state; synchronized(this) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt index 2fc70a2a9..ea88c5e99 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -1,8 +1,12 @@ package io.ably.lib.objects import io.ably.lib.types.Callback +import io.ably.lib.types.ProtocolMessage +import io.ably.lib.util.Log + +internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects { + private val tag = DefaultLiveObjects::class.simpleName -internal class DefaultLiveObjects(private val channelName: String): LiveObjects { override fun getRoot(): LiveMap { TODO("Not yet implemented") } @@ -43,6 +47,16 @@ internal class DefaultLiveObjects(private val channelName: String): LiveObjects TODO("Not yet implemented") } + fun handle(msg: ProtocolMessage) { + // RTL15b + msg.channelSerial?.let { + if (msg.action === ProtocolMessage.Action.`object`) { + Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}") + adapter.setChannelSerial(channelName, msg.channelSerial) + } + } + } + fun dispose() { // Dispose of any resources associated with this LiveObjects instance // For example, close any open connections or clean up references diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt index 277d4df31..e31002a89 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt @@ -1,36 +1,19 @@ package io.ably.lib.objects -import io.ably.lib.plugins.PluginConnectionAdapter -import io.ably.lib.realtime.CompletionListener -import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage -import kotlinx.coroutines.CompletableDeferred import java.util.concurrent.ConcurrentHashMap -public class DefaultLiveObjectsPlugin(private val pluginConnectionAdapter: PluginConnectionAdapter) : LiveObjectsPlugin { +public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) : LiveObjectsPlugin { private val liveObjects = ConcurrentHashMap() override fun getInstance(channelName: String): LiveObjects { - return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName) } + return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName, adapter) } } - public suspend fun send(message: ProtocolMessage) { - val deferred = CompletableDeferred() - pluginConnectionAdapter.send(message, object : CompletionListener { - override fun onSuccess() { - deferred.complete(Unit) - } - - override fun onError(reason: ErrorInfo) { - deferred.completeExceptionally(Exception(reason.message)) - } - }) - deferred.await() - } - - override fun handle(message: ProtocolMessage) { - TODO("Not yet implemented") + override fun handle(msg: ProtocolMessage) { + val channelName = msg.channel + liveObjects[channelName]?.handle(msg) } override fun dispose(channelName: String) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt new file mode 100644 index 000000000..9a6606f5c --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -0,0 +1,44 @@ +package io.ably.lib.objects + +import io.ably.lib.realtime.CompletionListener +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import io.ably.lib.types.ProtocolMessage +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation -> + try { + this.send(message, object : CompletionListener { + override fun onSuccess() { + continuation.resume(Unit) + } + + override fun onError(reason: ErrorInfo) { + continuation.resumeWithException(AblyException.fromErrorInfo(reason)) + } + }) + } catch (e: Exception) { + continuation.resumeWithException(e) + } +} + +internal enum class ProtocolMessageFormat(private val value: String) { + Msgpack("msgpack"), + Json("json"); + + override fun toString(): String = value +} + +internal class Binary(val data: ByteArray?) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is Binary) return false + return data?.contentEquals(other.data) == true + } + + override fun hashCode(): Int { + return data?.contentHashCode() ?: 0 + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt new file mode 100644 index 000000000..5bb75582e --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt @@ -0,0 +1,315 @@ +package io.ably.lib.objects + +/** + * An enum class representing the different actions that can be performed on an object. + * Spec: OOP2 + */ +internal enum class ObjectOperationAction(val code: Int) { + MapCreate(0), + MapSet(1), + MapRemove(2), + CounterCreate(3), + CounterInc(4), + ObjectDelete(5); +} + +/** + * An enum class representing the conflict-resolution semantics used by a Map object. + * Spec: MAP2 + */ +internal enum class MapSemantics(val code: Int) { + LWW(0); +} + +/** + * An ObjectData represents a value in an object on a channel. + * Spec: OD1 + */ +internal data class ObjectData( + /** + * A reference to another object, used to support composable object structures. + * Spec: OD2a + */ + val objectId: String? = null, + + /** + * Can be set by the client to indicate that value in `string` or `bytes` field have an encoding. + * Spec: OD2b + */ + val encoding: String? = null, + + /** + * String, number, boolean or binary - a concrete value of the object + * Spec: OD2c + */ + val value: Any? = null, +) + +/** + * A MapOp describes an operation to be applied to a Map object. + * Spec: MOP1 + */ +internal data class MapOp( + /** + * The key of the map entry to which the operation should be applied. + * Spec: MOP2a + */ + val key: String, + + /** + * The data that the map entry should contain if the operation is a MAP_SET operation. + * Spec: MOP2b + */ + val data: ObjectData? = null +) + +/** + * A CounterOp describes an operation to be applied to a Counter object. + * Spec: COP1 + */ +internal data class CounterOp( + /** + * The data value that should be added to the counter + * Spec: COP2a + */ + val amount: Double +) + +/** + * A MapEntry represents the value at a given key in a Map object. + * Spec: ME1 + */ +internal data class MapEntry( + /** + * Indicates whether the map entry has been removed. + * Spec: ME2a + */ + val tombstone: Boolean? = null, + + /** + * The serial value of the last operation that was applied to the map entry. + * It is optional in a MAP_CREATE operation and might be missing, in which case the client should use a nullish value for it + * and treat it as the "earliest possible" serial for comparison purposes. + * Spec: ME2b + */ + val timeserial: String? = null, + + /** + * The data that represents the value of the map entry. + * Spec: ME2c + */ + val data: ObjectData? = null +) + +/** + * An ObjectMap object represents a map of key-value pairs. + * Spec: MAP1 + */ +internal data class ObjectMap( + /** + * The conflict-resolution semantics used by the map object. + * Spec: MAP3a + */ + val semantics: MapSemantics? = null, + + /** + * The map entries, indexed by key. + * Spec: MAP3b + */ + val entries: Map? = null +) + +/** + * An ObjectCounter object represents an incrementable and decrementable value + * Spec: CNT1 + */ +internal data class ObjectCounter( + /** + * The value of the counter + * Spec: CNT2a + */ + val count: Double? = null +) + +/** + * An ObjectOperation describes an operation to be applied to an object on a channel. + * Spec: OOP1 + */ +internal data class ObjectOperation( + /** + * Defines the operation to be applied to the object. + * Spec: OOP3a + */ + val action: ObjectOperationAction, + + /** + * The object ID of the object on a channel to which the operation should be applied. + * Spec: OOP3b + */ + val objectId: String, + + /** + * The payload for the operation if it is an operation on a Map object type. + * Spec: OOP3c + */ + val mapOp: MapOp? = null, + + /** + * The payload for the operation if it is an operation on a Counter object type. + * Spec: OOP3d + */ + val counterOp: CounterOp? = null, + + /** + * The payload for the operation if the operation is MAP_CREATE. + * Defines the initial value for the Map object. + * Spec: OOP3e + */ + val map: ObjectMap? = null, + + /** + * The payload for the operation if the operation is COUNTER_CREATE. + * Defines the initial value for the Counter object. + * Spec: OOP3f + */ + val counter: ObjectCounter? = null, + + /** + * The nonce, must be present on create operations. This is the random part + * that has been hashed with the type and initial value to create the object ID. + * Spec: OOP3g + */ + val nonce: String? = null, + + /** + * The initial value bytes for the object. These bytes should be used along with the nonce + * and timestamp to create the object ID. Frontdoor will use this to verify the object ID. + * After verification the bytes will be decoded into the Map or Counter objects and + * the initialValue, nonce, and initialValueEncoding will be removed. + * Spec: OOP3h + */ + val initialValue: Binary? = null, + + /** The initial value encoding defines how the initialValue should be interpreted. + * Spec: OOP3i + */ + val initialValueEncoding: ProtocolMessageFormat? = null +) + +/** + * An ObjectState describes the instantaneous state of an object on a channel. + * Spec: OST1 + */ +internal data class ObjectState( + /** + * The identifier of the object. + * Spec: OST2a + */ + val objectId: String, + + /** + * A map of serials keyed by a {@link ObjectMessage.siteCode}, + * representing the last operations applied to this object + * Spec: OST2b + */ + val siteTimeserials: Map, + + /** + * True if the object has been tombstoned. + * Spec: OST2c + */ + val tombstone: Boolean, + + /** + * The operation that created the object. + * Can be missing if create operation for the object is not known at this point. + * Spec: OST2d + */ + val createOp: ObjectOperation? = null, + + /** + * The data that represents the result of applying all operations to a Map object + * excluding the initial value from the create operation if it is a Map object type. + * Spec: OST2e + */ + val map: ObjectMap? = null, + + /** + * The data that represents the result of applying all operations to a Counter object + * excluding the initial value from the create operation if it is a Counter object type. + * Spec: OST2f + */ + val counter: ObjectCounter? = null +) + +/** + * An @ObjectMessage@ represents an individual object message to be sent or received via the Ably Realtime service. + * Spec: OM1 + */ +internal data class ObjectMessage( + /** + * unique ID for this object message. This attribute is always populated for object messages received over REST. + * For object messages received over Realtime, if the object message does not contain an @id@, + * it should be set to @protocolMsgId:index@, where @protocolMsgId@ is the id of the @ProtocolMessage@ encapsulating it, + * and @index@ is the index of the object message inside the @state@ array of the @ProtocolMessage@ + * Spec: OM2a + */ + val id: String? = null, + + /** + * time in milliseconds since epoch. If an object message received from Ably does not contain a @timestamp@, + * it should be set to the @timestamp@ of the encapsulating @ProtocolMessage@ + * Spec: OM2e + */ + val timestamp: Long? = null, + + /** + * Spec: OM2b + */ + val clientId: String? = null, + + /** + * If an object message received from Ably does not contain a @connectionId@, + * it should be set to the @connectionId@ of the encapsulating @ProtocolMessage@ + * Spec: OM2c + */ + val connectionId: String? = null, + + /** + * JSON-encodable object, used to contain any arbitrary key value pairs which may also contain other primitive JSON types, + * JSON-encodable objects or JSON-encodable arrays. The @extras@ field is provided to contain message metadata and/or + * ancillary payloads in support of specific functionality. For 3.1 no specific functionality is specified for + * @extras@ in object messages. Unless otherwise specified, the client library should not attempt to do any filtering + * or validation of the @extras@ field itself, but should treat it opaquely, encoding it and passing it to realtime unaltered + * Spec: OM2d + */ + val extras: Any? = null, + + /** + * Describes an operation to be applied to an object. + * Mutually exclusive with the `object` field. This field is only set on object messages if the `action` field of the + * `ProtocolMessage` encapsulating it is `OBJECT`. + * Spec: OM2f + */ + val operation: ObjectOperation? = null, + + /** + * Describes the instantaneous state of an object. + * Mutually exclusive with the `operation` field. This field is only set on object messages if the `action` field of + * the `ProtocolMessage` encapsulating it is `OBJECT_SYNC`. + * Spec: OM2g + */ + val objectState: ObjectState? = null, + + /** + * An opaque string that uniquely identifies this object message. + * Spec: OM2h + */ + val serial: String? = null, + + /** + * An opaque string used as a key to update the map of serial values on an object. + * Spec: OM2i + */ + val siteCode: String? = null +)