Skip to content

Rework Local Activity scheduling#1507

Merged
Spikhalskiy merged 1 commit intotemporalio:masterfrom
Spikhalskiy:issue-1004
Nov 24, 2022
Merged

Rework Local Activity scheduling#1507
Spikhalskiy merged 1 commit intotemporalio:masterfrom
Spikhalskiy:issue-1004

Conversation

@Spikhalskiy
Copy link
Copy Markdown
Contributor

@Spikhalskiy Spikhalskiy commented Nov 8, 2022

What was changed

This PR implements a timer thread that independently of the local activity executors performs lifecycle management of local activities and allows to enforce timeouts.
The interface between LA workers and workflow code / state machines is reworked to allow LA workers to communicate different failure reasons / retry back to the workflow.
startToClose timeout and scheduleToClose timeout supports are implemented.
The ground is prepared for adding scheduleToStart timeout for local activities #1512
The workflow/LA interface is improved to prepare for #1261

Why?

Previously Local Activities scheduling and management code were trivial.
One thread executes the activities, waits with sleep until the next attempt, and executes the next attempt.
While being simple, there is a bunch of Cons to this approach:

  1. Executor thread is retained for the whole time of LA execution from the start until the finish.
  2. This model allows us to enforce neither startToClose nor scheduleToClose timeouts because the only thread that manages an activity invocation is tied to the execution of the activity's code.
  3. The is no way to start a new attempt until the end of the previous attempt.

Closes #1004

@Spikhalskiy Spikhalskiy force-pushed the issue-1004 branch 18 times, most recently from c0dd625 to 3d2cfa2 Compare November 15, 2022 01:31
@Spikhalskiy Spikhalskiy force-pushed the issue-1004 branch 12 times, most recently from 644f23a to 777486e Compare November 18, 2022 17:11
Comment on lines +41 to +44
String type =
e instanceof ApplicationFailure
? ((ApplicationFailure) e).getType()
: e.getClass().getName();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to set an explicit do not retry field in Java or is it only the string matching?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String matching as it gets converted to protobuf's RetryPolicy, at least at this moment.

@@ -286,10 +285,10 @@ private void processLocalActivityRequests(long startTimeNs) throws InterruptedEx
// much sense. I believe we should add ScheduleToStart timeout for the local activities
// as well.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo now addressed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope! #1512 Will be in a separate PR.

localActivityTaskPoller.apply(
new LocalActivityTask(laRequest, localActivityCompletionSink),
Duration.ofNanos(maxWaitTimeNs));
laRequest.setScheduleToStartTimeout(Duration.ofNanos(maxWaitTimeNs));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can user not set sched-to-start timeout? (Honestly, makes sense)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's old logic. It will be gone in #1512.
We can discuss, but I don't see any "very good" solutions to not set scheduleToStart. I guess keep workflow tasks open will be expected by users if they have some unexpected backfilling in their systems. An alternative reasonable solution may be to set it to localRetryThreshold. The second approach is safer from the load control, but may be not expected by users.

private final PollActivityTaskQueueResponse.Builder activityTask;
public static final long NOT_SCHEDULED = -1;

// It doesn't have all the fields published yet.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What doesn't? This builder? Change to "This builder won't..."

Comment on lines +64 to +65
private static final RetryState LOCAL_RETRY_LIMIT_RETRY_STATE =
RetryState.RETRY_STATE_IN_PROGRESS;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why re-bind this here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It used in couple of places and I wanted to write a long comment on this value choice. And I don't want the comment to be duplicated.

Comment on lines +277 to +339
// Making sure that the result handling code following this statement is mutual exclusive
// with the start to close timeout handler.
boolean startToCloseTimeoutFired =
startToCloseTimeoutFuture != null && !startToCloseTimeoutFuture.cancel(false);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is coming after handle, does this mean the start-to-close timeout cannot "interrupt" the running activity if it fires - but does it tell the workflow about the timeout right away?

So here we're just saying "OK we (elsewhere) told the WF the LA timed out, and now we're just discarding the result when the LA eventually finishes"?

Do we try to notify the LA of cancel / interrupt it at all? I'm guessing not since there's not a good mechanism for it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. RIGHT NOW there is no interruption or cancellations of local activities in java-sdk. We have a task for it #1303 and I guess it's coming at some moment.

}

if (executionThrowable instanceof Error) {
// TODO Error inside Local Activity shouldn't be failing the local activity call.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meant to be done in this PR?

Copy link
Copy Markdown
Contributor Author

@Spikhalskiy Spikhalskiy Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's old existing behavior. It's incorrect, but it's not trivial to fix correctly. And directly relevant to the goal of this PR. I opened a separate task for it: #1533

Comment on lines +330 to +363
if (isRetryPolicyNotSet(activityTask)) {
executionContext.callback(
failed(
activityId,
RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET,
executionFailure,
currentAttempt,
null));
return;
}

RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());

if (RetryOptionsUtils.isNotRetryable(retryOptions, executionThrowable)) {
executionContext.callback(
failed(
activityId,
RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
executionFailure,
currentAttempt,
null));
return;
}

if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt)) {
executionContext.callback(
failed(
activityId,
RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED,
executionFailure,
currentAttempt,
null));
return;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could combine these all by just setting a retry state local var and then passing that into the failed call -- looks like the next few too

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good eye, will refactor it.

Comment on lines +460 to +457
public void run() {
LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: I realized yesterday Core isn't retrying start-to-close timeouts right now, we opened a bug to do it

executionContext.callback(
failed(
activityId,
LOCAL_RETRY_LIMIT_RETRY_STATE,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the state where we schedule a timer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where we WILL schedule a timer in one of the upcoming PRs, yeah.

@Spikhalskiy Spikhalskiy merged commit c5f0ebb into temporalio:master Nov 24, 2022
@Spikhalskiy Spikhalskiy deleted the issue-1004 branch November 24, 2022 04:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Local Activity Worker ignores startToClose and scheduleToClose timeouts

2 participants