Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/Adapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.util.Log;
import org.jetbrains.annotations.NotNull;

public class Adapter implements LiveObjectsAdapter {
private final AblyRealtime ably;
private static final String TAG = LiveObjectsAdapter.class.getName();

public Adapter(@NotNull AblyRealtime ably) {
this.ably = ably;
}

@Override
public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) {
if (ably.channels.containsKey(channelName)) {
ably.channels.get(channelName).properties.channelSerial = channelSerial;
} else {
Log.e(TAG, "setChannelSerial(): channel not found: " + channelName);
}
}

@Override
public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException {
// Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment
ably.connection.connectionManager.send(msg, true, listener);
}
}
8 changes: 4 additions & 4 deletions lib/src/main/java/io/ably/lib/objects/LiveMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public interface LiveMap {

/**
* Retrieves the value associated with the specified key.
* If this map object is tombstoned (deleted), `undefined` is returned.
* If no entry is associated with the specified key, `undefined` is returned.
* If map entry is tombstoned (deleted), `undefined` is returned.
* If this map object is tombstoned (deleted), null is returned.
* If no entry is associated with the specified key, null is returned.
* If map entry is tombstoned (deleted), null is returned.
* If the value associated with the provided key is an objectId string of another LiveObject, a reference to that LiveObject
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, `undefined` is returned.
* is returned, provided it exists in the local pool and is not tombstoned. Otherwise, null is returned.
* If the value is not an objectId, then that value is returned.
*
* @param keyName the key whose associated value is to be returned.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package io.ably.lib.plugins;
package io.ably.lib.objects;

import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ProtocolMessage;
import org.jetbrains.annotations.NotNull;

/**
* The PluginConnectionAdapter interface defines a contract for managing real-time communication
* between plugins and the Ably Realtime system. Implementations of this interface are responsible
* for sending protocol messages to their intended recipients, optionally queuing events, and
* notifying listeners of the operation's outcome.
*/
public interface PluginConnectionAdapter {

public interface LiveObjectsAdapter {
/**
* Sends a protocol message to its intended recipient.
* This method transmits a protocol message, allowing for queuing events if necessary,
Expand All @@ -22,4 +16,12 @@ public interface PluginConnectionAdapter {
* @throws AblyException if an error occurs during the send operation.
*/
void send(ProtocolMessage msg, CompletionListener listener) throws AblyException;

/**
* Sets the channel serial for a specific channel.
* @param channelName the name of the channel for which to set the serial
* @param channelSerial the serial to set for the channel
*/
void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial);
}

18 changes: 16 additions & 2 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package io.ably.lib.objects;

import io.ably.lib.plugins.PluginInstance;
import io.ably.lib.types.ProtocolMessage;
import org.jetbrains.annotations.NotNull;

/**
* The LiveObjectsPlugin interface provides a mechanism for managing and interacting with
* live data objects in a real-time environment. It allows for the retrieval, disposal, and
* management of LiveObjects instances associated with specific channel names.
*/
public interface LiveObjectsPlugin extends PluginInstance {
public interface LiveObjectsPlugin {

/**
* Retrieves an instance of LiveObjects associated with the specified channel name.
Expand All @@ -21,6 +21,15 @@ public interface LiveObjectsPlugin extends PluginInstance {
@NotNull
LiveObjects getInstance(@NotNull String channelName);

/**
* Handles a protocol message.
* This method is invoked whenever a protocol message is received, allowing the implementation
* to process the message and take appropriate actions.
*
* @param message the protocol message to handle.
*/
void handle(@NotNull ProtocolMessage message);

/**
* Disposes of the LiveObjects instance associated with the specified channel name.
* This method removes the LiveObjects instance for the given channel, releasing any
Expand All @@ -29,4 +38,9 @@ public interface LiveObjectsPlugin extends PluginInstance {
* @param channelName the name of the channel whose LiveObjects instance is to be removed.
*/
void dispose(@NotNull String channelName);

/**
* Disposes of the plugin instance and all underlying resources.
*/
void dispose();
}
25 changes: 0 additions & 25 deletions lib/src/main/java/io/ably/lib/plugins/PluginInstance.java

This file was deleted.

8 changes: 5 additions & 3 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import java.util.List;
import java.util.Map;

import io.ably.lib.objects.Adapter;
import io.ably.lib.objects.LiveObjectsAdapter;
import io.ably.lib.objects.LiveObjectsPlugin;
import io.ably.lib.plugins.PluginConnectionAdapter;
import io.ably.lib.rest.AblyRest;
import io.ably.lib.rest.Auth;
import io.ably.lib.transport.ConnectionManager;
Expand Down Expand Up @@ -187,9 +188,10 @@ public interface Channels extends ReadOnlyMap<String, Channel> {
private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() {
try {
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
LiveObjectsAdapter adapter = new Adapter(this);
return (LiveObjectsPlugin) liveObjectsImplementation
.getDeclaredConstructor(PluginConnectionAdapter.class)
.newInstance(this.connection.connectionManager);
.getDeclaredConstructor(LiveObjectsAdapter.class)
.newInstance(adapter);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.ably.lib.debug.DebugOptions.RawProtocolListener;
import io.ably.lib.http.HttpHelpers;
import io.ably.lib.objects.LiveObjectsPlugin;
import io.ably.lib.plugins.PluginConnectionAdapter;
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.realtime.CompletionListener;
Expand All @@ -37,7 +36,7 @@
import io.ably.lib.util.PlatformAgentProvider;
import io.ably.lib.util.ReconnectionStrategy;

public class ConnectionManager implements ConnectListener, PluginConnectionAdapter {
public class ConnectionManager implements ConnectListener {
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

/**************************************************************
Expand Down Expand Up @@ -1687,11 +1686,6 @@ public QueuedMessage(ProtocolMessage msg, CompletionListener listener) {
}
}

@Override
public void send(ProtocolMessage msg, CompletionListener listener) throws AblyException {
this.send(msg, true, listener);
}

public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException {
State state;
synchronized(this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package io.ably.lib.objects

import io.ably.lib.types.Callback
import io.ably.lib.types.ProtocolMessage
import io.ably.lib.util.Log

internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects {
private val tag = DefaultLiveObjects::class.simpleName

internal class DefaultLiveObjects(private val channelName: String): LiveObjects {
override fun getRoot(): LiveMap {
TODO("Not yet implemented")
}
Expand Down Expand Up @@ -43,6 +47,16 @@ internal class DefaultLiveObjects(private val channelName: String): LiveObjects
TODO("Not yet implemented")
}

fun handle(msg: ProtocolMessage) {
// RTL15b
msg.channelSerial?.let {
if (msg.action === ProtocolMessage.Action.`object`) {
Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}")
adapter.setChannelSerial(channelName, msg.channelSerial)
}
}
}

fun dispose() {
// Dispose of any resources associated with this LiveObjects instance
// For example, close any open connections or clean up references
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,19 @@
package io.ably.lib.objects

import io.ably.lib.plugins.PluginConnectionAdapter
import io.ably.lib.realtime.CompletionListener
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.ProtocolMessage
import kotlinx.coroutines.CompletableDeferred
import java.util.concurrent.ConcurrentHashMap

public class DefaultLiveObjectsPlugin(private val pluginConnectionAdapter: PluginConnectionAdapter) : LiveObjectsPlugin {
public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) : LiveObjectsPlugin {

private val liveObjects = ConcurrentHashMap<String, DefaultLiveObjects>()

override fun getInstance(channelName: String): LiveObjects {
return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName) }
return liveObjects.getOrPut(channelName) { DefaultLiveObjects(channelName, adapter) }
}

public suspend fun send(message: ProtocolMessage) {
val deferred = CompletableDeferred<Unit>()
pluginConnectionAdapter.send(message, object : CompletionListener {
override fun onSuccess() {
deferred.complete(Unit)
}

override fun onError(reason: ErrorInfo) {
deferred.completeExceptionally(Exception(reason.message))
}
})
deferred.await()
}

override fun handle(message: ProtocolMessage) {
TODO("Not yet implemented")
override fun handle(msg: ProtocolMessage) {
val channelName = msg.channel
liveObjects[channelName]?.handle(msg)
}

override fun dispose(channelName: String) {
Expand Down
44 changes: 44 additions & 0 deletions live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.ably.lib.objects

import io.ably.lib.realtime.CompletionListener
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.ProtocolMessage
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation ->
try {
this.send(message, object : CompletionListener {
override fun onSuccess() {
continuation.resume(Unit)
}

override fun onError(reason: ErrorInfo) {
continuation.resumeWithException(AblyException.fromErrorInfo(reason))
}
})
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}

internal enum class ProtocolMessageFormat(private val value: String) {
Msgpack("msgpack"),
Json("json");

override fun toString(): String = value
}

internal class Binary(val data: ByteArray?) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is Binary) return false
return data?.contentEquals(other.data) == true
}

override fun hashCode(): Int {
return data?.contentHashCode() ?: 0
}
}
Loading
Loading