diff --git a/build.gradle b/build.gradle
index 0a1d3e8da0..14b5eee778 100644
--- a/build.gradle
+++ b/build.gradle
@@ -31,9 +31,9 @@ ext {
// Platforms
grpcVersion = '1.51.0' // [1.34.0,)
jacksonVersion = '2.14.0' // [2.9.0,)
- micrometerVersion = '1.9.5' // [1.0.0,)
+ micrometerVersion = '1.9.6' // [1.0.0,) // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
- slf4jVersion = '1.7.36' // [1.4.0,) // stay on 1.x for a while to don't use any APIs from 2.x which may break our users which decide on 1.x
+ slf4jVersion = '1.7.36' // [1.4.0,) // stay on 1.x for a while to don't use any APIs from 2.x which may break our users which stay on 1.x
protoVersion = '3.21.9' // [3.10.0,)
annotationApiVersion = '1.3.2'
guavaVersion = '31.1-jre' // [10.0,)
@@ -46,7 +46,7 @@ ext {
springBootVersion = '2.7.5'// [2.4.0,)
// test scoped
- logbackVersion = '1.2.11'
+ logbackVersion = '1.2.11' // we don't upgrade to 1.3 and 1.4 because they require slf4j 2.x
mockitoVersion = '4.9.0'
junitVersion = '4.13.2'
}
diff --git a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java
index 504a4388f5..bad70e53b5 100644
--- a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java
+++ b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java
@@ -147,9 +147,23 @@ public Builder setTaskQueue(String taskQueue) {
}
/**
- * RetryOptions that define how activity is retried in case of failure. If this is not set, then
- * the server-defined default activity retry policy will be used. To ensure zero retries, set
- * maximum attempts to 1.
+ * RetryOptions that define how an Activity is retried in case of failure.
+ *
+ *
If not provided, the server-defined default activity retry policy will be used. If not
+ * overridden, the server default activity retry policy is:
+ *
+ *
+ * InitialInterval: 1 second
+ * BackoffCoefficient: 2
+ * MaximumInterval: 100 seconds // 100 * InitialInterval
+ * MaximumAttempts: 0 // Unlimited
+ * NonRetryableErrorTypes: []
+ *
+ *
+ * If both {@link #setScheduleToCloseTimeout(Duration)} and {@link
+ * RetryOptions.Builder#setMaximumAttempts(int)} are not set, the Activity will not be retried.
+ *
+ *
To ensure zero retries, set {@link RetryOptions.Builder#setMaximumAttempts(int)} to 1.
*/
public Builder setRetryOptions(RetryOptions retryOptions) {
this.retryOptions = retryOptions;
diff --git a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java
index edc1a9e347..7b3c5847a9 100644
--- a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java
+++ b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java
@@ -126,8 +126,22 @@ public Builder mergeActivityOptions(LocalActivityOptions override) {
}
/**
- * {@link RetryOptions} that define how an Activity is retried in case of failure. Activities
- * use a default RetryPolicy if not provided.
+ * {@link RetryOptions} that define how an Activity is retried in case of failure.
+ *
+ *
If not provided, the default activity retry policy is:
+ *
+ *
+ * InitialInterval: 1 second
+ * BackoffCoefficient: 2
+ * MaximumInterval: 100 seconds // 100 * InitialInterval
+ * MaximumAttempts: 0 // Unlimited
+ * NonRetryableErrorTypes: []
+ *
+ *
+ * If both {@link #setScheduleToCloseTimeout(Duration)} and {@link
+ * RetryOptions.Builder#setMaximumAttempts(int)} are not set, the Activity will not be retried.
+ *
+ *
To ensure zero retries, set {@link RetryOptions.Builder#setMaximumAttempts(int)} to 1.
*/
public Builder setRetryOptions(RetryOptions retryOptions) {
this.retryOptions = retryOptions;
diff --git a/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java b/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java
index cc83b00426..ad4d0dad62 100644
--- a/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java
+++ b/temporal-sdk/src/main/java/io/temporal/common/RetryOptions.java
@@ -20,6 +20,7 @@
package io.temporal.common;
+import com.google.common.base.Preconditions;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.CanceledFailure;
@@ -33,6 +34,7 @@
public final class RetryOptions {
private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;
+ private static final Duration DEFAULT_INITIAL_INTERVAL = Duration.ofSeconds(1);
private static final int DEFAULT_MAXIMUM_MULTIPLIER = 100;
public static Builder newBuilder() {
@@ -130,8 +132,6 @@ private void validate() {
public static final class Builder {
- private static final Duration DEFAULT_INITIAL_INTERVAL = Duration.ofSeconds(1);
-
private Duration initialInterval;
private double backoffCoefficient;
@@ -168,11 +168,12 @@ public Builder setInitialInterval(Duration initialInterval) {
/**
* Coefficient used to calculate the next retry interval. The next retry interval is previous
* interval multiplied by this coefficient. Must be 1 or larger. Default is 2.0.
+ *
+ * @throws IllegalArgumentException if {@code backoffCoefficient} is less than 1.0
*/
public Builder setBackoffCoefficient(double backoffCoefficient) {
- if (backoffCoefficient < 1d) {
- throw new IllegalArgumentException("coefficient less than 1.0: " + backoffCoefficient);
- }
+ Preconditions.checkArgument(
+ backoffCoefficient >= 1, "backoffCoefficient must be >= 1, was %s", backoffCoefficient);
this.backoffCoefficient = backoffCoefficient;
return this;
}
@@ -182,11 +183,11 @@ public Builder setBackoffCoefficient(double backoffCoefficient) {
* Default is unlimited.
*
* @param maximumAttempts Maximum number of attempts. Default will be used if set to {@code 0}.
+ * @throws IllegalArgumentException if {@code maximumAttempts} is less than 0
*/
public Builder setMaximumAttempts(int maximumAttempts) {
- if (maximumAttempts < 0) {
- throw new IllegalArgumentException("Invalid maximumAttempts: " + maximumAttempts);
- }
+ Preconditions.checkArgument(
+ maximumAttempts >= 0, "maximumAttempts must be >= 0, was %s", maximumAttempts);
this.maximumAttempts = maximumAttempts;
return this;
}
@@ -198,11 +199,13 @@ public Builder setMaximumAttempts(int maximumAttempts) {
*
* @param maximumInterval the maximum interval value. Default will be used if set to {@code
* null}.
+ * @throws IllegalArgumentException if {@code maximumInterval} is not null and not positive
*/
public Builder setMaximumInterval(Duration maximumInterval) {
- if (maximumInterval != null && (maximumInterval.isNegative() || maximumInterval.isZero())) {
- throw new IllegalArgumentException("Invalid maximum interval: " + maximumInterval);
- }
+ Preconditions.checkArgument(
+ maximumInterval == null || maximumInterval.compareTo(Duration.ZERO) > 0,
+ "Invalid maximum interval: %s",
+ maximumInterval);
this.maximumInterval = maximumInterval;
return this;
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/RetryOptionsUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/RetryOptionsUtils.java
new file mode 100644
index 0000000000..322028ed62
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/internal/common/RetryOptionsUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
+ *
+ * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this material except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.temporal.internal.common;
+
+import io.grpc.Deadline;
+import io.temporal.api.common.v1.RetryPolicy;
+import io.temporal.common.RetryOptions;
+import io.temporal.failure.ActivityFailure;
+import io.temporal.failure.ApplicationFailure;
+import io.temporal.failure.ChildWorkflowFailure;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+public class RetryOptionsUtils {
+ public static boolean isNotRetryable(RetryOptions o, @Nullable Throwable e) {
+ if (e == null) {
+ return false;
+ }
+ if (e instanceof ActivityFailure || e instanceof ChildWorkflowFailure) {
+ e = e.getCause();
+ }
+ String type =
+ e instanceof ApplicationFailure
+ ? ((ApplicationFailure) e).getType()
+ : e.getClass().getName();
+ if (o.getDoNotRetry() != null) {
+ for (String doNotRetry : o.getDoNotRetry()) {
+ if (doNotRetry.equals(type)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public static boolean areAttemptsReached(RetryOptions o, long attempt) {
+ return (o.getMaximumAttempts() != 0 && attempt >= o.getMaximumAttempts());
+ }
+
+ public static boolean isDeadlineReached(@Nullable Deadline deadline, long sleepTimeMs) {
+ return deadline != null && deadline.timeRemaining(TimeUnit.MILLISECONDS) < sleepTimeMs;
+ }
+
+ public static RetryOptions toRetryOptions(RetryPolicy retryPolicy) {
+ RetryOptions.Builder roBuilder = RetryOptions.newBuilder();
+
+ Duration maximumInterval = ProtobufTimeUtils.toJavaDuration(retryPolicy.getMaximumInterval());
+ if (!maximumInterval.isZero()) {
+ roBuilder.setMaximumInterval(maximumInterval);
+ }
+
+ Duration initialInterval = ProtobufTimeUtils.toJavaDuration(retryPolicy.getInitialInterval());
+ if (!initialInterval.isZero()) {
+ roBuilder.setInitialInterval(initialInterval);
+ }
+
+ if (retryPolicy.getBackoffCoefficient() >= 1) {
+ roBuilder.setBackoffCoefficient(retryPolicy.getBackoffCoefficient());
+ }
+
+ if (retryPolicy.getMaximumAttempts() > 0) {
+ roBuilder.setMaximumAttempts(retryPolicy.getMaximumAttempts());
+ }
+
+ roBuilder.setDoNotRetry(
+ retryPolicy
+ .getNonRetryableErrorTypesList()
+ .toArray(new String[retryPolicy.getNonRetryableErrorTypesCount()]));
+
+ return roBuilder.validateBuildWithDefaults();
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java
index f1fcc323ae..49274f5f3e 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java
@@ -57,7 +57,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiFunction;
/**
* Implements workflow executor that relies on replay of a workflow code. An instance of this class
@@ -70,12 +69,12 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
private final Lock lock = new ReentrantLock();
- private final Functions.Proc1 localActivityCompletionSink;
+ private final Functions.Proc1 localActivityCompletionSink;
- private final BlockingQueue localActivityCompletionQueue =
+ private final BlockingQueue localActivityCompletionQueue =
new LinkedBlockingDeque<>();
- private final BiFunction localActivityTaskPoller;
+ private final LocalActivityDispatcher localActivityDispatcher;
private final ReplayWorkflow workflow;
@@ -93,7 +92,7 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
PollWorkflowTaskQueueResponseOrBuilder workflowTask,
SingleWorkerOptions workerOptions,
Scope metricsScope,
- BiFunction localActivityTaskPoller) {
+ LocalActivityDispatcher localActivityDispatcher) {
HistoryEvent startedEvent = workflowTask.getHistory().getEvents(0);
if (!startedEvent.hasWorkflowExecutionStartedEventAttributes()) {
throw new IllegalArgumentException(
@@ -101,7 +100,7 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
}
this.startedEvent = startedEvent.getWorkflowExecutionStartedEventAttributes();
this.metricsScope = metricsScope;
- this.localActivityTaskPoller = localActivityTaskPoller;
+ this.localActivityDispatcher = localActivityDispatcher;
this.workflow = workflow;
this.workflowStateMachines = new WorkflowStateMachines(new StatesMachinesCallbackImpl());
@@ -286,10 +285,10 @@ private void processLocalActivityRequests(long startTimeNs) throws InterruptedEx
// much sense. I believe we should add ScheduleToStart timeout for the local activities
// as well.
long maxWaitTimeNs = Math.max(nextWFTHeartbeatTimeNs - System.nanoTime(), 0);
- boolean accepted =
- localActivityTaskPoller.apply(
- new LocalActivityTask(laRequest, localActivityCompletionSink),
- Duration.ofNanos(maxWaitTimeNs));
+ laRequest.setScheduleToStartTimeout(Duration.ofNanos(maxWaitTimeNs));
+ boolean accepted = localActivityDispatcher.dispatch(laRequest, localActivityCompletionSink);
+ // TODO this needs to be reworked when we implement a proper scheduleToStart to report a
+ // Failure in the callback. A proper test is also needed for this scenario.
Preconditions.checkState(
accepted,
"Unable to schedule local activity for execution, "
@@ -302,7 +301,7 @@ private void processLocalActivityRequests(long startTimeNs) throws InterruptedEx
}
long maxWaitTimeTillHeartbeatNs = Math.max(nextWFTHeartbeatTimeNs - System.nanoTime(), 0);
- ActivityTaskHandler.Result laCompletion =
+ LocalActivityResult laCompletion =
localActivityCompletionQueue.poll(maxWaitTimeTillHeartbeatNs, TimeUnit.NANOSECONDS);
if (laCompletion == null) {
// Need to force a new task as we are out of time
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java
index 9c8d495b86..34f066de0b 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java
@@ -51,7 +51,6 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +66,7 @@ public final class ReplayWorkflowTaskHandler implements WorkflowTaskHandler {
private final Duration stickyTaskQueueScheduleToStartTimeout;
private final WorkflowServiceStubs service;
private final String stickyTaskQueueName;
- private final BiFunction localActivityTaskPoller;
+ private final LocalActivityDispatcher localActivityDispatcher;
public ReplayWorkflowTaskHandler(
String namespace,
@@ -77,7 +76,7 @@ public ReplayWorkflowTaskHandler(
String stickyTaskQueueName,
Duration stickyTaskQueueScheduleToStartTimeout,
WorkflowServiceStubs service,
- BiFunction localActivityTaskPoller) {
+ LocalActivityDispatcher localActivityDispatcher) {
this.namespace = namespace;
this.workflowFactory = asyncWorkflowFactory;
this.cache = cache;
@@ -85,7 +84,7 @@ public ReplayWorkflowTaskHandler(
this.stickyTaskQueueName = stickyTaskQueueName;
this.stickyTaskQueueScheduleToStartTimeout = stickyTaskQueueScheduleToStartTimeout;
this.service = Objects.requireNonNull(service);
- this.localActivityTaskPoller = localActivityTaskPoller;
+ this.localActivityDispatcher = localActivityDispatcher;
}
@Override
@@ -365,7 +364,7 @@ private WorkflowRunTaskHandler createStatefulHandler(
}
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
return new ReplayWorkflowRunTaskHandler(
- namespace, workflow, workflowTask, options, metricsScope, localActivityTaskPoller);
+ namespace, workflow, workflowTask, options, metricsScope, localActivityDispatcher);
}
private void resetStickyTaskQueue(WorkflowExecution execution) {
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java
index 709b1200dd..36f1b724ed 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java
@@ -20,42 +20,113 @@
package io.temporal.internal.statemachines;
+import io.temporal.api.common.v1.ActivityType;
+import io.temporal.api.common.v1.Payloads;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
+import io.temporal.internal.common.ProtobufTimeUtils;
import java.time.Duration;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
public class ExecuteLocalActivityParameters {
- private final PollActivityTaskQueueResponse.Builder activityTask;
+ public static final long NOT_SCHEDULED = -1;
+
+ // This builder doesn't have all the fields published yet (a specific attempt for example)
+ // It contains only the fields known at the moment of scheduling from the workflow.
+ // This template gets adjusted for each attempt.
+ private final @Nonnull PollActivityTaskQueueResponse.Builder activityTaskBuilder;
+ private final @Nonnull PollActivityTaskQueueResponse initialActivityTask;
+
private final Duration localRetryThreshold;
private final boolean doNotIncludeArgumentsIntoMarker;
+ private @Nullable Duration scheduleToStartTimeout;
+
+ /**
+ * Timestamp of the moment when the first attempt of this local activity was scheduled. Comes into
+ * play when localRetryThreshold is reached. If {@link #NOT_SCHEDULED} then the first attempt was
+ * not scheduled yet, and we are going to do it now locally. This timestamp is registered by the
+ * worker performing the first attempt, so this mechanic needs reasonably synchronized worker
+ * clocks.
+ */
+ private long originalScheduledTimestamp = NOT_SCHEDULED;
+
public ExecuteLocalActivityParameters(
- PollActivityTaskQueueResponse.Builder activityTask,
- Duration localRetryThreshold,
- boolean doNotIncludeArgumentsIntoMarker) {
- this.activityTask = activityTask;
- this.localRetryThreshold = localRetryThreshold;
+ @Nonnull PollActivityTaskQueueResponse.Builder activityTaskBuilder,
+ @Nullable Duration scheduleToStartTimeout,
+ boolean doNotIncludeArgumentsIntoMarker,
+ Duration localRetryThreshold) {
+ this.activityTaskBuilder = Objects.requireNonNull(activityTaskBuilder, "activityTaskBuilder");
+ this.initialActivityTask = activityTaskBuilder.build();
+ this.scheduleToStartTimeout = scheduleToStartTimeout;
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
+ this.localRetryThreshold = localRetryThreshold;
}
- public PollActivityTaskQueueResponse.Builder getActivityTask() {
- return activityTask;
+ public String getActivityId() {
+ return activityTaskBuilder.getActivityId();
}
- public Duration getLocalRetryThreshold() {
- return localRetryThreshold;
+ public ActivityType getActivityType() {
+ return activityTaskBuilder.getActivityType();
+ }
+
+ public Payloads getInput() {
+ return activityTaskBuilder.getInput();
+ }
+
+ public int getInitialAttempt() {
+ return initialActivityTask.getAttempt();
+ }
+
+ /*
+ * TODO This setter is exposed and the field is made non-final to support the legacy calculation of this timeout.
+ * This legacy logic doesn't make much sense anymore in presence of workflow task heartbeat and should be replaced
+ * with an explicit schedule to start timeout coming from the local activity options.
+ */
+ public void setScheduleToStartTimeout(@Nullable Duration scheduleToStartTimeout) {
+ this.scheduleToStartTimeout = scheduleToStartTimeout;
+ }
+
+ @Nullable
+ public Duration getScheduleToStartTimeout() {
+ return scheduleToStartTimeout;
+ }
+
+ @Nullable
+ public Duration getScheduleToCloseTimeout() {
+ if (activityTaskBuilder.hasScheduleToCloseTimeout()) {
+ return ProtobufTimeUtils.toJavaDuration(activityTaskBuilder.getScheduleToCloseTimeout());
+ } else {
+ return null;
+ }
}
public boolean isDoNotIncludeArgumentsIntoMarker() {
return doNotIncludeArgumentsIntoMarker;
}
- @Override
- public String toString() {
- return "ExecuteLocalActivityParameters{"
- + "activityTask="
- + activityTask
- + ", localRetryThreshold="
- + localRetryThreshold
- + '}';
+ public Duration getLocalRetryThreshold() {
+ return localRetryThreshold;
+ }
+
+ public void setOriginalScheduledTimestamp(long scheduledTimestamp) {
+ this.originalScheduledTimestamp = scheduledTimestamp;
+ }
+
+ public long getOriginalScheduledTimestamp() {
+ return originalScheduledTimestamp;
+ }
+
+ @Nonnull
+ public PollActivityTaskQueueResponse getInitialActivityTask() {
+ return initialActivityTask;
+ }
+
+ // Keep usage of this method limited as modifying the protobuf builder is not thread safe
+ @Nonnull
+ public PollActivityTaskQueueResponse.Builder getActivityTaskBuilder() {
+ return activityTaskBuilder;
}
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java
index 77019d66ad..9f0ec6303b 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java
@@ -26,20 +26,17 @@
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.enums.v1.EventType;
-import io.temporal.api.enums.v1.RetryState;
import io.temporal.api.failure.v1.ActivityFailureInfo;
import io.temporal.api.failure.v1.CanceledFailureInfo;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
-import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.StdConverterBackwardsCompatAdapter;
-import io.temporal.failure.FailureConverter;
import io.temporal.internal.history.LocalActivityMarkerUtils;
import io.temporal.internal.history.MarkerUtils;
-import io.temporal.internal.worker.ActivityTaskHandler;
+import io.temporal.internal.worker.LocalActivityResult;
import io.temporal.workflow.Functions;
import java.util.HashMap;
import java.util.Map;
@@ -51,6 +48,14 @@ final class LocalActivityStateMachine
LocalActivityStateMachine.State,
LocalActivityStateMachine.ExplicitEvent,
LocalActivityStateMachine> {
+ static final String LOCAL_ACTIVITY_FAILED_MESSAGE =
+ "Local " + ActivityStateMachine.ACTIVITY_FAILED_MESSAGE;
+
+ static final String LOCAL_ACTIVITY_TIMED_OUT_MESSAGE =
+ "Local " + ActivityStateMachine.ACTIVITY_TIMED_OUT_MESSAGE;
+
+ static final String LOCAL_ACTIVITY_CANCELED_MESSAGE =
+ "Local " + ActivityStateMachine.ACTIVITY_CANCELED_MESSAGE;
private final Functions.Proc1 localActivityRequestSink;
private final Functions.Proc2, Failure> callback;
@@ -60,7 +65,6 @@ final class LocalActivityStateMachine
/** Accepts proposed current time. Returns accepted current time. */
private final Functions.Func1 setCurrentTimeCallback;
- private final boolean hasRetryPolicy;
private final String activityId;
private final ActivityType activityType;
@@ -73,63 +77,9 @@ final class LocalActivityStateMachine
private final long systemNanoTimeWhenStarted;
private Failure failure;
- private ActivityTaskHandler.Result result;
+ private LocalActivityResult result;
private Optional laResult;
- /**
- * Creates new local activity marker
- *
- * @param localActivityParameters used to produce side effect value. null if replaying.
- * @param callback returns side effect value or failure
- * @param commandSink callback to send commands to
- */
- public static LocalActivityStateMachine newInstance(
- Functions.Func replaying,
- Functions.Func1 setCurrentTimeCallback,
- ExecuteLocalActivityParameters localActivityParameters,
- Functions.Proc2, Failure> callback,
- Functions.Proc1 localActivityRequestSink,
- Functions.Proc1 commandSink,
- Functions.Proc1 stateMachineSink,
- long workflowTimeMillisWhenStarted) {
- return new LocalActivityStateMachine(
- replaying,
- setCurrentTimeCallback,
- localActivityParameters,
- callback,
- localActivityRequestSink,
- commandSink,
- stateMachineSink,
- workflowTimeMillisWhenStarted,
- System.nanoTime());
- }
-
- private LocalActivityStateMachine(
- Functions.Func replaying,
- Functions.Func1 setCurrentTimeCallback,
- ExecuteLocalActivityParameters localActivityParameters,
- Functions.Proc2, Failure> callback,
- Functions.Proc1 localActivityRequestSink,
- Functions.Proc1 commandSink,
- Functions.Proc1 stateMachineSink,
- long workflowTimeMillisWhenStarted,
- long systemNanoTimeWhenStarted) {
- super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
- this.replaying = replaying;
- this.setCurrentTimeCallback = setCurrentTimeCallback;
- this.localActivityParameters = localActivityParameters;
- PollActivityTaskQueueResponse.Builder activityTask = localActivityParameters.getActivityTask();
- this.hasRetryPolicy = activityTask.hasRetryPolicy();
- this.activityId = activityTask.getActivityId();
- this.activityType = activityTask.getActivityType();
- this.localActivityRequestSink = localActivityRequestSink;
- this.callback = callback;
- this.workflowTimeMillisWhenStarted = workflowTimeMillisWhenStarted;
- this.systemNanoTimeWhenStarted = systemNanoTimeWhenStarted;
- explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
- explicitEvent(ExplicitEvent.SCHEDULE);
- }
-
enum ExplicitEvent {
CHECK_EXECUTION_STATE,
SCHEDULE,
@@ -205,6 +155,58 @@ enum State {
State.REQUEST_PREPARED,
LocalActivityStateMachine::sendRequest);
+ /**
+ * Creates new local activity marker
+ *
+ * @param localActivityParameters used to produce side effect value. null if replaying.
+ * @param callback returns side effect value or failure
+ * @param commandSink callback to send commands to
+ */
+ public static LocalActivityStateMachine newInstance(
+ Functions.Func replaying,
+ Functions.Func1 setCurrentTimeCallback,
+ ExecuteLocalActivityParameters localActivityParameters,
+ Functions.Proc2, Failure> callback,
+ Functions.Proc1 localActivityRequestSink,
+ Functions.Proc1 commandSink,
+ Functions.Proc1 stateMachineSink,
+ long workflowTimeMillisWhenStarted) {
+ return new LocalActivityStateMachine(
+ replaying,
+ setCurrentTimeCallback,
+ localActivityParameters,
+ callback,
+ localActivityRequestSink,
+ commandSink,
+ stateMachineSink,
+ workflowTimeMillisWhenStarted,
+ System.nanoTime());
+ }
+
+ private LocalActivityStateMachine(
+ Functions.Func replaying,
+ Functions.Func1 setCurrentTimeCallback,
+ ExecuteLocalActivityParameters localActivityParameters,
+ Functions.Proc2, Failure> callback,
+ Functions.Proc1 localActivityRequestSink,
+ Functions.Proc1 commandSink,
+ Functions.Proc1 stateMachineSink,
+ long workflowTimeMillisWhenStarted,
+ long systemNanoTimeWhenStarted) {
+ super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
+ this.replaying = replaying;
+ this.setCurrentTimeCallback = setCurrentTimeCallback;
+ this.localActivityParameters = localActivityParameters;
+ this.activityId = localActivityParameters.getActivityId();
+ this.activityType = localActivityParameters.getActivityType();
+ this.localActivityRequestSink = localActivityRequestSink;
+ this.callback = callback;
+ this.workflowTimeMillisWhenStarted = workflowTimeMillisWhenStarted;
+ this.systemNanoTimeWhenStarted = systemNanoTimeWhenStarted;
+ explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
+ explicitEvent(ExplicitEvent.SCHEDULE);
+ }
+
State getExecutionState() {
return replaying.apply() ? State.REPLAYING : State.EXECUTING;
}
@@ -228,7 +230,7 @@ public void markAsSent() {
explicitEvent(ExplicitEvent.MARK_AS_SENT);
}
- public void handleCompletion(ActivityTaskHandler.Result result) {
+ public void handleCompletion(LocalActivityResult result) {
this.result = result;
explicitEvent(ExplicitEvent.HANDLE_RESULT);
}
@@ -260,11 +262,10 @@ private void createMarker() {
if (localActivityParameters != null
&& !localActivityParameters.isDoNotIncludeArgumentsIntoMarker()) {
details.put(
- LocalActivityMarkerUtils.MARKER_ACTIVITY_INPUT_KEY,
- localActivityParameters.getActivityTask().getInput());
+ LocalActivityMarkerUtils.MARKER_ACTIVITY_INPUT_KEY, localActivityParameters.getInput());
}
- if (result.getTaskCompleted() != null) {
- RespondActivityTaskCompletedRequest completed = result.getTaskCompleted();
+ if (result.getExecutionCompleted() != null) {
+ RespondActivityTaskCompletedRequest completed = result.getExecutionCompleted();
if (completed.hasResult()) {
Payloads p = completed.getResult();
laResult = Optional.of(p);
@@ -272,28 +273,28 @@ private void createMarker() {
} else {
laResult = Optional.empty();
}
- } else if (result.getTaskFailed() != null) {
- // TODO(maxim): Result should contain Failure, not an exception
- ActivityTaskHandler.Result.TaskFailedResult failed = result.getTaskFailed();
- // TODO(maxim): Return RetryState in the result
- RetryState retryState =
- hasRetryPolicy
- ? RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED
- : RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET;
+ } else if (result.getExecutionFailed() != null) {
+ LocalActivityResult.ExecutionFailedResult failedResult = result.getExecutionFailed();
+ String message =
+ failedResult.isTimeout()
+ ? LOCAL_ACTIVITY_TIMED_OUT_MESSAGE
+ : LOCAL_ACTIVITY_FAILED_MESSAGE;
failure =
Failure.newBuilder()
+ .setMessage(message)
.setActivityFailureInfo(
ActivityFailureInfo.newBuilder()
- .setRetryState(retryState)
+ .setRetryState(failedResult.getRetryState())
.setActivityId(activityId)
.setActivityType(activityType))
- .setCause(FailureConverter.exceptionToFailure(failed.getFailure()))
+ .setCause(failedResult.getFailure())
.build();
markerAttributes.setFailure(failure);
- } else if (result.getTaskCanceled() != null) {
- RespondActivityTaskCanceledRequest failed = result.getTaskCanceled();
+ } else if (result.getExecutionCanceled() != null) {
+ RespondActivityTaskCanceledRequest failed = result.getExecutionCanceled();
markerAttributes.setFailure(
Failure.newBuilder()
+ .setMessage(LOCAL_ACTIVITY_CANCELED_MESSAGE)
.setCanceledFailureInfo(
CanceledFailureInfo.newBuilder().setDetails(failed.getDetails())));
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
index eb895293a6..bdb92b55ef 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
@@ -46,7 +46,7 @@
import io.temporal.internal.history.LocalActivityMarkerUtils;
import io.temporal.internal.history.VersionMarkerUtils;
import io.temporal.internal.sync.WorkflowThread;
-import io.temporal.internal.worker.ActivityTaskHandler;
+import io.temporal.internal.worker.LocalActivityResult;
import io.temporal.worker.NonDeterministicException;
import io.temporal.workflow.ChildWorkflowCancellationType;
import io.temporal.workflow.Functions;
@@ -448,9 +448,9 @@ private void prepareImpl() {
* Local activity is different from all other entities. It doesn't schedule a marker command when
* the {@link #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} is
* called. The marker is scheduled only when activity completes through ({@link
- * #handleLocalActivityCompletion(ActivityTaskHandler.Result)}). That's why the normal logic of
- * {@link #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent
- * command during replay, doesn't work. Instead, local activities are matched by their id using
+ * #handleLocalActivityCompletion(LocalActivityResult)}). That's why the normal logic of {@link
+ * #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent command
+ * during replay, doesn't work. Instead, local activities are matched by their id using
* localActivityMap.
*
* @return true if matched and false if normal event handling should continue.
@@ -781,14 +781,13 @@ public List takeLocalActivityRequests() {
List result = localActivityRequests;
localActivityRequests = new ArrayList<>();
for (ExecuteLocalActivityParameters parameters : result) {
- LocalActivityStateMachine stateMachine =
- localActivityMap.get(parameters.getActivityTask().getActivityId());
+ LocalActivityStateMachine stateMachine = localActivityMap.get(parameters.getActivityId());
stateMachine.markAsSent();
}
return result;
}
- public void handleLocalActivityCompletion(ActivityTaskHandler.Result laCompletion) {
+ public void handleLocalActivityCompletion(LocalActivityResult laCompletion) {
String activityId = laCompletion.getActivityId();
LocalActivityStateMachine laStateMachine = localActivityMap.get(activityId);
if (laStateMachine == null) {
@@ -802,7 +801,7 @@ public Functions.Proc scheduleLocalActivityTask(
ExecuteLocalActivityParameters parameters,
Functions.Proc2, Failure> callback) {
checkEventLoopExecuting();
- String activityId = parameters.getActivityTask().getActivityId();
+ String activityId = parameters.getActivityId();
if (Strings.isNullOrEmpty(activityId)) {
throw new IllegalArgumentException("Missing activityId: " + activityId);
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
index 8948a14b94..ea667d57bb 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
@@ -392,13 +392,21 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
.setWorkflowType(this.replayContext.getWorkflowType())
.setWorkflowExecution(this.replayContext.getWorkflowExecution())
.setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
- .setStartToCloseTimeout(
- ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
- .setScheduleToCloseTimeout(
- ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
.setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
.setActivityType(ActivityType.newBuilder().setName(name))
.setAttempt(attempt);
+
+ Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
+ if (scheduleToCloseTimeout != null) {
+ activityTask.setScheduleToCloseTimeout(
+ ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
+ }
+
+ Duration startToCloseTimeout = options.getStartToCloseTimeout();
+ if (startToCloseTimeout != null) {
+ activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
+ }
+
io.temporal.api.common.v1.Header grpcHeader =
toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
activityTask.setHeader(grpcHeader);
@@ -411,7 +419,7 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(6);
}
return new ExecuteLocalActivityParameters(
- activityTask, localRetryThreshold, options.isDoNotIncludeArgumentsIntoMarker());
+ activityTask, null, options.isDoNotIncludeArgumentsIntoMarker(), localRetryThreshold);
}
@Override
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTaskHandler.java
index 1c20e2388c..b5ba3be591 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTaskHandler.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTaskHandler.java
@@ -24,7 +24,6 @@
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
-import java.time.Duration;
/**
* Interface of an activity task handler.
@@ -40,8 +39,6 @@ final class Result {
private final TaskFailedResult taskFailed;
private final RespondActivityTaskCanceledRequest taskCanceled;
private final boolean manualCompletion;
- private int attempt;
- private Duration backoff;
@Override
public String toString() {
@@ -55,10 +52,6 @@ public String toString() {
+ taskFailed
+ ", taskCanceled="
+ taskCanceled
- + ", attempt="
- + attempt
- + ", backoff="
- + backoff
+ '}';
}
@@ -115,22 +108,6 @@ public RespondActivityTaskCanceledRequest getTaskCanceled() {
return taskCanceled;
}
- public void setAttempt(int attempt) {
- this.attempt = attempt;
- }
-
- public int getAttempt() {
- return attempt;
- }
-
- public void setBackoff(Duration backoff) {
- this.backoff = backoff;
- }
-
- public Duration getBackoff() {
- return backoff;
- }
-
public boolean isManualCompletion() {
return manualCompletion;
}
@@ -138,7 +115,7 @@ public boolean isManualCompletion() {
/**
* The implementation should be called when a polling activity worker receives a new activity
- * task. This method shouldn't throw any exception unless there is a need to not reply to the
+ * task. This method shouldn't throw any Throwables unless there is a need to not reply to the
* task.
*
* @param activityTask activity task which is response to PollActivityTaskQueue call.
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityAttemptTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityAttemptTask.java
new file mode 100644
index 0000000000..ad47f4fb5d
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityAttemptTask.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
+ *
+ * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this material except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.temporal.internal.worker;
+
+import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
+import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
+import javax.annotation.Nonnull;
+
+// TODO This class is an absolutely trivial wrapper and may go away with reworking of local activity
+// scheduling from the generic poller classes.
+class LocalActivityAttemptTask {
+ private final @Nonnull LocalActivityExecutionContext executionContext;
+ private final @Nonnull PollActivityTaskQueueResponse attemptTask;
+
+ public LocalActivityAttemptTask(
+ @Nonnull LocalActivityExecutionContext executionContext,
+ @Nonnull PollActivityTaskQueueResponse attemptTask) {
+ this.executionContext = executionContext;
+ this.attemptTask = attemptTask;
+ }
+
+ @Nonnull
+ public LocalActivityExecutionContext getExecutionContext() {
+ return executionContext;
+ }
+
+ public String getActivityId() {
+ return executionContext.getActivityId();
+ }
+
+ @Nonnull
+ public ExecuteLocalActivityParameters getExecutionParams() {
+ return executionContext.getExecutionParams();
+ }
+
+ @Nonnull
+ public PollActivityTaskQueueResponse getAttemptTask() {
+ return attemptTask;
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityDispatcher.java
similarity index 59%
rename from temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityTask.java
rename to temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityDispatcher.java
index 8c588e54f3..f70dbc3b91 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityTask.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityDispatcher.java
@@ -23,26 +23,12 @@
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.workflow.Functions;
-public class LocalActivityTask {
- private final ExecuteLocalActivityParameters params;
- private final Functions.Proc1 resultCallback;
-
- public LocalActivityTask(
- ExecuteLocalActivityParameters params,
- Functions.Proc1 resultCallback) {
- this.params = params;
- this.resultCallback = resultCallback;
- }
-
- public String getActivityId() {
- return params.getActivityTask().getActivityId();
- }
-
- public ExecuteLocalActivityParameters getParams() {
- return params;
- }
-
- public Functions.Proc1 getResultCallback() {
- return resultCallback;
- }
+public interface LocalActivityDispatcher {
+ /**
+ * Synchronously dispatches the local activity to the local activity worker.
+ *
+ * @return true if the local activity was accepted, false if it was rejected
+ */
+ boolean dispatch(
+ ExecuteLocalActivityParameters params, Functions.Proc1 resultCallback);
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java
new file mode 100644
index 0000000000..91f1ed6838
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
+ *
+ * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this material except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.temporal.internal.worker;
+
+import io.grpc.Deadline;
+import io.temporal.api.failure.v1.Failure;
+import io.temporal.api.workflow.v1.PendingActivityInfo;
+import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
+import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
+import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
+import io.temporal.workflow.Functions;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+class LocalActivityExecutionContext {
+ private final @Nonnull ExecuteLocalActivityParameters executionParams;
+ private final @Nonnull Deadline localRetryDeadline;
+ private final @Nonnull CompletableFuture executionResult =
+ new CompletableFuture<>();
+
+ private final @Nullable Deadline scheduleToCloseDeadline;
+
+ private final @Nonnull AtomicInteger currentAttempt;
+
+ private final @Nonnull AtomicReference lastFailure = new AtomicReference<>();
+
+ private @Nullable ScheduledFuture> scheduleToCloseFuture;
+
+ public LocalActivityExecutionContext(
+ @Nonnull ExecuteLocalActivityParameters executionParams,
+ @Nonnull Functions.Proc1 resultCallback,
+ @Nonnull Deadline localRetryDeadline,
+ @Nullable Deadline scheduleToCloseDeadline) {
+ this.executionParams = Objects.requireNonNull(executionParams, "executionParams");
+ this.executionResult.thenAccept(
+ Objects.requireNonNull(resultCallback, "resultCallback")::apply);
+ this.localRetryDeadline = localRetryDeadline;
+ this.scheduleToCloseDeadline = scheduleToCloseDeadline;
+ this.currentAttempt = new AtomicInteger(executionParams.getInitialAttempt());
+ }
+
+ public String getActivityId() {
+ return executionParams.getActivityId();
+ }
+
+ @Nonnull
+ public ExecuteLocalActivityParameters getExecutionParams() {
+ return executionParams;
+ }
+
+ @Nonnull
+ public Deadline getLocalRetryDeadline() {
+ return localRetryDeadline;
+ }
+
+ public int getCurrentAttempt() {
+ return currentAttempt.get();
+ }
+
+ /**
+ * The last failure preserved for this activity execution. This field is mimicking the behavior of
+ * {@link PendingActivityInfo#getLastFailure()} that is maintained by the server in the mutable
+ * state and is used to create ActivityFailures with meaningful causes and returned by {@link
+ * io.temporal.api.workflowservice.v1.WorkflowServiceGrpc.WorkflowServiceBlockingStub#describeWorkflowExecution(DescribeWorkflowExecutionRequest)}
+ */
+ @Nullable
+ public Failure getLastFailure() {
+ return lastFailure.get();
+ }
+
+ @Nonnull
+ public PollActivityTaskQueueResponse getNextAttemptActivityTask(@Nullable Failure lastFailure) {
+ // synchronization here is not absolutely needed as LocalActivityWorker#scheduleNextAttempt
+ // shouldn't be executed concurrently. But to make sure this code is safe for future changes,
+ // let's make this method atomic and protect thread-unsafe protobuf builder modification.
+
+ // executionResult here is used just as an internal monitor object that is final and never
+ // escapes the class
+ synchronized (executionResult) {
+ int nextAttempt = currentAttempt.incrementAndGet();
+ if (lastFailure != null) {
+ this.lastFailure.set(lastFailure);
+ }
+ return executionParams.getActivityTaskBuilder().setAttempt(nextAttempt).build();
+ }
+ }
+
+ @Nullable
+ public Deadline getScheduleToCloseDeadline() {
+ return scheduleToCloseDeadline;
+ }
+
+ public void setScheduleToCloseFuture(@Nullable ScheduledFuture> scheduleToCloseFuture) {
+ this.scheduleToCloseFuture = scheduleToCloseFuture;
+ }
+
+ public boolean callback(LocalActivityResult result) {
+ if (scheduleToCloseFuture != null) {
+ scheduleToCloseFuture.cancel(false);
+ }
+ return executionResult.complete(result);
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityPollTask.java
index 6db897aaa1..4b45542443 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityPollTask.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityPollTask.java
@@ -20,27 +20,23 @@
package io.temporal.internal.worker;
-import java.time.Duration;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class LocalActivityPollTask
- implements Poller.PollTask,
- BiFunction {
+final class LocalActivityPollTask implements Poller.PollTask {
private static final Logger log = LoggerFactory.getLogger(LocalActivityPollTask.class);
- private static final int QUEUE_SIZE = 1000;
- private final BlockingQueue pendingTasks =
- new ArrayBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue pendingTasks;
+
+ public LocalActivityPollTask(BlockingQueue pendingTasks) {
+ this.pendingTasks = pendingTasks;
+ }
@Override
- public LocalActivityTask poll() {
+ public LocalActivityAttemptTask poll() {
try {
- LocalActivityTask task = pendingTasks.take();
+ LocalActivityAttemptTask task = pendingTasks.take();
log.trace("LocalActivity Task poll returned: {}", task.getActivityId());
return task;
} catch (InterruptedException e) {
@@ -48,23 +44,4 @@ public LocalActivityTask poll() {
return null;
}
}
-
- @Override
- public Boolean apply(LocalActivityTask task, Duration maxWaitAllowed) {
- try {
- boolean accepted = pendingTasks.offer(task, maxWaitAllowed.toMillis(), TimeUnit.MILLISECONDS);
- if (accepted) {
- log.trace("LocalActivity queued: {}", task.getActivityId());
- } else {
- log.trace(
- "LocalActivity queue submitting timed out for activity {}, maxWaitAllowed: {}",
- task.getActivityId(),
- maxWaitAllowed);
- }
- return accepted;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- }
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityResult.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityResult.java
new file mode 100644
index 0000000000..33c755a27b
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityResult.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
+ *
+ * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this material except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.temporal.internal.worker;
+
+import io.temporal.api.enums.v1.RetryState;
+import io.temporal.api.failure.v1.Failure;
+import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
+import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
+import java.time.Duration;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+public final class LocalActivityResult {
+ private final @Nonnull String activityId;
+ private final @Nullable RespondActivityTaskCompletedRequest executionCompleted;
+ private final @Nullable ExecutionFailedResult executionFailed;
+ private final @Nullable RespondActivityTaskCanceledRequest executionCanceled;
+
+ static LocalActivityResult completed(ActivityTaskHandler.Result ahResult) {
+ return new LocalActivityResult(
+ ahResult.getActivityId(), ahResult.getTaskCompleted(), null, null);
+ }
+
+ static LocalActivityResult failed(
+ String activityId,
+ RetryState retryState,
+ Failure timeoutFailure,
+ int attempt,
+ @Nullable Duration backoff) {
+ ExecutionFailedResult failedResult =
+ new ExecutionFailedResult(retryState, timeoutFailure, attempt, backoff);
+ return new LocalActivityResult(activityId, null, failedResult, null);
+ }
+
+ static LocalActivityResult cancelled(ActivityTaskHandler.Result ahResult) {
+ return new LocalActivityResult(
+ ahResult.getActivityId(), null, null, ahResult.getTaskCanceled());
+ }
+
+ /**
+ * Only zero (manual activity completion) or one request is allowed. Task token and identity
+ * fields shouldn't be filled in. Retry options are the service call. These options override the
+ * default ones set on the activity worker.
+ */
+ public LocalActivityResult(
+ @Nonnull String activityId,
+ @Nullable RespondActivityTaskCompletedRequest executionCompleted,
+ @Nullable ExecutionFailedResult executionFailed,
+ @Nullable RespondActivityTaskCanceledRequest executionCanceled) {
+ this.activityId = activityId;
+ this.executionCompleted = executionCompleted;
+ this.executionFailed = executionFailed;
+ this.executionCanceled = executionCanceled;
+ }
+
+ @Nonnull
+ public String getActivityId() {
+ return activityId;
+ }
+
+ @Nullable
+ public RespondActivityTaskCompletedRequest getExecutionCompleted() {
+ return executionCompleted;
+ }
+
+ @Nullable
+ public ExecutionFailedResult getExecutionFailed() {
+ return executionFailed;
+ }
+
+ @Nullable
+ public RespondActivityTaskCanceledRequest getExecutionCanceled() {
+ return executionCanceled;
+ }
+
+ @Override
+ public String toString() {
+ return "LocalActivityResult{"
+ + "activityId='"
+ + activityId
+ + '\''
+ + ", executionCompleted="
+ + executionCompleted
+ + ", executionFailed="
+ + executionFailed
+ + ", executionCanceled="
+ + executionCanceled
+ + '}';
+ }
+
+ public static class ExecutionFailedResult {
+ @Nonnull private final RetryState retryState;
+ @Nonnull private final Failure failure;
+ private final int lastAttempt;
+ @Nullable private final Duration backoff;
+
+ public ExecutionFailedResult(
+ @Nonnull RetryState retryState,
+ @Nonnull Failure failure,
+ int lastAttempt,
+ @Nullable Duration backoff) {
+ this.retryState = retryState;
+ this.failure = failure;
+ this.lastAttempt = lastAttempt;
+ this.backoff = backoff;
+ }
+
+ @Nonnull
+ public RetryState getRetryState() {
+ return retryState;
+ }
+
+ @Nonnull
+ public Failure getFailure() {
+ return failure;
+ }
+
+ public int getLastAttempt() {
+ return lastAttempt;
+ }
+
+ @Nullable
+ public Duration getBackoff() {
+ return backoff;
+ }
+
+ public boolean isTimeout() {
+ return failure.hasTimeoutFailureInfo();
+ }
+
+ @Override
+ public String toString() {
+ return "ExecutionFailedResult{"
+ + "retryState="
+ + retryState
+ + ", failure="
+ + failure
+ + ", lastAttempt="
+ + lastAttempt
+ + ", backoff="
+ + backoff
+ + '}';
+ }
+ }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
index 6546b8da89..616c71b98b 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
@@ -20,30 +20,35 @@
package io.temporal.internal.worker;
-import com.google.protobuf.util.Timestamps;
+import static io.temporal.internal.worker.LocalActivityResult.failed;
+
+import com.google.common.base.Preconditions;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.ImmutableMap;
-import io.temporal.api.common.v1.RetryPolicy;
+import io.grpc.Deadline;
+import io.temporal.api.enums.v1.RetryState;
+import io.temporal.api.enums.v1.TimeoutType;
import io.temporal.api.failure.v1.Failure;
+import io.temporal.api.failure.v1.TimeoutFailureInfo;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
-import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
+import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.FailureConverter;
import io.temporal.internal.common.ProtobufTimeUtils;
+import io.temporal.internal.common.RetryOptionsUtils;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
+import io.temporal.workflow.Functions;
import java.time.Duration;
import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
+import java.util.concurrent.*;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -51,15 +56,32 @@
final class LocalActivityWorker implements SuspendableWorker {
private static final Logger log = LoggerFactory.getLogger(LocalActivityWorker.class);
- @Nonnull private SuspendableWorker poller = new NoopSuspendableWorker();
+ // RETRY_STATE_IN_PROGRESS shows that it's not the end
+ // for this local activity execution from the workflow point of view.
+ // It's also not conflicting with any other situations
+ // and uniquely identifies the reach of the local retries
+ // and a need to schedule a timer.
+ private static final RetryState LOCAL_RETRY_LIMIT_RETRY_STATE =
+ RetryState.RETRY_STATE_IN_PROGRESS;
+
private final ActivityTaskHandler handler;
private final String namespace;
private final String taskQueue;
+ private final ScheduledExecutorService scheduledExecutor;
+
private final SingleWorkerOptions options;
+
+ private static final int QUEUE_SIZE = 1000;
+ private final BlockingQueue pendingTasks =
+ new ArrayBlockingQueue<>(QUEUE_SIZE);
private final LocalActivityPollTask laPollTask;
+ private final LocalActivityDispatcherImpl laScheduler;
+
private final PollerOptions pollerOptions;
private final Scope workerMetricsScope;
+ @Nonnull private SuspendableWorker poller = new NoopSuspendableWorker();
+
public LocalActivityWorker(
@Nonnull String namespace,
@Nonnull String taskQueue,
@@ -67,8 +89,18 @@ public LocalActivityWorker(
@Nonnull ActivityTaskHandler handler) {
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
+ this.scheduledExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread thread = new Thread(r);
+ thread.setName(
+ WorkerThreadsNameHelper.getLocalActivitySchedulerThreadPrefix(
+ namespace, taskQueue));
+ return thread;
+ });
this.handler = handler;
- this.laPollTask = new LocalActivityPollTask();
+ this.laPollTask = new LocalActivityPollTask(pendingTasks);
+ this.laScheduler = new LocalActivityDispatcherImpl();
this.options = Objects.requireNonNull(options);
this.pollerOptions = getPollerOptions(options);
this.workerMetricsScope =
@@ -79,7 +111,7 @@ public LocalActivityWorker(
@Override
public void start() {
if (handler.isAnyTypeSupported()) {
- PollTaskExecutor pollTaskExecutor =
+ PollTaskExecutor pollTaskExecutor =
new PollTaskExecutor<>(
namespace,
taskQueue,
@@ -100,67 +132,154 @@ public void start() {
}
}
- public boolean isAnyTypeSupported() {
- return handler.isAnyTypeSupported();
+ private boolean submitAnAttempt(LocalActivityAttemptTask task) {
+ try {
+ @Nullable
+ Duration scheduleToStartTimeout = task.getExecutionParams().getScheduleToStartTimeout();
+ boolean accepted =
+ scheduleToStartTimeout != null
+ ? pendingTasks.offer(task, scheduleToStartTimeout.toMillis(), TimeUnit.MILLISECONDS)
+ : pendingTasks.offer(task);
+ if (accepted) {
+ log.trace("LocalActivity queued: {}", task.getActivityId());
+ } else {
+ log.trace(
+ "LocalActivity queue submitting timed out for activity {}, scheduleToStartTimeout: {}",
+ task.getActivityId(),
+ scheduleToStartTimeout);
+ }
+ return accepted;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
}
- @Override
- public boolean isStarted() {
- return poller.isStarted();
- }
+ /**
+ * @param executionContext execution context of the activity
+ * @param activityTask activity task
+ * @param executionThrowable exception happened during the activity execution. Can be null (for
+ * startToClose timeout)
+ * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
+ * applicable
+ */
+ @Nonnull
+ private RetryDecision shouldRetry(
+ LocalActivityExecutionContext executionContext,
+ PollActivityTaskQueueResponse activityTask,
+ @Nullable Throwable executionThrowable) {
+ int currentAttempt = activityTask.getAttempt();
+
+ if (isNonRetryableApplicationFailure(executionThrowable)) {
+ return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
+ }
- @Override
- public boolean isShutdown() {
- return poller.isShutdown();
- }
+ if (executionThrowable instanceof Error) {
+ // TODO Error inside Local Activity shouldn't be failing the local activity call.
+ // Instead we should fail Workflow Task. Implement a special flag for that in the result.
+ // task.callback(executionFailed(activityHandlerResult,
+ // RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, currentAttempt));
+ // don't just swallow Error from activities, propagate it to the top
+ throw (Error) executionThrowable;
+ }
- @Override
- public boolean isTerminated() {
- return poller.isTerminated();
- }
+ if (isRetryPolicyNotSet(activityTask)) {
+ return new RetryDecision(RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET, null);
+ }
- @Override
- public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
- return poller.shutdown(shutdownManager, interruptTasks);
- }
+ RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
- @Override
- public void awaitTermination(long timeout, TimeUnit unit) {
- poller.awaitTermination(timeout, unit);
- }
+ if (RetryOptionsUtils.isNotRetryable(retryOptions, executionThrowable)) {
+ return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
+ }
- @Override
- public void suspendPolling() {
- poller.suspendPolling();
- }
+ if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt)) {
+ return new RetryDecision(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, null);
+ }
- @Override
- public void resumePolling() {
- poller.resumePolling();
- }
+ long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
+ if (RetryOptionsUtils.isDeadlineReached(
+ executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
+ return new RetryDecision(RetryState.RETRY_STATE_TIMEOUT, null);
+ }
- @Override
- public boolean isSuspended() {
- return poller.isSuspended();
+ if (executionContext.getLocalRetryDeadline().timeRemaining(TimeUnit.MILLISECONDS)
+ <= sleepMillis) {
+ return new RetryDecision(LOCAL_RETRY_LIMIT_RETRY_STATE, Duration.ofMillis(sleepMillis));
+ }
+
+ return new RetryDecision(Duration.ofMillis(sleepMillis));
}
- private PollerOptions getPollerOptions(SingleWorkerOptions options) {
- PollerOptions pollerOptions = options.getPollerOptions();
- if (pollerOptions.getPollThreadNamePrefix() == null) {
- pollerOptions =
- PollerOptions.newBuilder(pollerOptions)
- .setPollThreadNamePrefix(
- WorkerThreadsNameHelper.getLocalActivityPollerThreadPrefix(namespace, taskQueue))
- .build();
- }
- return pollerOptions;
+ /**
+ * @param executionContext execution context of the activity
+ * @param backoff delay time in milliseconds to the next attempt
+ * @param failure if supplied, it will be used to override {@link
+ * LocalActivityExecutionContext#getLastFailure()}
+ */
+ private void scheduleNextAttempt(
+ LocalActivityExecutionContext executionContext,
+ @Nonnull Duration backoff,
+ @Nullable Failure failure) {
+ PollActivityTaskQueueResponse nextActivityTask =
+ executionContext.getNextAttemptActivityTask(failure);
+ LocalActivityAttemptTask task =
+ new LocalActivityAttemptTask(executionContext, nextActivityTask);
+ Deadline.after(backoff.toMillis(), TimeUnit.MILLISECONDS)
+ .runOnExpiration(new LocalActivityRetryHandler(task), scheduledExecutor);
}
- public BiFunction getLocalActivityTaskPoller() {
- return laPollTask;
+ private class LocalActivityDispatcherImpl implements LocalActivityDispatcher {
+ @Override
+ public boolean dispatch(
+ ExecuteLocalActivityParameters params,
+ Functions.Proc1 resultCallback) {
+
+ long localRetryThresholdMs = params.getLocalRetryThreshold().toMillis();
+ Preconditions.checkState(localRetryThresholdMs > 0, "localRetryThresholdMs must be > 0");
+ Deadline localRetryDeadline = Deadline.after(localRetryThresholdMs, TimeUnit.MILLISECONDS);
+
+ long passedFromOriginalSchedulingMs = 0;
+ if (params.getOriginalScheduledTimestamp() != ExecuteLocalActivityParameters.NOT_SCHEDULED) {
+ passedFromOriginalSchedulingMs =
+ System.currentTimeMillis() - params.getOriginalScheduledTimestamp();
+ } else {
+ params.setOriginalScheduledTimestamp(System.currentTimeMillis());
+ }
+
+ Duration scheduleToCloseTimeout = params.getScheduleToCloseTimeout();
+ Deadline scheduleToCloseDeadline = null;
+ if (scheduleToCloseTimeout != null) {
+ scheduleToCloseDeadline =
+ Deadline.after(
+ scheduleToCloseTimeout.toMillis() - passedFromOriginalSchedulingMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ LocalActivityExecutionContext executionContext =
+ new LocalActivityExecutionContext(
+ params, resultCallback, localRetryDeadline, scheduleToCloseDeadline);
+
+ LocalActivityAttemptTask task =
+ new LocalActivityAttemptTask(executionContext, params.getInitialActivityTask());
+ boolean accepted = submitAnAttempt(task);
+
+ if (accepted) {
+ if (scheduleToCloseDeadline != null) {
+ ScheduledFuture> scheduledScheduleToClose =
+ scheduledExecutor.schedule(
+ new ScheduleToCloseTimeoutHandler(executionContext),
+ scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
+ executionContext.setScheduleToCloseFuture(scheduledScheduleToClose);
+ }
+ }
+
+ return accepted;
+ }
}
- private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler {
+ private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler {
private final ActivityTaskHandler handler;
@@ -169,9 +288,11 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
}
@Override
- public void handle(LocalActivityTask task) throws Exception {
- ExecuteLocalActivityParameters params = task.getParams();
- PollActivityTaskQueueResponse.Builder activityTask = params.getActivityTask();
+ public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
+ LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
+ PollActivityTaskQueueResponse activityTask = attemptTask.getAttemptTask();
+ String activityId = activityTask.getActivityId();
+
Scope metricsScope =
workerMetricsScope.tagged(
ImmutableMap.of(
@@ -180,37 +301,101 @@ public void handle(LocalActivityTask task) throws Exception {
MetricsTag.WORKFLOW_TYPE,
activityTask.getWorkflowType().getName()));
- MDC.put(LoggerTag.ACTIVITY_ID, activityTask.getActivityId());
+ int currentAttempt = activityTask.getAttempt();
+
+ MDC.put(LoggerTag.ACTIVITY_ID, activityId);
MDC.put(LoggerTag.ACTIVITY_TYPE, activityTask.getActivityType().getName());
MDC.put(LoggerTag.WORKFLOW_ID, activityTask.getWorkflowExecution().getWorkflowId());
MDC.put(LoggerTag.WORKFLOW_TYPE, activityTask.getWorkflowType().getName());
MDC.put(LoggerTag.RUN_ID, activityTask.getWorkflowExecution().getRunId());
-
- ActivityTaskHandler.Result result = null;
try {
- result =
- handleLocalActivity(
- params.getActivityTask(),
- System.currentTimeMillis(),
- params.getLocalRetryThreshold().toMillis(),
- metricsScope);
+ ScheduledFuture> startToCloseTimeoutFuture = null;
+
+ if (activityTask.hasStartToCloseTimeout()) {
+ startToCloseTimeoutFuture =
+ scheduledExecutor.schedule(
+ new StartToCloseTimeoutHandler(attemptTask),
+ ProtobufTimeUtils.toJavaDuration(
+ attemptTask.getAttemptTask().getStartToCloseTimeout())
+ .toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
+
+ ActivityTaskHandler.Result activityHandlerResult;
+ Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
+ try {
+ activityHandlerResult =
+ handler.handle(new ActivityTask(activityTask, () -> {}), metricsScope, true);
+ } finally {
+ sw.stop();
+ }
+
+ // Making sure that the result handling code following this statement is mutual exclusive
+ // with the start to close timeout handler.
+ boolean startToCloseTimeoutFired =
+ startToCloseTimeoutFuture != null && !startToCloseTimeoutFuture.cancel(false);
+
+ if (startToCloseTimeoutFired) {
+ // If start to close timeout fired, the result of this activity execution should be
+ // discarded.
+ // Scheduling of the next attempt is taken care by the StartToCloseTimeoutHandler.
+ return;
+ }
+
+ if (activityHandlerResult.getTaskCompleted() != null) {
+ com.uber.m3.util.Duration e2eDuration =
+ ProtobufTimeUtils.toM3DurationSinceNow(activityTask.getScheduledTime());
+ metricsScope.timer(MetricsType.LOCAL_ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
+ executionContext.callback(LocalActivityResult.completed(activityHandlerResult));
+ return;
+ }
+
+ if (activityHandlerResult.getTaskCanceled() != null) {
+ executionContext.callback(LocalActivityResult.cancelled(activityHandlerResult));
+ return;
+ }
+
+ Preconditions.checkState(
+ activityHandlerResult.getTaskFailed() != null,
+ "One of taskCompleted, taskCanceled or taskFailed must be set");
+
+ Failure executionFailure =
+ activityHandlerResult.getTaskFailed().getTaskFailedRequest().getFailure();
+
+ RetryDecision retryDecision =
+ shouldRetry(
+ executionContext, activityTask, activityHandlerResult.getTaskFailed().getFailure());
+ if (retryDecision.doNextAttempt()) {
+ scheduleNextAttempt(
+ executionContext,
+ Objects.requireNonNull(
+ retryDecision.nextAttemptBackoff,
+ "nextAttemptBackoff is expected to not be null"),
+ executionFailure);
+ } else {
+ executionContext.callback(
+ failed(
+ activityId,
+ retryDecision.retryState,
+ executionFailure,
+ currentAttempt,
+ retryDecision.nextAttemptBackoff));
+ }
+
} catch (Throwable ex) {
- // handleLocalActivity if expected to never throw an exception and return result
+ // handleLocalActivity is expected to never throw an exception and return a result
// that can be used for a workflow callback if this method throws, it's a bug.
log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
-
- // Even if some unexpected underlying exception happened, wee have to create the result
- // and make it published to the caller any way. But it's not a normal code path.
Failure failure = FailureConverter.exceptionToFailure(ex);
- RespondActivityTaskFailedRequest.Builder taskFailedRequest =
- RespondActivityTaskFailedRequest.newBuilder().setFailure(failure);
- result =
- new ActivityTaskHandler.Result(
- params.getActivityTask().getActivityId(),
- null,
- new ActivityTaskHandler.Result.TaskFailedResult(taskFailedRequest.build(), ex),
- null,
- false);
+ executionContext.callback(
+ failed(
+ activityId,
+ RetryState.RETRY_STATE_INTERNAL_SERVER_ERROR,
+ failure,
+ currentAttempt,
+ null));
throw ex;
} finally {
MDC.remove(LoggerTag.ACTIVITY_ID);
@@ -218,117 +403,200 @@ public void handle(LocalActivityTask task) throws Exception {
MDC.remove(LoggerTag.WORKFLOW_ID);
MDC.remove(LoggerTag.WORKFLOW_TYPE);
MDC.remove(LoggerTag.RUN_ID);
-
- task.getResultCallback().apply(result);
- }
-
- if (result.getTaskFailed() != null && result.getTaskFailed().getFailure() instanceof Error) {
- // don't just swallow Error from activities, propagate it to the top
- throw (Error) result.getTaskFailed().getFailure();
}
}
@Override
- public Throwable wrapFailure(LocalActivityTask task, Throwable failure) {
+ public Throwable wrapFailure(LocalActivityAttemptTask task, Throwable failure) {
return new RuntimeException("Failure processing local activity task.", failure);
}
+ }
- private @Nonnull ActivityTaskHandler.Result handleLocalActivity(
- PollActivityTaskQueueResponse.Builder activityTask,
- long activityStartTimeMs,
- long localRetryThresholdMs,
- Scope metricsScope) {
- int attempt = activityTask.getAttempt();
+ private class LocalActivityRetryHandler implements Runnable {
+ private final LocalActivityAttemptTask localActivityAttemptTask;
- metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
+ private LocalActivityRetryHandler(LocalActivityAttemptTask localActivityAttemptTask) {
+ this.localActivityAttemptTask = localActivityAttemptTask;
+ }
- ActivityTaskHandler.Result result;
- Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
- try {
- result =
- handler.handle(new ActivityTask(activityTask.build(), () -> {}), metricsScope, true);
- } finally {
- sw.stop();
- }
- result.setAttempt(attempt);
+ @Override
+ public void run() {
+ submitAnAttempt(localActivityAttemptTask);
+ }
+ }
- if (isNonRetryableApplicationFailure(result)) {
- return result;
- }
+ private static class ScheduleToCloseTimeoutHandler implements Runnable {
+ private final LocalActivityExecutionContext executionContext;
- if (result.getTaskCompleted() != null) {
- com.uber.m3.util.Duration e2eDuration =
- ProtobufTimeUtils.toM3DurationSinceNow(activityTask.getScheduledTime());
- metricsScope.timer(MetricsType.LOCAL_ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
- }
+ private ScheduleToCloseTimeoutHandler(LocalActivityExecutionContext executionContext) {
+ this.executionContext = executionContext;
+ }
- if (result.getTaskCompleted() != null
- || result.getTaskCanceled() != null
- || !activityTask.hasRetryPolicy()) {
- return result;
- }
+ @Override
+ public void run() {
+ executionContext.callback(
+ failed(
+ executionContext.getActivityId(),
+ RetryState.RETRY_STATE_TIMEOUT,
+ newTimeoutFailure(
+ TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext.getLastFailure()),
+ executionContext.getCurrentAttempt(),
+ null));
+ }
+ }
- RetryOptions retryOptions = buildRetryOptions(activityTask.getRetryPolicy());
-
- long sleepMillis = retryOptions.calculateSleepTime(attempt);
- long elapsedTask = System.currentTimeMillis() - activityStartTimeMs;
- long sinceScheduled =
- System.currentTimeMillis() - Timestamps.toMillis(activityTask.getScheduledTime());
- long elapsedTotal = elapsedTask + sinceScheduled;
- Duration timeout = ProtobufTimeUtils.toJavaDuration(activityTask.getScheduleToCloseTimeout());
- Optional expiration =
- timeout.compareTo(Duration.ZERO) > 0 ? Optional.of(timeout) : Optional.empty();
- if (retryOptions.shouldRethrow(
- result.getTaskFailed().getFailure(), expiration, attempt, elapsedTotal, sleepMillis)) {
- return result;
- } else {
- result.setBackoff(Duration.ofMillis(sleepMillis));
- }
+ private class StartToCloseTimeoutHandler implements Runnable {
+ private final LocalActivityAttemptTask attemptTask;
- // For small backoff we do local retry. Otherwise we will schedule timer on server side.
- // TODO(maxim): Use timer queue for retries to avoid tying up a thread.
- if (elapsedTask + sleepMillis < localRetryThresholdMs) {
- try {
- Thread.sleep(sleepMillis);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // if the activity thread got interrupted, we use the last failed result and exit fast
- return result;
- }
- activityTask.setAttempt(attempt + 1);
- return handleLocalActivity(
- activityTask, activityStartTimeMs, localRetryThresholdMs, metricsScope);
+ private StartToCloseTimeoutHandler(LocalActivityAttemptTask attemptTask) {
+ this.attemptTask = attemptTask;
+ }
+
+ @Override
+ public void run() {
+ LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
+ PollActivityTaskQueueResponse activityTask = attemptTask.getAttemptTask();
+ String activityId = activityTask.getActivityId();
+
+ int timingOutAttempt = activityTask.getAttempt();
+
+ RetryDecision retryDecision = shouldRetry(executionContext, activityTask, null);
+ if (retryDecision.doNextAttempt()) {
+ scheduleNextAttempt(
+ executionContext,
+ Objects.requireNonNull(
+ retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
+ // null because schedule to start / close is not meaningful
+ newTimeoutFailure(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE, null));
} else {
- return result;
+ // RetryState.RETRY_STATE_TIMEOUT happens only when scheduleToClose is fired
+ // scheduleToClose timeout is effectively replacing the original startToClose
+ TimeoutType timeoutType =
+ RetryState.RETRY_STATE_TIMEOUT.equals(retryDecision.retryState)
+ ? TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
+ : TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE;
+ executionContext.callback(
+ failed(
+ activityId,
+ retryDecision.retryState,
+ newTimeoutFailure(timeoutType, executionContext.getLastFailure()),
+ timingOutAttempt,
+ retryDecision.nextAttemptBackoff));
}
}
}
- static RetryOptions buildRetryOptions(RetryPolicy retryPolicy) {
- String[] doNotRetry = new String[retryPolicy.getNonRetryableErrorTypesCount()];
- retryPolicy.getNonRetryableErrorTypesList().toArray(doNotRetry);
- RetryOptions.Builder roBuilder = RetryOptions.newBuilder();
- Duration maximumInterval = ProtobufTimeUtils.toJavaDuration(retryPolicy.getMaximumInterval());
- if (!maximumInterval.isZero()) {
- roBuilder.setMaximumInterval(maximumInterval);
+ public boolean isAnyTypeSupported() {
+ return handler.isAnyTypeSupported();
+ }
+
+ @Override
+ public boolean isStarted() {
+ return poller.isStarted();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return poller.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return poller.isTerminated() && scheduledExecutor.isTerminated();
+ }
+
+ @Override
+ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
+ return poller
+ .shutdown(shutdownManager, interruptTasks)
+ .thenCompose(
+ r ->
+ shutdownManager.shutdownExecutor(
+ scheduledExecutor, this + "#scheduledExecutor", Duration.ofSeconds(1)))
+ .exceptionally(
+ e -> {
+ log.error("[BUG] Unexpected exception during shutdown", e);
+ return null;
+ });
+ }
+
+ @Override
+ public void awaitTermination(long timeout, TimeUnit unit) {
+ long timeoutMillis = unit.toMillis(timeout);
+ timeoutMillis = ShutdownManager.awaitTermination(poller, timeoutMillis);
+ ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
+ }
+
+ @Override
+ public void suspendPolling() {
+ poller.suspendPolling();
+ }
+
+ @Override
+ public void resumePolling() {
+ poller.resumePolling();
+ }
+
+ @Override
+ public boolean isSuspended() {
+ return poller.isSuspended();
+ }
+
+ private PollerOptions getPollerOptions(SingleWorkerOptions options) {
+ PollerOptions pollerOptions = options.getPollerOptions();
+ if (pollerOptions.getPollThreadNamePrefix() == null) {
+ pollerOptions =
+ PollerOptions.newBuilder(pollerOptions)
+ .setPollThreadNamePrefix(
+ WorkerThreadsNameHelper.getLocalActivityPollerThreadPrefix(namespace, taskQueue))
+ .build();
}
- Duration initialInterval = ProtobufTimeUtils.toJavaDuration(retryPolicy.getInitialInterval());
- if (!initialInterval.isZero()) {
- roBuilder.setInitialInterval(initialInterval);
+ return pollerOptions;
+ }
+
+ public LocalActivityDispatcher getLocalActivityScheduler() {
+ return laScheduler;
+ }
+
+ private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) {
+ TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
+ Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
+ if (cause != null) {
+ result.setCause(cause);
}
- if (retryPolicy.getBackoffCoefficient() >= 1) {
- roBuilder.setBackoffCoefficient(retryPolicy.getBackoffCoefficient());
+ return result.build();
+ }
+
+ private static boolean isRetryPolicyNotSet(
+ PollActivityTaskQueueResponseOrBuilder pollActivityTask) {
+ return !pollActivityTask.hasScheduleToCloseTimeout()
+ && (!pollActivityTask.hasRetryPolicy()
+ || pollActivityTask.getRetryPolicy().getMaximumAttempts() <= 0);
+ }
+
+ private static boolean isNonRetryableApplicationFailure(@Nullable Throwable executionThrowable) {
+ return executionThrowable instanceof ApplicationFailure
+ && ((ApplicationFailure) executionThrowable).isNonRetryable();
+ }
+
+ private static class RetryDecision {
+ private final @Nullable RetryState retryState;
+ private final @Nullable Duration nextAttemptBackoff;
+
+ // No next local attempts
+ public RetryDecision(@Nonnull RetryState retryState, @Nullable Duration nextAttemptBackoff) {
+ this.retryState = retryState;
+ this.nextAttemptBackoff = nextAttemptBackoff;
}
- if (retryPolicy.getMaximumAttempts() > 0) {
- roBuilder.setMaximumAttempts(retryPolicy.getMaximumAttempts());
+
+ // Do the next attempt
+ public RetryDecision(@Nonnull Duration nextAttemptBackoff) {
+ this.retryState = null;
+ this.nextAttemptBackoff = Objects.requireNonNull(nextAttemptBackoff);
}
- return roBuilder.setDoNotRetry(doNotRetry).validateBuildWithDefaults();
- }
- private static boolean isNonRetryableApplicationFailure(ActivityTaskHandler.Result result) {
- return result.getTaskFailed() != null
- && result.getTaskFailed().getFailure() != null
- && result.getTaskFailed().getFailure() instanceof ApplicationFailure
- && ((ApplicationFailure) result.getTaskFailed().getFailure()).isNonRetryable();
+ public boolean doNextAttempt() {
+ return retryState == null;
+ }
}
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java
index e3b9a692c9..2a4e07735b 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java
@@ -110,7 +110,7 @@ public SyncWorkflowWorker(
stickyTaskQueueName,
singleWorkerOptions.getStickyQueueScheduleToStartTimeout(),
service,
- laWorker.getLocalActivityTaskPoller());
+ laWorker.getLocalActivityScheduler());
workflowWorker =
new WorkflowWorker(
@@ -135,7 +135,7 @@ public SyncWorkflowWorker(
null,
Duration.ZERO,
service,
- laWorker.getLocalActivityTaskPoller());
+ laWorker.getLocalActivityScheduler());
queryReplayHelper = new QueryReplayHelper(nonStickyReplayTaskHandler);
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkerThreadsNameHelper.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkerThreadsNameHelper.java
index 3a35a072b8..dbf453c34b 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkerThreadsNameHelper.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkerThreadsNameHelper.java
@@ -28,6 +28,9 @@ class WorkerThreadsNameHelper {
public static final String SHUTDOWN_MANAGER_THREAD_NAME_PREFIX = "TemporalShutdownManager";
public static final String ACTIVITY_HEARTBEAT_THREAD_NAME_PREFIX = "TemporalActivityHeartbeat-";
+ public static final String LOCAL_ACTIVITY_SCHEDULER_THREAD_NAME_PREFIX =
+ "LocalActivityScheduler-";
+
public static String getWorkflowPollerThreadPrefix(String namespace, String taskQueue) {
return WORKFLOW_POLL_THREAD_NAME_PREFIX
+ "\""
@@ -58,4 +61,8 @@ public static String getActivityPollerThreadPrefix(String namespace, String task
public static String getActivityHeartbeatThreadPrefix(String namespace, String taskQueue) {
return ACTIVITY_HEARTBEAT_THREAD_NAME_PREFIX + namespace + "-" + taskQueue;
}
+
+ public static String getLocalActivitySchedulerThreadPrefix(String namespace, String taskQueue) {
+ return LOCAL_ACTIVITY_SCHEDULER_THREAD_NAME_PREFIX + namespace + "-" + taskQueue;
+ }
}
diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
index 8097b70240..0e8a5244b5 100644
--- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
+++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
@@ -271,7 +271,8 @@ public Builder setDefaultDeadlockDetectionTimeout(long defaultDeadlockDetectionT
public Builder setMaxHeartbeatThrottleInterval(@Nullable Duration interval) {
Preconditions.checkArgument(
interval == null || !interval.isNegative(),
- "Negative maxHeartbeatThrottleInterval value: " + interval);
+ "Negative maxHeartbeatThrottleInterval value: %s",
+ interval);
this.maxHeartbeatThrottleInterval = interval;
return this;
}
@@ -286,7 +287,8 @@ public Builder setMaxHeartbeatThrottleInterval(@Nullable Duration interval) {
public Builder setDefaultHeartbeatThrottleInterval(@Nullable Duration interval) {
Preconditions.checkArgument(
interval == null || !interval.isNegative(),
- "Negative defaultHeartbeatThrottleInterval value: " + interval);
+ "Negative defaultHeartbeatThrottleInterval value: %s",
+ interval);
this.defaultHeartbeatThrottleInterval = interval;
return this;
}
diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/LocalActivityWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/RetryOptionsUtilsTest.java
similarity index 89%
rename from temporal-sdk/src/test/java/io/temporal/internal/worker/LocalActivityWorkerTest.java
rename to temporal-sdk/src/test/java/io/temporal/internal/common/RetryOptionsUtilsTest.java
index 1a65104e73..511224e171 100644
--- a/temporal-sdk/src/test/java/io/temporal/internal/worker/LocalActivityWorkerTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/internal/common/RetryOptionsUtilsTest.java
@@ -18,17 +18,16 @@
* limitations under the License.
*/
-package io.temporal.internal.worker;
+package io.temporal.internal.common;
import static org.junit.Assert.assertEquals;
import io.temporal.api.common.v1.RetryPolicy;
import io.temporal.common.RetryOptions;
-import io.temporal.internal.common.ProtobufTimeUtils;
import java.time.Duration;
import org.junit.Test;
-public class LocalActivityWorkerTest {
+public class RetryOptionsUtilsTest {
@Test
public void buildRetryOptions() {
Duration initialInterval = Duration.ofSeconds(2);
@@ -42,7 +41,7 @@ public void buildRetryOptions() {
.addNonRetryableErrorTypes(IllegalStateException.class.getName())
.build();
- RetryOptions retryOptions = LocalActivityWorker.buildRetryOptions(retryPolicy);
+ RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(retryPolicy);
assertEquals(initialInterval, retryOptions.getInitialInterval());
assertEquals(maxInterval, retryOptions.getMaximumInterval());
assertEquals(5, retryOptions.getMaximumAttempts());
diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java
index eab41672e5..d1a8b2e8cc 100644
--- a/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java
@@ -45,6 +45,7 @@
import io.temporal.internal.history.LocalActivityMarkerUtils;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.internal.statemachines.WorkflowStateMachines;
+import io.temporal.internal.worker.LocalActivityDispatcher;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkflowImplementationOptions;
@@ -52,7 +53,6 @@
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
-import java.util.function.BiFunction;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
@@ -101,7 +101,7 @@ public void queryIsOutdated() {
wft,
SingleWorkerOptions.newBuilder().build(),
new NoopScope(),
- mock(BiFunction.class));
+ mock(LocalActivityDispatcher.class));
stateMachines = handler.getWorkflowStateMachines();
QueryResult queryResult =
@@ -152,7 +152,8 @@ private ReplayWorkflow createReplayWorkflow(WorkflowExecutionHistory workflowExe
.getMarkerRecordedEventAttributes()))
.build()),
null,
- false),
+ false,
+ null),
(r, e) -> {});
return false;
})
diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java
index 29a4f95634..49ae3be82a 100644
--- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java
@@ -39,7 +39,7 @@
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.internal.history.LocalActivityMarkerUtils;
import io.temporal.internal.history.MarkerUtils;
-import io.temporal.internal.worker.ActivityTaskHandler;
+import io.temporal.internal.worker.LocalActivityResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -96,21 +96,24 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) {
.setActivityId("id1")
.setActivityType(ActivityType.newBuilder().setName("activity1")),
null,
- true);
+ true,
+ null);
ExecuteLocalActivityParameters parameters2 =
new ExecuteLocalActivityParameters(
PollActivityTaskQueueResponse.newBuilder()
.setActivityId("id2")
.setActivityType(ActivityType.newBuilder().setName("activity2")),
null,
- false);
+ false,
+ null);
ExecuteLocalActivityParameters parameters3 =
new ExecuteLocalActivityParameters(
PollActivityTaskQueueResponse.newBuilder()
.setActivityId("id3")
.setActivityType(ActivityType.newBuilder().setName("activity3")),
null,
- true);
+ true,
+ null);
builder
., Failure>add2(
@@ -180,30 +183,28 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) {
h.handleWorkflowTask(stateMachines, 1);
List requests = stateMachines.takeLocalActivityRequests();
assertEquals(2, requests.size());
- assertEquals("id1", requests.get(0).getActivityTask().getActivityId());
- assertEquals("id2", requests.get(1).getActivityTask().getActivityId());
+ assertEquals("id1", requests.get(0).getActivityId());
+ assertEquals("id2", requests.get(1).getActivityId());
Payloads result2 = converter.toPayloads("result2").get();
- ActivityTaskHandler.Result completionActivity2 =
- new ActivityTaskHandler.Result(
+ LocalActivityResult completionActivity2 =
+ new LocalActivityResult(
"id2",
RespondActivityTaskCompletedRequest.newBuilder().setResult(result2).build(),
null,
- null,
- false);
+ null);
stateMachines.handleLocalActivityCompletion(completionActivity2);
requests = stateMachines.takeLocalActivityRequests();
assertEquals(1, requests.size());
- assertEquals("id3", requests.get(0).getActivityTask().getActivityId());
+ assertEquals("id3", requests.get(0).getActivityId());
Payloads result3 = converter.toPayloads("result3").get();
- ActivityTaskHandler.Result completionActivity3 =
- new ActivityTaskHandler.Result(
+ LocalActivityResult completionActivity3 =
+ new LocalActivityResult(
"id3",
RespondActivityTaskCompletedRequest.newBuilder().setResult(result3).build(),
null,
- null,
- false);
+ null);
stateMachines.handleLocalActivityCompletion(completionActivity3);
requests = stateMachines.takeLocalActivityRequests();
assertTrue(requests.isEmpty());
@@ -235,13 +236,12 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) {
assertTrue(requests.isEmpty());
Payloads result = converter.toPayloads("result1").get();
- ActivityTaskHandler.Result completionActivity1 =
- new ActivityTaskHandler.Result(
+ LocalActivityResult completionActivity1 =
+ new LocalActivityResult(
"id1",
RespondActivityTaskCompletedRequest.newBuilder().setResult(result).build(),
null,
- null,
- false);
+ null);
stateMachines.handleLocalActivityCompletion(completionActivity1);
requests = stateMachines.takeLocalActivityRequests();
assertTrue(requests.isEmpty());
@@ -285,7 +285,8 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) {
.setActivityId("id1")
.setActivityType(ActivityType.newBuilder().setName("activity1")),
null,
- false);
+ false,
+ null);
builder
., Failure>add2(
(r, c) -> stateMachines.scheduleLocalActivityTask(parameters1, c))
@@ -326,7 +327,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) {
h.handleWorkflowTask(stateMachines);
List requests = stateMachines.takeLocalActivityRequests();
assertEquals(1, requests.size());
- assertEquals("id1", requests.get(0).getActivityTask().getActivityId());
+ assertEquals("id1", requests.get(0).getActivityId());
List commands = stateMachines.takeCommands();
assertTrue(commands.isEmpty());
}
diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java
index 603497ce69..6694b255f6 100644
--- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ActivityTimeoutTest.java
@@ -100,7 +100,7 @@ public void tearDown() {
* go into the cause chain.
*/
@Test
- @Parameters({"false"})
+ @Parameters({"false", "true"})
public void maximumAttemptsReached_startToCloseTimingOutActivity(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Collections.singletonList(Outcome.SLEEP), 1, 100);
@@ -143,7 +143,7 @@ public void maximumAttemptsReached_startToCloseTimingOutActivity(boolean local)
* {@link TimeoutFailure}({@link TimeoutType#TIMEOUT_TYPE_START_TO_CLOSE})
*/
@Test
- @Parameters({"false"})
+ @Parameters({"false", "true"})
public void maximumAttemptsReached_twiceStartToCloseTimingOutActivity(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Collections.singletonList(Outcome.SLEEP), 2, 100);
@@ -193,7 +193,7 @@ public void maximumAttemptsReached_twiceStartToCloseTimingOutActivity(boolean lo
* ActivityFailure is limited by 2
*/
@Test
- @Parameters({"false"})
+ @Parameters({"false", "true"})
public void maximumAttemptsReached_threeStartToCloseTimingOutActivity(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Collections.singletonList(Outcome.SLEEP), 3, 100);
@@ -245,8 +245,8 @@ public void maximumAttemptsReached_threeStartToCloseTimingOutActivity(boolean lo
* {@link ApplicationFailure}
*/
@Test
- @Parameters({"false"})
- public void maximumAttemptsReached_onceFailing_onceStartToCloseTimingOutActivity(boolean local) {
+ @Parameters({"false", "true"})
+ public void maximumAttemptsReached_failing_startToCloseTimingOutActivity(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Arrays.asList(Outcome.FAIL, Outcome.SLEEP), 2, 5);
@@ -285,8 +285,8 @@ public void maximumAttemptsReached_onceFailing_onceStartToCloseTimingOutActivity
}
// TODO Parametrize when scheduleToStart support is added for local activities
- @Parameters({"false"})
@Test
+ @Parameters({"false"})
public void scheduleToStartTimeout(boolean local) {
Worker worker = testEnvironment.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(TestActivityTimeoutWorkflowImpl.class);
@@ -327,7 +327,7 @@ public void scheduleToStartTimeout(boolean local) {
* go into the cause chain.
*/
@Test
- @Parameters({"false"})
+ @Parameters({"false", "true"})
public void scheduleToCloseTimeout_startToCloseTimingOutActivity(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Collections.singletonList(Outcome.SLEEP), 1, 100);
@@ -372,7 +372,7 @@ public void scheduleToCloseTimeout_startToCloseTimingOutActivity(boolean local)
* {@link TimeoutFailure}({@link TimeoutType#TIMEOUT_TYPE_START_TO_CLOSE})
*/
@Test
- @Parameters({"false"})
+ @Parameters({"false", "true"})
public void scheduleToCloseTimeout_twiceStartToCloseTimingOutActivity(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Collections.singletonList(Outcome.SLEEP), 2, 100);
@@ -426,8 +426,8 @@ public void scheduleToCloseTimeout_twiceStartToCloseTimingOutActivity(boolean lo
* an original first failure is not preserved in the chain
*/
@Test
- @Parameters({"false"})
- public void scheduleToCloseTimeout_onceFailing_twiceStartToCloseTimingOut(boolean local) {
+ @Parameters({"false", "true"})
+ public void scheduleToCloseTimeout_failing_twiceStartToCloseTimingOut(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Arrays.asList(Outcome.FAIL, Outcome.SLEEP, Outcome.SLEEP), 3, 5);
@@ -478,7 +478,7 @@ public void scheduleToCloseTimeout_onceFailing_twiceStartToCloseTimingOut(boolea
* {@link ApplicationFailure}[from the second attempt]
*/
@Test
- @Parameters({"false"})
+ @Parameters({"false", "true"})
public void scheduleToCloseTimeout_startToClose_failing_startToCloseTimingOut(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Arrays.asList(Outcome.SLEEP, Outcome.FAIL, Outcome.SLEEP), 3, 5);
@@ -524,7 +524,7 @@ public void scheduleToCloseTimeout_startToClose_failing_startToCloseTimingOut(bo
* {@link ApplicationFailure}
*/
@Test
- @Parameters({"false"})
+ @Parameters({"false", "true"})
public void scheduleToCloseTimeout_failingActivity(boolean local) {
ControlledActivityImpl activity =
new ControlledActivityImpl(Collections.singletonList(Outcome.FAIL), 3, -1);
@@ -550,7 +550,116 @@ public void scheduleToCloseTimeout_failingActivity(boolean local) {
assertTrue(activityFailure.getCause() instanceof ApplicationFailure);
ApplicationFailure applicationFailure = (ApplicationFailure) activityFailure.getCause();
+ assertEquals("intentional failure", applicationFailure.getOriginalMessage());
+
+ assertNull(applicationFailure.getCause());
+
+ activity.verifyAttempts();
+ }
+
+ /**
+ * This test verifies the behavior and observed result of a present scheduleToClose timeout ond an
+ * activity that doesn't fit into scheduleToClose on the first attempt.
+ *
+ * The expected structure is
+ * {@link ActivityFailure}({@link RetryState#RETRY_STATE_TIMEOUT}) ->
+ * {@link TimeoutFailure}({@link TimeoutType#TIMEOUT_TYPE_SCHEDULE_TO_CLOSE})
+ */
+ @Test
+ @Parameters({"false", "true"})
+ public void scheduleToCloseTimeout_timingOutActivity(boolean local) {
+ ControlledActivityImpl activity =
+ new ControlledActivityImpl(Collections.singletonList(Outcome.SLEEP), 1, 100);
+
+ Worker worker = testEnvironment.newWorker(TASK_QUEUE);
+ worker.registerWorkflowImplementationTypes(TestActivityTimeoutWorkflowImpl.class);
+ worker.registerActivitiesImplementations(activity);
+ testEnvironment.start();
+ WorkflowClient client = testEnvironment.getWorkflowClient();
+ WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build();
+ TestActivityTimeoutWorkflow workflow =
+ client.newWorkflowStub(TestActivityTimeoutWorkflow.class, options);
+
+ WorkflowException e =
+ assertThrows(WorkflowException.class, () -> workflow.workflow(5, -1, -1, -1, local));
+
+ assertTrue(e.getCause() instanceof ActivityFailure);
+ ActivityFailure activityFailure = (ActivityFailure) e.getCause();
+
+ if (ExternalServiceTestConfigurator.isUseExternalService() && !local) {
+ // Real temporal server return this specific case of scheduleToClose as non-retryable failure.
+ // It's inconsistent with other situations and conflicts with non-retryable application
+ // failures.
+ // TODO This comment should be updated with an issue when filed.
+ assertEquals(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, activityFailure.getRetryState());
+ } else {
+ assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());
+ }
+
+ MatcherAssert.assertThat(
+ activityFailure.getMessage(), CoreMatchers.containsString("Activity task timed out"));
+ assertTrue(activityFailure.getCause() instanceof TimeoutFailure);
+ TimeoutFailure scheduleToClose = (TimeoutFailure) activityFailure.getCause();
+ assertEquals(TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, scheduleToClose.getTimeoutType());
+
+ assertNull(scheduleToClose.getCause());
+
+ activity.verifyAttempts();
+ }
+
+ /**
+ * This test hits a scenario when an activity
+ *
+ *
+ * - fails on the first attempt
+ *
- reaches scheduleToClose on the second attempt
+ *
+ *
+ * The expected structure is
+ * {@link ActivityFailure}({@link RetryState#RETRY_STATE_TIMEOUT}) ->
+ * {@link TimeoutFailure}({@link TimeoutType#TIMEOUT_TYPE_SCHEDULE_TO_CLOSE}) ->
+ * {@link ApplicationFailure}
+ */
+ @Test
+ @Parameters({"false", "true"})
+ public void scheduleToCloseTimeout_failing_timingOutActivity(boolean local) {
+ ControlledActivityImpl activity =
+ new ControlledActivityImpl(Arrays.asList(Outcome.FAIL, Outcome.SLEEP), 2, 100);
+
+ Worker worker = testEnvironment.newWorker(TASK_QUEUE);
+ worker.registerWorkflowImplementationTypes(TestActivityTimeoutWorkflowImpl.class);
+ worker.registerActivitiesImplementations(activity);
+ testEnvironment.start();
+ WorkflowClient client = testEnvironment.getWorkflowClient();
+ WorkflowOptions options = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build();
+ TestActivityTimeoutWorkflow workflow =
+ client.newWorkflowStub(TestActivityTimeoutWorkflow.class, options);
+
+ WorkflowException e =
+ assertThrows(WorkflowException.class, () -> workflow.workflow(5, -1, -1, -1, local));
+
+ assertTrue(e.getCause() instanceof ActivityFailure);
+ ActivityFailure activityFailure = (ActivityFailure) e.getCause();
+
+ if (ExternalServiceTestConfigurator.isUseExternalService() && !local) {
+ // Real temporal server return this specific case of scheduleToClose as non-retryable failure.
+ // It's inconsistent with other situations and conflicts with non-retryable application
+ // failures.
+ // TODO This comment should be updated with an issue when filed.
+ assertEquals(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, activityFailure.getRetryState());
+ } else {
+ assertEquals(RetryState.RETRY_STATE_TIMEOUT, activityFailure.getRetryState());
+ }
+
+ MatcherAssert.assertThat(
+ activityFailure.getMessage(), CoreMatchers.containsString("Activity task timed out"));
+
+ assertTrue(activityFailure.getCause() instanceof TimeoutFailure);
+ TimeoutFailure scheduleToClose = (TimeoutFailure) activityFailure.getCause();
+ assertEquals(TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, scheduleToClose.getTimeoutType());
+
+ ApplicationFailure applicationFailure = (ApplicationFailure) scheduleToClose.getCause();
assertEquals("intentional failure", applicationFailure.getOriginalMessage());
assertNull(applicationFailure.getCause());
diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatBufferedEventTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatBufferedEventTest.java
index 0d93404108..b12a47b3dc 100644
--- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatBufferedEventTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatBufferedEventTest.java
@@ -91,7 +91,8 @@ public static class TestLongLocalActivityWorkflowTaskHeartbeatWorkflowImpl
public String execute(String taskQueue) {
VariousTestActivities localActivities =
Workflow.newLocalActivityStub(
- VariousTestActivities.class, SDKTestOptions.newLocalActivityOptions());
+ VariousTestActivities.class,
+ SDKTestOptions.newLocalActivityOptions20sScheduleToClose());
Async.function(localActivities::sleepActivity, 10 * 1000L, 123);
signaled.get();
return "foo";
diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatTest.java
index b4ac4295b5..8b3a42d343 100644
--- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LongLocalActivityWorkflowTaskHeartbeatTest.java
@@ -73,7 +73,8 @@ public static class TestLongLocalActivityWorkflowTaskHeartbeatWorkflowImpl
public String execute(String taskQueue) {
VariousTestActivities localActivities =
Workflow.newLocalActivityStub(
- VariousTestActivities.class, SDKTestOptions.newLocalActivityOptions());
+ VariousTestActivities.class,
+ SDKTestOptions.newLocalActivityOptions20sScheduleToClose());
return localActivities.sleepActivity(5000, 123);
}
}
diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowCancellationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowCancellationTest.java
index 8441ab489c..fc93ebd24e 100644
--- a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowCancellationTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ChildWorkflowCancellationTest.java
@@ -164,7 +164,8 @@ public static class TestChildWorkflowImpl implements NoArgsWorkflow {
public void execute() {
TestActivities.VariousTestActivities localActivities =
Workflow.newLocalActivityStub(
- TestActivities.VariousTestActivities.class, SDKTestOptions.newLocalActivityOptions());
+ TestActivities.VariousTestActivities.class,
+ SDKTestOptions.newLocalActivityOptions20sScheduleToClose());
try {
Workflow.sleep(Duration.ofHours(1));
} catch (CanceledFailure e) {