Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 13 additions & 53 deletions lib/src/main/java/io/ably/lib/objects/Adapter.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.ChannelState;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.realtime.ChannelBase;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ChannelMode;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.util.Log;
import org.jetbrains.annotations.NotNull;

Expand All @@ -20,54 +17,6 @@ public Adapter(@NotNull AblyRealtime ably) {
this.ably = ably;
}

@Override
public void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial) {
if (ably.channels.containsKey(channelName)) {
ably.channels.get(channelName).properties.channelSerial = channelSerial;
} else {
Log.e(TAG, "setChannelSerial(): channel not found: " + channelName);
}
}

@Override
public void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener listener) throws AblyException {
// Always queue LiveObjects messages to ensure reliable state synchronization and proper acknowledgment
ably.connection.connectionManager.send(msg, true, listener);
}

@Override
public int maxMessageSizeLimit() {
return ably.connection.connectionManager.maxMessageSize;
}

@Override
public ChannelMode[] getChannelModes(@NotNull String channelName) {
if (ably.channels.containsKey(channelName)) {
// RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set
ChannelMode[] modes = ably.channels.get(channelName).getModes();
if (modes != null) {
return modes;
}
// RTO2b - otherwise as a best effort use user provided channel options
ChannelOptions options = ably.channels.get(channelName).getOptions();
if (options != null && options.hasModes()) {
return options.modes;
}
return null;
}
Log.e(TAG, "getChannelMode(): channel not found: " + channelName);
return null;
}

@Override
public ChannelState getChannelState(@NotNull String channelName) {
if (ably.channels.containsKey(channelName)) {
return ably.channels.get(channelName).state;
}
Log.e(TAG, "getChannelState(): channel not found: " + channelName);
return null;
}

