diff --git a/android/build.gradle.kts b/android/build.gradle.kts index b4a5071d7..fb70f02e0 100644 --- a/android/build.gradle.kts +++ b/android/build.gradle.kts @@ -51,6 +51,7 @@ android { dependencies { api(libs.gson) implementation(libs.bundles.common) + compileOnly(libs.jetbrains) testImplementation(libs.bundles.tests) implementation(project(":network-client-core")) runtimeOnly(project(":network-client-default")) diff --git a/android/src/main/java/io/ably/lib/realtime/Channel.java b/android/src/main/java/io/ably/lib/realtime/Channel.java index daafe5188..baf086cbc 100644 --- a/android/src/main/java/io/ably/lib/realtime/Channel.java +++ b/android/src/main/java/io/ably/lib/realtime/Channel.java @@ -3,6 +3,8 @@ import io.ably.lib.types.AblyException; import io.ably.lib.types.ChannelOptions; import io.ably.lib.push.PushChannel; +import io.ably.lib.objects.LiveObjectsPlugin; + public class Channel extends ChannelBase { /** @@ -12,8 +14,8 @@ public class Channel extends ChannelBase { */ public final PushChannel push; - Channel(AblyRealtime ably, String name, ChannelOptions options) throws AblyException { - super(ably, name, options); + Channel(AblyRealtime ably, String name, ChannelOptions options, LiveObjectsPlugin liveObjectsPlugin) throws AblyException { + super(ably, name, options, liveObjectsPlugin); this.push = ((io.ably.lib.rest.AblyRest) ably).channels.get(name, options).push; } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0ebca67ac..f1e77a7c5 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -22,6 +22,7 @@ test-retry = "1.6.0" kotlin = "2.1.10" coroutine = "1.9.0" turbine = "1.2.0" +jetbrains-annoations = "26.0.2" [libraries] gson = { group = "com.google.code.gson", name = "gson", version.ref = "gson" } @@ -47,6 +48,7 @@ okhttp = { group = "com.squareup.okhttp3", name = "okhttp", version.ref = "okhtt coroutine-core = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutine" } coroutine-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutine" } turbine = { group = "app.cash.turbine", name = "turbine", version.ref = "turbine" } +jetbrains = { group = "org.jetbrains", name = "annotations", version.ref = "jetbrains-annoations" } [bundles] common = ["msgpack", "vcdiff-core"] diff --git a/java/build.gradle.kts b/java/build.gradle.kts index 45b0c4e39..33c89a2f8 100644 --- a/java/build.gradle.kts +++ b/java/build.gradle.kts @@ -20,6 +20,7 @@ tasks.withType { dependencies { api(libs.gson) implementation(libs.bundles.common) + compileOnly(libs.jetbrains) implementation(project(":network-client-core")) if (findProperty("okhttp") == null) { runtimeOnly(project(":network-client-default")) diff --git a/java/src/main/java/io/ably/lib/realtime/Channel.java b/java/src/main/java/io/ably/lib/realtime/Channel.java index 9c7f64995..b48c929b1 100644 --- a/java/src/main/java/io/ably/lib/realtime/Channel.java +++ b/java/src/main/java/io/ably/lib/realtime/Channel.java @@ -1,11 +1,12 @@ package io.ably.lib.realtime; +import io.ably.lib.objects.LiveObjectsPlugin; import io.ably.lib.types.AblyException; import io.ably.lib.types.ChannelOptions; public class Channel extends ChannelBase { - Channel(AblyRealtime ably, String name, ChannelOptions options) throws AblyException { - super(ably, name, options); + Channel(AblyRealtime ably, String name, ChannelOptions options, LiveObjectsPlugin liveObjectsPlugin) throws AblyException { + super(ably, name, options, liveObjectsPlugin); } public interface MessageListener extends ChannelBase.MessageListener {} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveCounter.java b/lib/src/main/java/io/ably/lib/objects/LiveCounter.java new file mode 100644 index 000000000..fd44b853c --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/LiveCounter.java @@ -0,0 +1,62 @@ +package io.ably.lib.objects; + +import io.ably.lib.types.Callback; +import org.jetbrains.annotations.Blocking; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Contract; + +/** + * The LiveCounter interface provides methods to interact with a live counter. + * It allows incrementing, decrementing, and retrieving the current value of the counter, + * both synchronously and asynchronously. + */ +public interface LiveCounter { + + /** + * Increments the value of the counter by 1. + * Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object. + * This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when + * the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + */ + @Blocking + void increment(); + + /** + * Increments the value of the counter by 1 asynchronously. + * Send a COUNTER_INC operation to the realtime system to increment a value on this LiveCounter object. + * This does not modify the underlying data of this LiveCounter object. Instead, the change will be applied when + * the published COUNTER_INC operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + * + * @param callback the callback to be invoked upon completion of the operation. + */ + @NonBlocking + void incrementAsync(@NotNull Callback callback); + + /** + * Decrements the value of the counter by 1. + * An alias for calling {@link LiveCounter#increment()} with a negative amount. + */ + @Blocking + void decrement(); + + /** + * Decrements the value of the counter by 1 asynchronously. + * An alias for calling {@link LiveCounter#increment()} with a negative amount. + * + * @param callback the callback to be invoked upon completion of the operation. + */ + @NonBlocking + void decrementAsync(@NotNull Callback callback); + + /** + * Retrieves the current value of the counter. + * + * @return the current value of the counter as a Long. + */ + @NotNull + @Contract(pure = true) // Indicates this method does not modify the state of the object. + Long value(); +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveMap.java b/lib/src/main/java/io/ably/lib/objects/LiveMap.java new file mode 100644 index 000000000..7ba4433f9 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/LiveMap.java @@ -0,0 +1,121 @@ +package io.ably.lib.objects; + +import io.ably.lib.types.Callback; +import org.jetbrains.annotations.Blocking; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.Unmodifiable; + +import java.util.Map; + +/** + * The LiveMap interface provides methods to interact with a live, real-time map structure. + * It supports both synchronous and asynchronous operations for managing key-value pairs. + */ +public interface LiveMap { + + /** + * Retrieves the value associated with the specified key. + * If this map object is tombstoned (deleted), `undefined` is returned. + * If no entry is associated with the specified key, `undefined` is returned. + * If map entry is tombstoned (deleted), `undefined` is returned. + * If the value associated with the provided key is an objectId string of another LiveObject, a reference to that LiveObject + * is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned. + * If the value is not an objectId, then that value is returned. + * + * @param keyName the key whose associated value is to be returned. + * @return the value associated with the specified key, or null if the key does not exist. + */ + @Nullable + Object get(@NotNull String keyName); + + /** + * Retrieves all entries (key-value pairs) in the map. + * + * @return an iterable collection of all entries in the map. + */ + @NotNull + @Unmodifiable + Iterable> entries(); + + /** + * Retrieves all keys in the map. + * + * @return an iterable collection of all keys in the map. + */ + @NotNull + @Unmodifiable + Iterable keys(); + + /** + * Retrieves all values in the map. + * + * @return an iterable collection of all values in the map. + */ + @NotNull + @Unmodifiable + Iterable values(); + + /** + * Sets the specified key to the given value in the map. + * Send a MAP_SET operation to the realtime system to set a key on this LiveMap object to a specified value. + * This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when + * the published MAP_SET operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + * + * @param keyName the key to be set. + * @param value the value to be associated with the key. + */ + @Blocking + void set(@NotNull String keyName, @NotNull Object value); + + /** + * Removes the specified key and its associated value from the map. + * Send a MAP_REMOVE operation to the realtime system to tombstone a key on this LiveMap object. + * This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when + * the published MAP_REMOVE operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + * + * @param keyName the key to be removed. + */ + @Blocking + void remove(@NotNull String keyName); + + /** + * Retrieves the number of entries in the map. + * + * @return the size of the map. + */ + @Contract(pure = true) // Indicates this method does not modify the state of the object. + @NotNull + Long size(); + + /** + * Asynchronously sets the specified key to the given value in the map. + * Send a MAP_SET operation to the realtime system to set a key on this LiveMap object to a specified value. + * This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when + * the published MAP_SET operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + * + * @param keyName the key to be set. + * @param value the value to be associated with the key. + * @param callback the callback to handle the result or any errors. + */ + @NonBlocking + void setAsync(@NotNull String keyName, @NotNull Object value, @NotNull Callback callback); + + /** + * Asynchronously removes the specified key and its associated value from the map. + * Send a MAP_REMOVE operation to the realtime system to tombstone a key on this LiveMap object. + * This does not modify the underlying data of this LiveMap object. Instead, the change will be applied when + * the published MAP_REMOVE operation is echoed back to the client and applied to the object following the regular + * operation application procedure. + * + * @param keyName the key to be removed. + * @param callback the callback to handle the result or any errors. + */ + @NonBlocking + void removeAsync(@NotNull String keyName, @NotNull Callback callback); +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjects.java b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java new file mode 100644 index 000000000..adf05df6e --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java @@ -0,0 +1,151 @@ +package io.ably.lib.objects; + +import io.ably.lib.types.Callback; +import org.jetbrains.annotations.Blocking; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.NotNull; + + +import java.util.Map; + +/** + * The LiveObjects interface provides methods to interact with live data objects, + * such as maps and counters, in a real-time environment. It supports both synchronous + * and asynchronous operations for retrieving and creating live objects. + * + *

Implementations of this interface must be thread-safe as they may be accessed + * from multiple threads concurrently. + */ +public interface LiveObjects { + + /** + * Retrieves the root LiveMap object. + * When called without a type variable, we return a default root type which is based on globally defined interface for Objects feature. + * A user can provide an explicit type for the getRoot method to explicitly set the type structure on this particular channel. + * This is useful when working with multiple channels with different underlying data structure. + * + * @return the root LiveMap instance. + */ + @Blocking + @NotNull + LiveMap getRoot(); + + /** + * Creates a new LiveMap based on an existing LiveMap. + * Send a MAP_CREATE operation to the realtime system to create a new map object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed MAP_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param liveMap the existing LiveMap to base the new LiveMap on. + * @return the newly created LiveMap instance. + */ + @Blocking + @NotNull + LiveMap createMap(@NotNull LiveMap liveMap); + + /** + * Creates a new LiveMap based on a LiveCounter. + * Send a MAP_CREATE operation to the realtime system to create a new map object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed MAP_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param liveCounter the LiveCounter to base the new LiveMap on. + * @return the newly created LiveMap instance. + */ + @Blocking + @NotNull + LiveMap createMap(@NotNull LiveCounter liveCounter); + + /** + * Creates a new LiveMap based on a standard Java Map. + * Send a MAP_CREATE operation to the realtime system to create a new map object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed MAP_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param map the Java Map to base the new LiveMap on. + * @return the newly created LiveMap instance. + */ + @Blocking + @NotNull + LiveMap createMap(@NotNull Map map); + + /** + * Creates a new LiveCounter with an initial value. + * Send a COUNTER_CREATE operation to the realtime system to create a new counter object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed COUNTER_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param initialValue the initial value of the LiveCounter. + * @return the newly created LiveCounter instance. + */ + @Blocking + @NotNull + LiveCounter createCounter(@NotNull Long initialValue); + + /** + * Asynchronously retrieves the root LiveMap object. + * When called without a type variable, we return a default root type which is based on globally defined interface for Objects feature. + * A user can provide an explicit type for the getRoot method to explicitly set the type structure on this particular channel. + * This is useful when working with multiple channels with different underlying data structure. + * + * @param callback the callback to handle the result or error. + */ + @NonBlocking + void getRootAsync(@NotNull Callback<@NotNull LiveMap> callback); + + /** + * Asynchronously creates a new LiveMap based on an existing LiveMap. + * Send a MAP_CREATE operation to the realtime system to create a new map object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed MAP_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param liveMap the existing LiveMap to base the new LiveMap on. + * @param callback the callback to handle the result or error. + */ + @NonBlocking + void createMapAsync(@NotNull LiveMap liveMap, @NotNull Callback<@NotNull LiveMap> callback); + + /** + * Asynchronously creates a new LiveMap based on a LiveCounter. + * Send a MAP_CREATE operation to the realtime system to create a new map object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed MAP_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param liveCounter the LiveCounter to base the new LiveMap on. + * @param callback the callback to handle the result or error. + */ + @NonBlocking + void createMapAsync(@NotNull LiveCounter liveCounter, @NotNull Callback<@NotNull LiveMap> callback); + + /** + * Asynchronously creates a new LiveMap based on a standard Java Map. + * Send a MAP_CREATE operation to the realtime system to create a new map object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed MAP_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param map the Java Map to base the new LiveMap on. + * @param callback the callback to handle the result or error. + */ + @NonBlocking + void createMapAsync(@NotNull Map map, @NotNull Callback<@NotNull LiveMap> callback); + + /** + * Asynchronously creates a new LiveCounter with an initial value. + * Send a COUNTER_CREATE operation to the realtime system to create a new counter object in the pool. + * Once the ACK message is received, the method returns the object from the local pool if it got created due to + * the echoed COUNTER_CREATE operation, or if it wasn't received yet, the method creates a new object locally + * using the provided data and returns it. + * + * @param initialValue the initial value of the LiveCounter. + * @param callback the callback to handle the result or error. + */ + @NonBlocking + void createCounterAsync(@NotNull Long initialValue, @NotNull Callback<@NotNull LiveCounter> callback); +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java new file mode 100644 index 000000000..cad3e9f59 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java @@ -0,0 +1,32 @@ +package io.ably.lib.objects; + +import io.ably.lib.plugins.PluginInstance; +import org.jetbrains.annotations.NotNull; + +/** + * The LiveObjectsPlugin interface provides a mechanism for managing and interacting with + * live data objects in a real-time environment. It allows for the retrieval, disposal, and + * management of LiveObjects instances associated with specific channel names. + */ +public interface LiveObjectsPlugin extends PluginInstance { + + /** + * Retrieves an instance of LiveObjects associated with the specified channel name. + * This method ensures that a LiveObjects instance is available for the given channel, + * creating one if it does not already exist. + * + * @param channelName the name of the channel for which the LiveObjects instance is to be retrieved. + * @return the LiveObjects instance associated with the specified channel name. + */ + @NotNull + LiveObjects getInstance(@NotNull String channelName); + + /** + * Disposes of the LiveObjects instance associated with the specified channel name. + * This method removes the LiveObjects instance for the given channel, releasing any + * resources associated with it. + * + * @param channelName the name of the channel whose LiveObjects instance is to be removed. + */ + void dispose(@NotNull String channelName); +} diff --git a/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java b/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java new file mode 100644 index 000000000..5283d2120 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/plugins/PluginConnectionAdapter.java @@ -0,0 +1,25 @@ +package io.ably.lib.plugins; + +import io.ably.lib.realtime.CompletionListener; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.ProtocolMessage; + +/** + * The PluginConnectionAdapter interface defines a contract for managing real-time communication + * between plugins and the Ably Realtime system. Implementations of this interface are responsible + * for sending protocol messages to their intended recipients, optionally queuing events, and + * notifying listeners of the operation's outcome. + */ +public interface PluginConnectionAdapter { + + /** + * Sends a protocol message to its intended recipient. + * This method transmits a protocol message, allowing for queuing events if necessary, + * and notifies the provided listener upon the success or failure of the send operation. + * + * @param msg the protocol message to send. + * @param listener a listener to be notified of the success or failure of the send operation. + * @throws AblyException if an error occurs during the send operation. + */ + void send(ProtocolMessage msg, CompletionListener listener) throws AblyException; +} diff --git a/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java b/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java new file mode 100644 index 000000000..23055f901 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/plugins/PluginInstance.java @@ -0,0 +1,25 @@ +package io.ably.lib.plugins; + +import io.ably.lib.types.ProtocolMessage; +import org.jetbrains.annotations.NotNull; + +/** + * The ProtocolMessageHandler interface defines a contract for handling protocol messages. + * Implementations of this interface are responsible for processing incoming protocol messages + * and performing the necessary actions based on the message content. + */ +public interface PluginInstance { + /** + * Handles a protocol message. + * This method is invoked whenever a protocol message is received, allowing the implementation + * to process the message and take appropriate actions. + * + * @param message the protocol message to handle. + */ + void handle(@NotNull ProtocolMessage message); + + /** + * Disposes of the plugin instance and all underlying resources. + */ + void dispose(); +} diff --git a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java index 5c29a0dea..92e0fdbd8 100644 --- a/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java +++ b/lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java @@ -1,10 +1,13 @@ package io.ably.lib.realtime; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import io.ably.lib.objects.LiveObjectsPlugin; +import io.ably.lib.plugins.PluginConnectionAdapter; import io.ably.lib.rest.AblyRest; import io.ably.lib.rest.Auth; import io.ably.lib.transport.ConnectionManager; @@ -40,6 +43,13 @@ public class AblyRealtime extends AblyRest { */ public final Channels channels; + /** + * A nullable reference to the LiveObjects plugin. + *

+ * This field is initialized only if the LiveObjects plugin is present in the classpath. + */ + private final LiveObjectsPlugin liveObjectsPlugin; + /** * Constructs a Realtime client object using an Ably API key or token string. *

@@ -62,7 +72,10 @@ public AblyRealtime(ClientOptions options) throws AblyException { super(options); final InternalChannels channels = new InternalChannels(); this.channels = channels; - connection = new Connection(this, channels, platformAgentProvider); + + liveObjectsPlugin = tryInitializeLiveObjectsPlugin(); + + connection = new Connection(this, channels, platformAgentProvider, liveObjectsPlugin); if (!StringUtils.isNullOrEmpty(options.recover)) { RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover); @@ -108,6 +121,9 @@ public void close() { } connection.close(); + if (liveObjectsPlugin != null) { + liveObjectsPlugin.dispose(); + } } /** @@ -168,6 +184,19 @@ public interface Channels extends ReadOnlyMap { void release(String channelName); } + private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() { + try { + Class liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin"); + return (LiveObjectsPlugin) liveObjectsImplementation + .getDeclaredConstructor(PluginConnectionAdapter.class) + .newInstance(this.connection.connectionManager); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | + InvocationTargetException e) { + Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e); + return null; + } + } + private class InternalChannels extends InternalMap implements Channels, ConnectionManager.Channels { /** * Get the named channel; if it does not already exist, @@ -187,7 +216,7 @@ public Channel get(final String channelName, final ChannelOptions channelOptions // We're not using computeIfAbsent because that requires Java 1.8. // Hence there's the slight inefficiency of creating newChannel when it may not be // needed because there is an existingChannel. - final Channel newChannel = new Channel(AblyRealtime.this, channelName, channelOptions); + final Channel newChannel = new Channel(AblyRealtime.this, channelName, channelOptions, liveObjectsPlugin); final Channel existingChannel = map.putIfAbsent(channelName, newChannel); if (existingChannel != null) { @@ -214,6 +243,9 @@ public void release(String channelName) { Log.e(TAG, "Unexpected exception detaching channel; channelName = " + channelName, e); } } + if (liveObjectsPlugin != null) { + liveObjectsPlugin.dispose(channelName); + } } @Override diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index 028f6b23c..16470f1d6 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -13,6 +13,8 @@ import io.ably.lib.http.Http; import io.ably.lib.http.HttpCore; import io.ably.lib.http.HttpUtils; +import io.ably.lib.objects.LiveObjects; +import io.ably.lib.objects.LiveObjectsPlugin; import io.ably.lib.transport.ConnectionManager; import io.ably.lib.transport.ConnectionManager.QueuedMessage; import io.ably.lib.transport.Defaults; @@ -91,6 +93,18 @@ public abstract class ChannelBase extends EventEmitter') to your dependency tree", 400, 40019) + ); + } + return liveObjectsPlugin.getInstance(name); + } + /*** * internal * @@ -1285,7 +1299,7 @@ else if(stateChange.current.equals(failureState)) { } } - ChannelBase(AblyRealtime ably, String name, ChannelOptions options) throws AblyException { + ChannelBase(AblyRealtime ably, String name, ChannelOptions options, LiveObjectsPlugin liveObjectsPlugin) throws AblyException { Log.v(TAG, "RealtimeChannel(); channel = " + name); this.ably = ably; this.name = name; @@ -1295,6 +1309,7 @@ else if(stateChange.current.equals(failureState)) { this.attachResume = false; state = ChannelState.initialized; this.decodingContext = new DecodingContext(); + this.liveObjectsPlugin = liveObjectsPlugin; } void onChannelMessage(ProtocolMessage msg) { diff --git a/lib/src/main/java/io/ably/lib/realtime/Connection.java b/lib/src/main/java/io/ably/lib/realtime/Connection.java index c1ca65c70..3ba28a434 100644 --- a/lib/src/main/java/io/ably/lib/realtime/Connection.java +++ b/lib/src/main/java/io/ably/lib/realtime/Connection.java @@ -1,5 +1,6 @@ package io.ably.lib.realtime; +import io.ably.lib.objects.LiveObjectsPlugin; import io.ably.lib.realtime.ConnectionStateListener.ConnectionStateChange; import io.ably.lib.transport.ConnectionManager; import io.ably.lib.types.AblyException; @@ -122,10 +123,10 @@ public void close() { * internal *****************/ - Connection(AblyRealtime ably, ConnectionManager.Channels channels, PlatformAgentProvider platformAgentProvider) throws AblyException { + Connection(AblyRealtime ably, ConnectionManager.Channels channels, PlatformAgentProvider platformAgentProvider, LiveObjectsPlugin liveObjectsPlugin) throws AblyException { this.ably = ably; this.state = ConnectionState.initialized; - this.connectionManager = new ConnectionManager(ably, this, channels, platformAgentProvider); + this.connectionManager = new ConnectionManager(ably, this, channels, platformAgentProvider, liveObjectsPlugin); } public void onConnectionStateChange(ConnectionStateChange stateChange) { diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 26bd74cb7..f6f20e9eb 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -14,6 +14,8 @@ import io.ably.lib.debug.DebugOptions; import io.ably.lib.debug.DebugOptions.RawProtocolListener; import io.ably.lib.http.HttpHelpers; +import io.ably.lib.objects.LiveObjectsPlugin; +import io.ably.lib.plugins.PluginConnectionAdapter; import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.Channel; import io.ably.lib.realtime.CompletionListener; @@ -35,7 +37,7 @@ import io.ably.lib.util.PlatformAgentProvider; import io.ably.lib.util.ReconnectionStrategy; -public class ConnectionManager implements ConnectListener { +public class ConnectionManager implements ConnectListener, PluginConnectionAdapter { final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); /************************************************************** @@ -79,6 +81,13 @@ public class ConnectionManager implements ConnectListener { */ private boolean cleaningUpAfterEnteringTerminalState = false; + /** + * A nullable reference to the LiveObjects plugin. + *

+ * This field is initialized only if the LiveObjects plugin is present in the classpath. + */ + private final LiveObjectsPlugin liveObjectsPlugin; + /** * Methods on the channels map owned by the {@link AblyRealtime} instance * which the {@link ConnectionManager} needs access to. @@ -764,11 +773,12 @@ public void run() { * ConnectionManager ***********************/ - public ConnectionManager(final AblyRealtime ably, final Connection connection, final Channels channels, final PlatformAgentProvider platformAgentProvider) throws AblyException { + public ConnectionManager(final AblyRealtime ably, final Connection connection, final Channels channels, final PlatformAgentProvider platformAgentProvider, LiveObjectsPlugin liveObjectsPlugin) throws AblyException { this.ably = ably; this.connection = connection; this.channels = channels; this.platformAgentProvider = platformAgentProvider; + this.liveObjectsPlugin = liveObjectsPlugin; ClientOptions options = ably.options; this.hosts = new Hosts(options.realtimeHost, Defaults.HOST_REALTIME, options); @@ -1220,6 +1230,16 @@ public void onMessage(ITransport transport, ProtocolMessage message) throws Ably case auth: addAction(new ReauthAction()); break; + case object: + case object_sync: + if (liveObjectsPlugin != null) { + try { + liveObjectsPlugin.handle(message); + } catch (Throwable t) { + Log.e(TAG, "LiveObjectsPlugin threw while handling message", t); + } + } + break; default: onChannelMessage(message); } @@ -1667,6 +1687,11 @@ public QueuedMessage(ProtocolMessage msg, CompletionListener listener) { } } + @Override + public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException { + this.send(msg, true, listener); + } + public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException { State state; synchronized(this) { diff --git a/lib/src/main/java/io/ably/lib/types/ChannelMode.java b/lib/src/main/java/io/ably/lib/types/ChannelMode.java index 26d26ac8f..f20636933 100644 --- a/lib/src/main/java/io/ably/lib/types/ChannelMode.java +++ b/lib/src/main/java/io/ably/lib/types/ChannelMode.java @@ -24,7 +24,27 @@ public enum ChannelMode { /** * The client can receive presence messages. */ - presence_subscribe(Flag.presence_subscribe); + presence_subscribe(Flag.presence_subscribe), + + /** + * The client can publish object messages. + */ + object_publish(Flag.object_publish), + + /** + * The client can subscribe to object messages. + */ + object_subscribe(Flag.object_subscribe), + + /** + * The client can publish annotation messages. + */ + annotation_publish(Flag.annotation_publish), + + /** + * The client can subscribe to annotation messages. + */ + annotation_subscribe(Flag.annotation_subscribe); private final int mask; diff --git a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java index 1d1d3bc69..986a8628b 100644 --- a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java +++ b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java @@ -45,7 +45,11 @@ public enum Action { presence, message, sync, - auth; + auth, + activate, + object, + object_sync, + annotation; public int getValue() { return ordinal(); } public static Action findByValue(int value) { return values()[value]; } @@ -57,12 +61,19 @@ public enum Flag { has_backlog(1), resumed(2), attach_resume(5), - + /* Has object flag */ + has_objects(7), /* Channel mode flags */ presence(16), publish(17), subscribe(18), - presence_subscribe(19); + presence_subscribe(19), + /* Annotation flags */ + annotation_publish(21), + annotation_subscribe(22), + /* Object flags */ + object_subscribe(24), + object_publish(25); private final int mask; diff --git a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java index 383270153..eaaaead97 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/ConnectionManagerTest.java @@ -137,7 +137,7 @@ public void connectionmanager_fallback_none_withoutconnection() throws AblyExcep Connection connection = Mockito.mock(Connection.class); final ConnectionManager.Channels channels = Mockito.mock(ConnectionManager.Channels.class); - ConnectionManager connectionManager = new ConnectionManager(ably, connection, channels, new EmptyPlatformAgentProvider()) { + ConnectionManager connectionManager = new ConnectionManager(ably, connection, channels, new EmptyPlatformAgentProvider(), null) { @Override protected boolean checkConnectivity() { return false; diff --git a/live-objects/build.gradle.kts b/live-objects/build.gradle.kts new file mode 100644 index 000000000..745a9a47c --- /dev/null +++ b/live-objects/build.gradle.kts @@ -0,0 +1,24 @@ +plugins { + `java-library` + alias(libs.plugins.kotlin.jvm) +} + +repositories { + mavenCentral() +} + +dependencies { + implementation(project(":java")) + testImplementation(kotlin("test")) + implementation(libs.coroutine.core) + + testImplementation(libs.coroutine.test) +} + +tasks.test { + useJUnitPlatform() +} + +kotlin { + explicitApi() +} diff --git a/live-objects/gradle.properties b/live-objects/gradle.properties new file mode 100644 index 000000000..29fa6bdb7 --- /dev/null +++ b/live-objects/gradle.properties @@ -0,0 +1,4 @@ +POM_ARTIFACT_ID=live-objects +POM_NAME=Live Objects plugin for Ably Pub/Sub SDK +POM_DESCRIPTION=Live Objects plugin for Ably Pub/Sub SDK +POM_PACKAGING=jar diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt new file mode 100644 index 000000000..2fc70a2a9 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -0,0 +1,50 @@ +package io.ably.lib.objects + +import io.ably.lib.types.Callback + +internal class DefaultLiveObjects(private val channelName: String): LiveObjects { + override fun getRoot(): LiveMap { + TODO("Not yet implemented") + } + + override fun createMap(liveMap: LiveMap): LiveMap { + TODO("Not yet implemented") + } + + override fun createMap(liveCounter: LiveCounter): LiveMap { + TODO("Not yet implemented") + } + + override fun createMap(map: MutableMap): LiveMap { + TODO("Not yet implemented") + } + + override fun getRootAsync(callback: Callback) { + TODO("Not yet implemented") + } + + override fun createMapAsync(liveMap: LiveMap, callback: Callback) { + TODO("Not yet implemented") + } + + override fun createMapAsync(liveCounter: LiveCounter, callback: Callback) { + TODO("Not yet implemented") + } + + override fun createMapAsync(map: MutableMap, callback: Callback) { + TODO("Not yet implemented") + } + + override fun createCounterAsync(initialValue: Long, callback: Callback) { + TODO("Not yet implemented") + } + + override fun createCounter(initialValue: Long): LiveCounter { + TODO("Not yet implemented") + } + + fun dispose() { + // Dispose of any resources associated with this LiveObjects instance + // For example, close any open connections or clean up references + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt new file mode 100644 index 000000000..277d4df31 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt @@ -0,0 +1,47 @@ +package io.ably.lib.objects + +import io.ably.lib.plugins.PluginConnectionAdapter +import io.ably.lib.realtime.CompletionListener +import io.ably.lib.types.ErrorInfo +import io.ably.lib.types.ProtocolMessage +import kotlinx.coroutines.CompletableDeferred +import java.util.concurrent.ConcurrentHashMap + +public class DefaultLiveObjectsPlugin(private val pluginConnectionAdapter: PluginConnectionAdapter) : LiveObjectsPlugin { + + private val liveObjects = ConcurrentHashMap() + + override fun getInstance(channelName: String): LiveObjects { + return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName) } + } + + public suspend fun send(message: ProtocolMessage) { + val deferred = CompletableDeferred() + pluginConnectionAdapter.send(message, object : CompletionListener { + override fun onSuccess() { + deferred.complete(Unit) + } + + override fun onError(reason: ErrorInfo) { + deferred.completeExceptionally(Exception(reason.message)) + } + }) + deferred.await() + } + + override fun handle(message: ProtocolMessage) { + TODO("Not yet implemented") + } + + override fun dispose(channelName: String) { + liveObjects[channelName]?.dispose() + liveObjects.remove(channelName) + } + + override fun dispose() { + liveObjects.values.forEach { + it.dispose() + } + liveObjects.clear() + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 7ccfd6f3f..6d7d6ba8c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -15,3 +15,4 @@ include("network-client-core") include("network-client-default") include("network-client-okhttp") include("pubsub-adapter") +include("live-objects")