-
Notifications
You must be signed in to change notification settings - Fork 213
Rework Local Activity scheduling #1507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| /* | ||
| * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. | ||
| * | ||
| * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this material except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package io.temporal.internal.common; | ||
|
|
||
| import io.grpc.Deadline; | ||
| import io.temporal.api.common.v1.RetryPolicy; | ||
| import io.temporal.common.RetryOptions; | ||
| import io.temporal.failure.ActivityFailure; | ||
| import io.temporal.failure.ApplicationFailure; | ||
| import io.temporal.failure.ChildWorkflowFailure; | ||
| import java.time.Duration; | ||
| import java.util.concurrent.TimeUnit; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| public class RetryOptionsUtils { | ||
| public static boolean isNotRetryable(RetryOptions o, @Nullable Throwable e) { | ||
| if (e == null) { | ||
| return false; | ||
| } | ||
| if (e instanceof ActivityFailure || e instanceof ChildWorkflowFailure) { | ||
| e = e.getCause(); | ||
| } | ||
| String type = | ||
| e instanceof ApplicationFailure | ||
| ? ((ApplicationFailure) e).getType() | ||
| : e.getClass().getName(); | ||
| if (o.getDoNotRetry() != null) { | ||
| for (String doNotRetry : o.getDoNotRetry()) { | ||
| if (doNotRetry.equals(type)) { | ||
| return true; | ||
| } | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| public static boolean areAttemptsReached(RetryOptions o, long attempt) { | ||
| return (o.getMaximumAttempts() != 0 && attempt >= o.getMaximumAttempts()); | ||
| } | ||
|
|
||
| public static boolean isDeadlineReached(@Nullable Deadline deadline, long sleepTimeMs) { | ||
| return deadline != null && deadline.timeRemaining(TimeUnit.MILLISECONDS) < sleepTimeMs; | ||
| } | ||
|
|
||
| public static RetryOptions toRetryOptions(RetryPolicy retryPolicy) { | ||
| RetryOptions.Builder roBuilder = RetryOptions.newBuilder(); | ||
|
|
||
| Duration maximumInterval = ProtobufTimeUtils.toJavaDuration(retryPolicy.getMaximumInterval()); | ||
| if (!maximumInterval.isZero()) { | ||
| roBuilder.setMaximumInterval(maximumInterval); | ||
| } | ||
|
|
||
| Duration initialInterval = ProtobufTimeUtils.toJavaDuration(retryPolicy.getInitialInterval()); | ||
| if (!initialInterval.isZero()) { | ||
| roBuilder.setInitialInterval(initialInterval); | ||
| } | ||
|
|
||
| if (retryPolicy.getBackoffCoefficient() >= 1) { | ||
| roBuilder.setBackoffCoefficient(retryPolicy.getBackoffCoefficient()); | ||
| } | ||
|
|
||
| if (retryPolicy.getMaximumAttempts() > 0) { | ||
| roBuilder.setMaximumAttempts(retryPolicy.getMaximumAttempts()); | ||
| } | ||
|
|
||
| roBuilder.setDoNotRetry( | ||
| retryPolicy | ||
| .getNonRetryableErrorTypesList() | ||
| .toArray(new String[retryPolicy.getNonRetryableErrorTypesCount()])); | ||
|
|
||
| return roBuilder.validateBuildWithDefaults(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,7 +57,6 @@ | |
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.locks.Lock; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.function.BiFunction; | ||
|
|
||
| /** | ||
| * Implements workflow executor that relies on replay of a workflow code. An instance of this class | ||
|
|
@@ -70,12 +69,12 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler { | |
|
|
||
| private final Lock lock = new ReentrantLock(); | ||
|
|
||
| private final Functions.Proc1<ActivityTaskHandler.Result> localActivityCompletionSink; | ||
| private final Functions.Proc1<LocalActivityResult> localActivityCompletionSink; | ||
|
|
||
| private final BlockingQueue<ActivityTaskHandler.Result> localActivityCompletionQueue = | ||
| private final BlockingQueue<LocalActivityResult> localActivityCompletionQueue = | ||
| new LinkedBlockingDeque<>(); | ||
|
|
||
| private final BiFunction<LocalActivityTask, Duration, Boolean> localActivityTaskPoller; | ||
| private final LocalActivityDispatcher localActivityDispatcher; | ||
|
|
||
| private final ReplayWorkflow workflow; | ||
|
|
||
|
|
@@ -93,15 +92,15 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler { | |
| PollWorkflowTaskQueueResponseOrBuilder workflowTask, | ||
| SingleWorkerOptions workerOptions, | ||
| Scope metricsScope, | ||
| BiFunction<LocalActivityTask, Duration, Boolean> localActivityTaskPoller) { | ||
| LocalActivityDispatcher localActivityDispatcher) { | ||
| HistoryEvent startedEvent = workflowTask.getHistory().getEvents(0); | ||
| if (!startedEvent.hasWorkflowExecutionStartedEventAttributes()) { | ||
| throw new IllegalArgumentException( | ||
| "First event in the history is not WorkflowExecutionStarted"); | ||
| } | ||
| this.startedEvent = startedEvent.getWorkflowExecutionStartedEventAttributes(); | ||
| this.metricsScope = metricsScope; | ||
| this.localActivityTaskPoller = localActivityTaskPoller; | ||
| this.localActivityDispatcher = localActivityDispatcher; | ||
| this.workflow = workflow; | ||
|
|
||
| this.workflowStateMachines = new WorkflowStateMachines(new StatesMachinesCallbackImpl()); | ||
|
|
@@ -286,10 +285,10 @@ private void processLocalActivityRequests(long startTimeNs) throws InterruptedEx | |
| // much sense. I believe we should add ScheduleToStart timeout for the local activities | ||
| // as well. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Todo now addressed?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope! #1512 Will be in a separate PR. |
||
| long maxWaitTimeNs = Math.max(nextWFTHeartbeatTimeNs - System.nanoTime(), 0); | ||
| boolean accepted = | ||
| localActivityTaskPoller.apply( | ||
| new LocalActivityTask(laRequest, localActivityCompletionSink), | ||
| Duration.ofNanos(maxWaitTimeNs)); | ||
| laRequest.setScheduleToStartTimeout(Duration.ofNanos(maxWaitTimeNs)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can user not set sched-to-start timeout? (Honestly, makes sense)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's old logic. It will be gone in #1512. |
||
| boolean accepted = localActivityDispatcher.dispatch(laRequest, localActivityCompletionSink); | ||
| // TODO this needs to be reworked when we implement a proper scheduleToStart to report a | ||
| // Failure in the callback. A proper test is also needed for this scenario. | ||
| Preconditions.checkState( | ||
| accepted, | ||
| "Unable to schedule local activity for execution, " | ||
|
|
@@ -302,7 +301,7 @@ private void processLocalActivityRequests(long startTimeNs) throws InterruptedEx | |
| } | ||
|
|
||
| long maxWaitTimeTillHeartbeatNs = Math.max(nextWFTHeartbeatTimeNs - System.nanoTime(), 0); | ||
| ActivityTaskHandler.Result laCompletion = | ||
| LocalActivityResult laCompletion = | ||
| localActivityCompletionQueue.poll(maxWaitTimeTillHeartbeatNs, TimeUnit.NANOSECONDS); | ||
| if (laCompletion == null) { | ||
| // Need to force a new task as we are out of time | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to set an explicit do not retry field in Java or is it only the string matching?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String matching as it gets converted to protobuf's RetryPolicy, at least at this moment.