Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public class ConnectionManager implements ConnectListener {
static ErrorInfo REASON_REFUSED = new ErrorInfo("Access refused", 401, 40100);
static ErrorInfo REASON_TOO_BIG = new ErrorInfo("Connection closed; message too large", 400, 40000);

/**
* When connection manager entering terminal state {@code currentState.terminal == true} it should clean up
* {@link #handlerThread} and invoke {@link #stopConnectivityListener}.
* <p>
* If this flag is true that means that current state is terminal but cleaning up still in progress
*/
private boolean cleaningUpAfterEnteringTerminalState = false;

/**
* Methods on the channels map owned by the {@link AblyRealtime} instance
* which the {@link ConnectionManager} needs access to.
Expand Down Expand Up @@ -696,6 +704,8 @@ public void run() {
/* indicate that this thread is committed to die */
handlerThread = null;
stopConnectivityListener();
cleaningUpAfterEnteringTerminalState = false;
ConnectionManager.this.notifyAll();
return;
}

Expand Down Expand Up @@ -790,7 +800,13 @@ public synchronized State getConnectionState() {
public synchronized void connect() {
/* connect() is the only action that will bring the ConnectionManager out of a terminal currentState */
if(currentState.terminal || currentState.state == ConnectionState.initialized) {
startup();
try {
startup();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
Log.e(TAG, "Failed to start up connection", e);
return;
}
}
requestState(ConnectionState.connecting);
}
Expand Down Expand Up @@ -853,6 +869,7 @@ private synchronized ConnectionStateChange setState(ITransport transport, StateI
Log.v(TAG, "setState(): setting " + newState.state + "; reason " + reason);
ConnectionStateChange change = new ConnectionStateChange(currentState.state, newConnectionState, newState.timeout, reason);
currentState = newState;
cleaningUpAfterEnteringTerminalState = currentState.terminal;
stateError = reason;

return change;
Expand Down Expand Up @@ -1338,10 +1355,17 @@ private void onHeartbeat(ProtocolMessage message) {
* ConnectionManager lifecycle
******************************/

private synchronized void startup() {
if(handlerThread == null) {
private synchronized void startup() throws InterruptedException {
while (cleaningUpAfterEnteringTerminalState) {
Log.v(TAG, "Waiting for termination action to clean up handler thread");
wait();
}

if (handlerThread == null) {
(handlerThread = new Thread(new ActionHandler())).start();
startConnectivityListener();
} else {
Log.v(TAG, "`connect()` has been called twice on uninitialized or terminal state");
}
}

Expand Down