Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ dependencies {
implementation group: 'com.cronutils', name: 'cron-utils', version: '9.0.0'
implementation 'io.grpc:grpc-netty-shaded:1.29.0'
implementation group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.12.1'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.3'
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.0'


testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
testImplementation group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/io/temporal/activity/Activity.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
package io.temporal.activity;

import io.temporal.client.ActivityCompletionException;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ChildWorkflowFailure;
import io.temporal.failure.TimeoutFailure;
import io.temporal.internal.sync.ActivityInternal;
import io.temporal.internal.sync.WorkflowInternal;
import io.temporal.proto.common.WorkflowExecution;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.ActivityException;
import io.temporal.workflow.ActivityTimeoutException;
import java.lang.reflect.Type;
import java.util.Optional;

Expand Down Expand Up @@ -234,7 +235,7 @@ public static ActivityTask getTask() {
* Use to notify Temporal service that activity execution is alive.
*
* @param details In case of activity timeout can be accessed through {@link
* ActivityTimeoutException#getDetails(Class)} method.
* TimeoutFailure#getLastHeartbeatDetails()} method.
* @throws ActivityCompletionException Indicates that activity execution is expected to be
* interrupted. The reason for interruption is indicated by a type of subclass of the
* exception.
Expand Down Expand Up @@ -291,10 +292,10 @@ public static String getNamespace() {
* <p>The reason for such design is that returning originally thrown exception from a remote call
* (which child workflow and activity invocations are ) would not allow adding context information
* about a failure, like activity and child workflow id. So stubs always throw a subclass of
* {@link ActivityException} from calls to an activity and subclass of {@link
* io.temporal.workflow.ChildWorkflowException} from calls to a child workflow. The original
* exception is attached as a cause to these wrapper exceptions. So as exceptions are always
* wrapped adding checked ones to method signature causes more pain than benefit.
* {@link ActivityFailure} from calls to an activity and subclass of {@link ChildWorkflowFailure}
* from calls to a child workflow. The original exception is attached as a cause to these wrapper
* exceptions. So as exceptions are always wrapped adding checked ones to method signature causes
* more pain than benefit.
*
* <p>Throws original exception if e is {@link RuntimeException} or {@link Error}. Never returns.
* But return type is not empty to be able to use it as:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package io.temporal.activity;

import io.temporal.failure.CanceledFailure;
import io.temporal.workflow.CancellationScope;
import java.util.concurrent.CancellationException;

/**
* Defines behaviour of the parent workflow when {@link CancellationScope} that wraps child workflow
* execution request is cancelled. The result of the cancellation independently of the type is a
* {@link CancellationException} thrown from the child workflow method.
* {@link CanceledFailure} thrown from the child workflow method.
*/
public enum ActivityCancellationType {
/**
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/temporal/activity/ActivityOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import io.temporal.common.MethodRetry;
import io.temporal.common.RetryOptions;
import io.temporal.common.context.ContextPropagator;
import io.temporal.failure.CanceledFailure;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CancellationException;

/** Options used to configure how an activity is invoked. */
public final class ActivityOptions {
Expand Down Expand Up @@ -145,9 +145,9 @@ public Builder setContextPropagators(List<ContextPropagator> contextPropagators)
}

/**
* In case of an activity cancellation it fails with a {@link CancellationException}Exception.
* If this flag is set to false then the exception is thrown not immediately but only after an
* activity completes its cleanup. If true a CancellationException is thrown immediately and an
* In case of an activity cancellation it fails with a {@link CanceledFailure}. If this flag is
* set to false then the exception is thrown not immediately but only after an activity
* completes its cleanup. If true a {@link CanceledFailure} is thrown immediately and an
* activity cancellation is going to happen in the background.
*/
public Builder setCancellationType(ActivityCancellationType cancellationType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package io.temporal.client;

import io.temporal.activity.Activity;
import io.temporal.failure.CanceledFailure;
import io.temporal.proto.common.WorkflowExecution;
import java.util.concurrent.CancellationException;

/**
* Used to complete asynchronously activities that called {@link Activity#doNotCompleteOnReturn()}.
Expand Down Expand Up @@ -52,7 +52,7 @@ <V> void reportCancellation(WorkflowExecution execution, String activityId, V de
/**
* Warning: heartbeating by ids is not implemented yet.
*
* @throws CancellationException if activity is cancelled.
* @throws CanceledFailure if activity is cancelled.
*/
<V> void heartbeat(WorkflowExecution execution, String activityId, V details)
throws ActivityCompletionException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
package io.temporal.client;

import io.temporal.activity.ActivityTask;
import io.temporal.failure.TemporalException;
import io.temporal.proto.common.WorkflowExecution;

/** Base exception for all failures returned by an activity completion client. */
public class ActivityCompletionException extends RuntimeException {
/** Base exception for all failures returned by an activity completion client. Do not extend! */
public class ActivityCompletionException extends TemporalException {

private final WorkflowExecution execution;

Expand All @@ -32,9 +33,7 @@ public class ActivityCompletionException extends RuntimeException {
private final String activityId;

protected ActivityCompletionException(ActivityTask task) {
execution = task.getWorkflowExecution();
activityType = task.getActivityType();
activityId = task.getActivityId();
this(task, null);
}

protected ActivityCompletionException(ActivityTask task, Throwable cause) {
Expand All @@ -60,7 +59,7 @@ protected ActivityCompletionException(ActivityTask task, Throwable cause) {
}

protected ActivityCompletionException(String activityId, Throwable cause) {
super("ActivityId" + activityId, cause);
super("ActivityId=" + activityId, cause);
this.execution = null;
this.activityType = null;
this.activityId = activityId;
Expand All @@ -71,7 +70,7 @@ protected ActivityCompletionException(Throwable cause) {
}

protected ActivityCompletionException() {
super();
super(null, null);
execution = null;
activityType = null;
activityId = null;
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/temporal/client/WorkflowClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@
* If you need to wait for a workflow completion after an asynchronous start, maybe even from a
* different process, the simplest way is to call the blocking version again. If {@link
* WorkflowOptions#getWorkflowIdReusePolicy()} is not {@code AllowDuplicate} then instead of
* throwing {@link io.temporal.client.DuplicateWorkflowException}, it reconnects to an existing
* workflow and waits for its completion. The following example shows how to do this from a
* different process than the one that started the workflow. All this process needs is a {@code
* WorkflowId}.
* throwing {@link WorkflowExecutionAlreadyStarted}, it reconnects to an existing workflow and waits
* for its completion. The following example shows how to do this from a different process than the
* one that started the workflow. All this process needs is a {@code WorkflowId}.
*
* <pre><code>
* FileProcessingWorkflow workflow = workflowClient.newWorkflowStub(FileProcessingWorkflow.class, workflowId);
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/temporal/client/WorkflowClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GsonJsonDataConverter;
import io.temporal.proto.query.QueryRejectCondition;
import java.lang.management.ManagementFactory;
import java.util.Arrays;
Expand Down Expand Up @@ -82,7 +81,7 @@ public Builder setNamespace(String namespace) {
* Overrides a data converter implementation used serialize workflow and activity arguments and
* results.
*
* <p>Default is {@link GsonJsonDataConverter} data converter.
* <p>Default is {@link DataConverter#getDefaultInstance()}.
*/
public Builder setDataConverter(DataConverter dataConverter) {
this.dataConverter = Objects.requireNonNull(dataConverter);
Expand Down Expand Up @@ -155,7 +154,7 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
}
return new WorkflowClientOptions(
namespace == null ? DEFAULT_NAMESPACE : namespace,
dataConverter == null ? GsonJsonDataConverter.getInstance() : dataConverter,
dataConverter == null ? DataConverter.getDefaultInstance() : dataConverter,
interceptors == null ? EMPTY_INTERCEPTOR_ARRAY : interceptors,
name,
contextPropagators == null ? EMPTY_CONTEXT_PROPAGATORS : contextPropagators,
Expand Down
53 changes: 22 additions & 31 deletions src/main/java/io/temporal/client/WorkflowException.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,28 @@

package io.temporal.client;

import io.temporal.failure.TemporalException;
import io.temporal.proto.common.WorkflowExecution;
import io.temporal.workflow.ChildWorkflowException;
import java.util.Objects;
import java.util.Optional;

/**
* Base exception for all workflow failures returned by an external client. Note that inside a
* workflow implementation child workflows throw subclasses of {@link ChildWorkflowException}.
*/
public class WorkflowException extends RuntimeException {
/** Base exception for all workflow failures. */
public abstract class WorkflowException extends TemporalException {

private final WorkflowExecution execution;
private final Optional<String> workflowType;

protected WorkflowException(
String message, WorkflowExecution execution, Optional<String> workflowType, Throwable cause) {
super(getMessage(message, execution, workflowType), cause);
this.execution = execution;
this.workflowType = workflowType;
protected WorkflowException(WorkflowExecution execution, String workflowType, Throwable cause) {
super(getMessage(execution, workflowType), cause);
this.execution = Objects.requireNonNull(execution);
this.workflowType = Optional.ofNullable(workflowType);
}

private static String getMessage(
String message, WorkflowExecution execution, Optional<String> workflowType) {
StringBuilder result = new StringBuilder();
if (message != null) {
result.append(message);
result.append(", ");
}
if (workflowType.isPresent()) {
result.append("WorkflowType=\"");
result.append(workflowType.get());
}
if (execution != null) {
if (result.length() > 0) {
result.append("\", ");
}
result.append("WorkflowExecution=\"");
result.append(execution);
result.append("\"");
}
return result.toString();
protected WorkflowException(
String message, WorkflowExecution execution, String workflowType, Throwable cause) {
super(message, cause);
this.execution = Objects.requireNonNull(execution);
this.workflowType = Optional.ofNullable(workflowType);
}

public WorkflowExecution getExecution() {
Expand All @@ -68,4 +50,13 @@ public WorkflowExecution getExecution() {
public Optional<String> getWorkflowType() {
return workflowType;
}

public static String getMessage(WorkflowExecution execution, String workflowType) {
return "workflowId='"
+ execution.getWorkflowId()
+ "', runId='"
+ execution.getRunId()
+ (workflowType == null ? "" : "', workflowType='" + workflowType + '\'')
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.temporal.client;

import io.temporal.proto.common.WorkflowExecution;
import java.util.Optional;

/**
* This exception is thrown in the following cases:
Expand Down Expand Up @@ -50,10 +49,9 @@
* io.temporal.proto.common.WorkflowIdReusePolicy#AllowDuplicate}
* </ul>
*/
public final class DuplicateWorkflowException extends WorkflowException {

public DuplicateWorkflowException(
WorkflowExecution execution, String workflowType, String message) {
super(message, execution, Optional.of(workflowType), null);
public final class WorkflowExecutionAlreadyStarted extends WorkflowException {
public WorkflowExecutionAlreadyStarted(
WorkflowExecution execution, String workflowType, Throwable cause) {
super(execution, workflowType, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,54 @@

package io.temporal.client;

import io.temporal.proto.common.RetryStatus;
import io.temporal.proto.common.WorkflowExecution;
import java.util.Optional;

/**
* Indicates that a workflow failed. An original cause of the workflow failure can be retrieved
* through {@link #getCause()}.
*/
public final class WorkflowFailureException extends WorkflowException {
public final class WorkflowFailedException extends WorkflowException {

private final RetryStatus retryStatus;
private final long decisionTaskCompletedEventId;

public WorkflowFailureException(
WorkflowExecution execution,
Optional<String> workflowType,
public WorkflowFailedException(
WorkflowExecution workflowExecution,
String workflowType,
long decisionTaskCompletedEventId,
Throwable failure) {
super(getMessage(execution, workflowType), execution, workflowType, failure);
RetryStatus retryStatus,
Throwable cause) {
super(
getMessage(workflowExecution, workflowType, decisionTaskCompletedEventId, retryStatus),
workflowExecution,
workflowType,
cause);
this.retryStatus = retryStatus;
this.decisionTaskCompletedEventId = decisionTaskCompletedEventId;
}

private static String getMessage(WorkflowExecution execution, Optional<String> workflowType) {
StringBuilder result = new StringBuilder();
if (workflowType.isPresent()) {
result.append("WorkflowType=\"");
result.append(workflowType.get());
result.append("\", ");
}
result.append("WorkflowId=\"");
result.append(execution.getWorkflowId());
result.append("\", RunId=\"");
result.append(execution.getRunId());
return result.toString();
public RetryStatus getRetryStatus() {
return retryStatus;
}

public long getDecisionTaskCompletedEventId() {
return decisionTaskCompletedEventId;
}

public static String getMessage(
WorkflowExecution workflowExecution,
String workflowType,
long decisionTaskCompletedEventId,
RetryStatus retryStatus) {
return "workflowId='"
+ workflowExecution.getWorkflowId()
+ "', runId='"
+ workflowExecution.getRunId()
+ (workflowType == null ? "'" : "', workflowType='" + workflowType + '\'')
+ ", retryStatus="
+ retryStatus
+ ", decisionTaskCompletedEventId="
+ decisionTaskCompletedEventId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@
package io.temporal.client;

import io.temporal.proto.common.WorkflowExecution;
import java.util.Optional;

/**
* Thrown when workflow with the given id is not known to the Temporal service. It could be because
* id is not correct or workflow was purged from the service after reaching its retention limit.
*/
public final class WorkflowNotFoundException extends WorkflowException {

public WorkflowNotFoundException(
WorkflowExecution execution, Optional<String> workflowType, String message) {
super(message, execution, workflowType, null);
public WorkflowNotFoundException(WorkflowExecution execution, String workflowType) {
super(execution, workflowType, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
package io.temporal.client;

import io.temporal.proto.common.WorkflowExecution;
import java.util.Optional;

public class WorkflowQueryException extends WorkflowException {

public WorkflowQueryException(WorkflowExecution execution, String message) {
super(message, execution, Optional.empty(), null);
public WorkflowQueryException(WorkflowExecution execution, String workflowType, Throwable cause) {
super(execution, workflowType, cause);
}
}
Loading