@Override
public @NotNull ClientOptions getClientOptions() {
return ably.options;
Expand All @@ -82,4 +31,15 @@ public ChannelState getChannelState(@NotNull String channelName) {
public long getTime() throws AblyException {
return ably.time();
}

@Override
public @NotNull ChannelBase getChannel(@NotNull String channelName) throws AblyException {
if (ably.channels.containsKey(channelName)) {
return ably.channels.get(channelName);
} else {
Log.e(TAG, "attachChannel(): channel not found: " + channelName);
ErrorInfo errorInfo = new ErrorInfo("Channel not found: " + channelName, 404);
throw AblyException.fromErrorInfo(errorInfo);
}
}
}
61 changes: 11 additions & 50 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsAdapter.java
Original file line number Diff line number Diff line change
@@ -1,62 +1,13 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.ChannelState;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.realtime.ChannelBase;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ChannelMode;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ProtocolMessage;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public interface LiveObjectsAdapter {
/**
* Sends a protocol message to its intended recipient.
* This method transmits a protocol message, allowing for queuing events if necessary,
* and notifies the provided listener upon the success or failure of the send operation.
*
* @param msg the protocol message to send.
* @param listener a listener to be notified of the success or failure of the send operation.
* @throws AblyException if an error occurs during the send operation.
*/
void send(@NotNull ProtocolMessage msg, @NotNull CompletionListener listener) throws AblyException;

/**
* Sets the channel serial for a specific channel.
* @param channelName the name of the channel for which to set the serial
* @param channelSerial the serial to set for the channel
*/
void setChannelSerial(@NotNull String channelName, @NotNull String channelSerial);

/**
* Retrieves the maximum message size allowed for the messages.
* This method returns the maximum size in bytes that a message can have.
*
* @return the maximum message size limit in bytes.
*/
int maxMessageSizeLimit();

/**
* Retrieves the channel modes for a specific channel.
* This method returns the modes that are set for the specified channel.
*
* @param channelName the name of the channel for which to retrieve the modes
* @return the array of channel modes for the specified channel, or null if the channel is not found
* Spec: RTO2a, RTO2b
*/
@Nullable ChannelMode[] getChannelModes(@NotNull String channelName);

/**
* Retrieves the current state of a specific channel.
* This method returns the state of the specified channel, which indicates its connection status.
*
* @param channelName the name of the channel for which to retrieve the state
* @return the current state of the specified channel, or null if the channel is not found
*/
@Nullable ChannelState getChannelState(@NotNull String channelName);

/**
* Retrieves the client options configured for the Ably client.
* Used to access client configuration parameters such as echoMessages setting
Expand All @@ -81,5 +32,15 @@ public interface LiveObjectsAdapter {
*/
@Blocking
long getTime() throws AblyException;

/**
* Retrieves the channel instance for the specified channel name.
* If the channel does not exist, an AblyException is thrown.
*
* @param channelName the name of the channel to retrieve
* @return the ChannelBase instance for the specified channel
* @throws AblyException if the channel is not found or cannot be retrieved
*/
@NotNull ChannelBase getChannel(@NotNull String channelName) throws AblyException;
}

3 changes: 3 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,9 @@ public Map<String, String> getParams() {
}

public ChannelMode[] getModes() {
if (modes == null) {
return new ChannelMode[0];
}
return modes.toArray(new ChannelMode[modes.size()]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val

private suspend fun getRootAsync(): LiveMap = withContext(sequentialScope.coroutineContext) {
adapter.throwIfInvalidAccessApiConfiguration(channelName)
adapter.ensureAttached(channelName)
objectsManager.ensureSynced(state)
objectsPool.get(ROOT_OBJECT_ID) as LiveMap
}
Expand Down
87 changes: 79 additions & 8 deletions live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.ably.lib.realtime.CompletionListener
import io.ably.lib.types.ChannelMode
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.ProtocolMessage
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
Expand All @@ -14,7 +15,7 @@ import kotlin.coroutines.resumeWithException
*/
internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation ->
try {
this.send(message, object : CompletionListener {
connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener {
override fun onSuccess() {
continuation.resume(Unit)
}
Expand All @@ -28,11 +29,54 @@ internal suspend fun LiveObjectsAdapter.sendAsync(message: ProtocolMessage) = su
}
}

internal suspend fun LiveObjectsAdapter.attachAsync(channelName: String) = suspendCancellableCoroutine { continuation ->
try {
getChannel(channelName).attach(object : CompletionListener {
override fun onSuccess() {
continuation.resume(Unit)
}

override fun onError(reason: ErrorInfo) {
continuation.resumeWithException(ablyException(reason))
}
})
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}

/**
* Retrieves the channel modes for a specific channel.
* This method returns the modes that are set for the specified channel.
*
* @param channelName the name of the channel for which to retrieve the modes
* @return the array of channel modes for the specified channel, or null if the channel is not found
* Spec: RTO2a, RTO2b
*/
internal fun LiveObjectsAdapter.getChannelModes(channelName: String): Array<ChannelMode>? {
val channel = getChannel(channelName)

// RTO2a - channel.modes is only populated on channel attachment, so use it only if it is set
channel.modes?.let { modes ->
if (modes.isNotEmpty()) {
return modes
}
}

// RTO2b - otherwise as a best effort use user provided channel options
channel.options?.let { options ->
if (options.hasModes()) {
return options.modes
}
}
return null
}

/**
* Spec: RTO15d
*/
internal fun LiveObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Array<ObjectMessage>) {
val maximumAllowedSize = maxMessageSizeLimit()
val maximumAllowedSize = connectionManager.maxMessageSize
val objectsTotalMessageSize = objectMessages.sumOf { it.size() }
if (objectsTotalMessageSize > maximumAllowedSize) {
throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes",
Expand All @@ -44,19 +88,46 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe
if (protocolMessage.action != ProtocolMessage.Action.`object`) return
val channelSerial = protocolMessage.channelSerial
if (channelSerial.isNullOrEmpty()) return
setChannelSerial(channelName, channelSerial)
getChannel(channelName).properties.channelSerial = channelSerial
}

internal suspend fun LiveObjectsAdapter.ensureAttached(channelName: String) {
val channel = getChannel(channelName)
when (val currentChannelStatus = channel.state) {
ChannelState.initialized -> attachAsync(channelName)
ChannelState.attached -> return
ChannelState.attaching -> {
val attachDeferred = CompletableDeferred<Unit>()
getChannel(channelName).once {
when(it.current) {
ChannelState.attached -> attachDeferred.complete(Unit)
else -> {
val exception = ablyException("Channel $channelName is in invalid state: ${it.current}, " +
"error: ${it.reason}", ErrorCode.ChannelStateError)
attachDeferred.completeExceptionally(exception)
}
}
}
if (channel.state == ChannelState.attached) {
attachDeferred.complete(Unit)
}
attachDeferred.await()
}
else ->
throw ablyException("Channel $channelName is in invalid state: $currentChannelStatus", ErrorCode.ChannelStateError)
}
}

// Spec: RTLO4b1, RTLO4b2
internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) {
throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe)
throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed))
throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe)
}

internal fun LiveObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: String) {
throwIfEchoMessagesDisabled()
throwIfMissingChannelMode(channelName, ChannelMode.object_publish)
throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed, ChannelState.suspended))
throwIfMissingChannelMode(channelName, ChannelMode.object_publish)
}

