Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
## v1.1.0

### Updates
* Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104))
* Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))
* Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115))
* Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,33 +282,33 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
*/
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;

/**
* Suspends a running orchestration instance.
* @param instanceId the ID of the orchestration instance to suspend
*/
public void suspendInstance (String instanceId) {
this.suspendInstance(instanceId, null);
}

/**
* Resumes a running orchestration instance.
* @param instanceId the ID of the orchestration instance to resume
*/
public void resumeInstance(String instanceId) {
this.resumeInstance(instanceId, null);
}

/**
* Suspends a running orchestration instance.
* @param instanceId the ID of the orchestration instance to suspend
* @param reason the reason for suspending the orchestration instance
*/
public abstract void suspendInstance(String instanceId, @Nullable String reason);

/**
* Resumes a running orchestration instance.
* @param instanceId the ID of the orchestration instance to resume
* @param reason the reason for resuming the orchestration instance
*/
public abstract void resumeInstance(String instanceId, @Nullable String reason);
// /**
// * Suspends a running orchestration instance.
// * @param instanceId the ID of the orchestration instance to suspend
// */
// public void suspendInstance (String instanceId) {
// this.suspendInstance(instanceId, null);
// }
//
// /**
// * Resumes a running orchestration instance.
// * @param instanceId the ID of the orchestration instance to resume
// */
// public void resumeInstance(String instanceId) {
// this.resumeInstance(instanceId, null);
// }
//
// /**
// * Suspends a running orchestration instance.
// * @param instanceId the ID of the orchestration instance to suspend
// * @param reason the reason for suspending the orchestration instance
// */
// public abstract void suspendInstance(String instanceId, @Nullable String reason);
//
// /**
// * Resumes a running orchestration instance.
// * @param instanceId the ID of the orchestration instance to resume
// * @param reason the reason for resuming the orchestration instance
// */
// public abstract void resumeInstance(String instanceId, @Nullable String reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,25 +283,25 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
}
}

@Override
public void suspendInstance(String instanceId, @Nullable String reason) {
SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder();
suspendRequestBuilder.setInstanceId(instanceId);
if (reason != null) {
suspendRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.suspendInstance(suspendRequestBuilder.build());
}

@Override
public void resumeInstance(String instanceId, @Nullable String reason) {
ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder();
resumeRequestBuilder.setInstanceId(instanceId);
if (reason != null) {
resumeRequestBuilder.setReason(StringValue.of(reason));
}
this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
}
// @Override
// public void suspendInstance(String instanceId, @Nullable String reason) {
// SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder();
// suspendRequestBuilder.setInstanceId(instanceId);
// if (reason != null) {
// suspendRequestBuilder.setReason(StringValue.of(reason));
// }
// this.sidecarClient.suspendInstance(suspendRequestBuilder.build());
// }
//
// @Override
// public void resumeInstance(String instanceId, @Nullable String reason) {
// ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder();
// resumeRequestBuilder.setInstanceId(instanceId);
// if (reason != null) {
// resumeRequestBuilder.setReason(StringValue.of(reason));
// }
// this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
// }

private PurgeResult toPurgeResult(PurgeInstancesResponse response){
return new PurgeResult(response.getDeletedInstanceCount());
Expand Down
130 changes: 65 additions & 65 deletions client/src/test/java/com/microsoft/durabletask/IntegrationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,71 +408,71 @@ void termination() throws TimeoutException {
}
}

@Test
void suspendResumeOrchestration() throws TimeoutException, InterruptedException {
final String orchestratorName = "suspend";
final String eventName = "MyEvent";
final String eventPayload = "testPayload";
final Duration suspendTimeout = Duration.ofSeconds(5);

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
ctx.complete(payload);
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
client.suspendInstance(instanceId);
OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout);
assertNotNull(instance);
assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus());

client.raiseEvent(instanceId, eventName, eventPayload);

assertThrows(
TimeoutException.class,
() -> client.waitForInstanceCompletion(instanceId, suspendTimeout, false),
"Expected to throw TimeoutException, but it didn't"
);

String resumeReason = "Resume for testing.";
client.resumeInstance(instanceId, resumeReason);
instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(instance);
assertEquals(instanceId, instance.getInstanceId());
assertEquals(eventPayload, instance.readOutputAs(String.class));
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
}
}

@Test
void terminateSuspendOrchestration() throws TimeoutException, InterruptedException {
final String orchestratorName = "suspendResume";
final String eventName = "MyEvent";
final String eventPayload = "testPayload";

DurableTaskGrpcWorker worker = this.createWorkerBuilder()
.addOrchestrator(orchestratorName, ctx -> {
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
ctx.complete(payload);
})
.buildAndStart();

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
String suspendReason = "Suspend for testing.";
client.suspendInstance(instanceId, suspendReason);
client.terminate(instanceId, null);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false);
assertNotNull(instance);
assertEquals(instanceId, instance.getInstanceId());
assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus());
}
}
// @Test
// void suspendResumeOrchestration() throws TimeoutException, InterruptedException {
// final String orchestratorName = "suspend";
// final String eventName = "MyEvent";
// final String eventPayload = "testPayload";
// final Duration suspendTimeout = Duration.ofSeconds(5);
//
// DurableTaskGrpcWorker worker = this.createWorkerBuilder()
// .addOrchestrator(orchestratorName, ctx -> {
// String payload = ctx.waitForExternalEvent(eventName, String.class).await();
// ctx.complete(payload);
// })
// .buildAndStart();
//
// DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
// try (worker; client) {
// String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
// client.suspendInstance(instanceId);
// OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout);
// assertNotNull(instance);
// assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus());
//
// client.raiseEvent(instanceId, eventName, eventPayload);
//
// assertThrows(
// TimeoutException.class,
// () -> client.waitForInstanceCompletion(instanceId, suspendTimeout, false),
// "Expected to throw TimeoutException, but it didn't"
// );
//
// String resumeReason = "Resume for testing.";
// client.resumeInstance(instanceId, resumeReason);
// instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
// assertNotNull(instance);
// assertEquals(instanceId, instance.getInstanceId());
// assertEquals(eventPayload, instance.readOutputAs(String.class));
// assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
// }
// }
//
// @Test
// void terminateSuspendOrchestration() throws TimeoutException, InterruptedException {
// final String orchestratorName = "suspendResume";
// final String eventName = "MyEvent";
// final String eventPayload = "testPayload";
//
// DurableTaskGrpcWorker worker = this.createWorkerBuilder()
// .addOrchestrator(orchestratorName, ctx -> {
// String payload = ctx.waitForExternalEvent(eventName, String.class).await();
// ctx.complete(payload);
// })
// .buildAndStart();
//
// DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
// try (worker; client) {
// String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
// String suspendReason = "Suspend for testing.";
// client.suspendInstance(instanceId, suspendReason);
// client.terminate(instanceId, null);
// OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false);
// assertNotNull(instance);
// assertEquals(instanceId, instance.getInstanceId());
// assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus());
// }
// }

@Test
void activityFanOut() throws IOException, TimeoutException {
Expand Down