internal fun LiveObjectsAdapter.throwIfUnpublishableState(channelName: String) {
Expand All @@ -67,16 +138,16 @@ internal fun LiveObjectsAdapter.throwIfUnpublishableState(channelName: String) {
}

// Spec: RTO2
internal fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, channelMode: ChannelMode) {
private fun LiveObjectsAdapter.throwIfMissingChannelMode(channelName: String, channelMode: ChannelMode) {
val channelModes = getChannelModes(channelName)
if (channelModes == null || !channelModes.contains(channelMode)) {
// Spec: RTO2a2, RTO2b2
throw ablyException("\"${channelMode.name}\" channel mode must be set for this operation", ErrorCode.ChannelModeRequired)
}
}

internal fun LiveObjectsAdapter.throwIfInChannelState(channelName: String, channelStates: Array<ChannelState>) {
val currentState = getChannelState(channelName)
private fun LiveObjectsAdapter.throwIfInChannelState(channelName: String, channelStates: Array<ChannelState>) {
val currentState = getChannel(channelName).state
if (currentState == null || channelStates.contains(currentState)) {
throw ablyException("Channel is in invalid state: $currentState", ErrorCode.ChannelStateError)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DefaultLiveObjectsTest : IntegrationTest() {
// Initialize the root map on the channel with initial data
restObjects.initializeRootMap(channelName)

val channel = getRealtimeChannel(channelName, autoAttach = false)
val channel = getRealtimeChannel(channelName)
val objects = channel.objects
assertNotNull(objects)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ abstract class IntegrationTest {
/**
* Retrieves a realtime channel for the specified channel name and client ID
* If a client with the given clientID does not exist, a new client is created using the provided options.
* The channel is attached and ensured to be in the attached state before returning.
*
* @param channelName Name of the channel
* @param clientId The ID of the client to use or create. Defaults to "client1".
* @return The attached realtime channel.
* @throws Exception If the channel fails to attach or the client fails to connect.
* @return The realtime channel in the INITIALIZED state.
* @throws Exception If the client fails to connect.
*/
internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1", autoAttach: Boolean = true): Channel {
internal suspend fun getRealtimeChannel(channelName: String, clientId: String = "client1"): Channel {
val client = realtimeClients.getOrPut(clientId) {
sandbox.createRealtimeClient {
this.clientId = clientId
Expand All @@ -46,12 +45,7 @@ abstract class IntegrationTest {
val channelOpts = ChannelOptions().apply {
modes = arrayOf(ChannelMode.object_publish, ChannelMode.object_subscribe)
}
return client.channels.get(channelName, channelOpts).apply {
if (autoAttach) {
attach()
ensureAttached()
}
}
return client.channels.get(channelName, channelOpts)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,3 @@ internal suspend fun AblyRealtime.ensureConnected() {
}
connectedDeferred.await()
}

internal suspend fun Channel.ensureAttached() {
if (this.state == ChannelState.attached) {
return
}
val attachedDeferred = CompletableDeferred<Unit>()
this.on {
if (it.event == ChannelEvent.attached) {
attachedDeferred.complete(Unit)
this.off()
} else if (it.event != ChannelEvent.attaching) {
attachedDeferred.completeExceptionally(ablyException(it.reason))
this.off()
}
}
attachedDeferred.await()
}
Loading
Loading