diff --git a/cdk-stepfunction-durable-lambda-function/.gitignore b/cdk-stepfunction-durable-lambda-function/.gitignore new file mode 100644 index 000000000..37833f8be --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/.gitignore @@ -0,0 +1,10 @@ +*.swp +package-lock.json +__pycache__ +.pytest_cache +.venv +*.egg-info + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/cdk-stepfunction-durable-lambda-function/README.md b/cdk-stepfunction-durable-lambda-function/README.md new file mode 100644 index 000000000..dcb300b22 --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/README.md @@ -0,0 +1,199 @@ +# AWS Step Functions to AWS Lambda durable functions + +This pattern demonstrates how to integrate AWS Lambda durable functions into an AWS Step Functions workflow. This pattern covers both the synchronous invocation (using default Request Response pattern) and asynchronous invocation (using the Step Function Wait for Callback with Task Token integration pattern) of the durable Lambda function. It addresses the challenge of running long-running Lambda functions (beyond 15 minutes) within a Step Functions orchestration, using asynchronous invocation and durable checkpointing. + +Announced at re:Invent 2025, [Lambda durable functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) introduce a checkpoint/replay mechanism that allows Lambda executions to run for up to one year, automatically recovering from interruptions. This pattern shows how to combine durable functions with Step Functions in a hybrid architecture: durable functions handle application-level logic within Lambda, while Step Functions coordinates the high-level workflow across multiple AWS services. + +Learn more about this pattern at Serverless Land Patterns: << Add the live URL here >> + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## When to Use This Pattern +Use this pattern when: +- Your Lambda function execution time exceeds 15 minutes and must be orchestrated by Step Functions +- You want to keep complex business logic within a Lambda function rather than splitting into a fanout architecture +- Your team prefers standard programming languages and IDE-based development over visual/JSON workflow designers +- You need fine-grained control over execution state in code + +Use Step Functions alone when: +- You are orchestrating multiple AWS services with native integrations +- Non-technical stakeholders need to understand and validate workflow logic +- You require zero-maintenance, fully managed infrastructure + +Many applications benefit from using both services. A common pattern is using durable functions for application-level logic within Lambda, while Step Functions coordinates high-level workflows across multiple AWS services beyond Lambda functions. + + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Cloud Development Kit](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) (AWS CDK >= 2.240.0) Installed + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd cdk-stepfunction-durable-lambda-function + ``` +1. Create a virtual environment for python: + ```bash + python3 -m venv .venv + ``` +1. Activate the virtual environment: + ```bash + source .venv/bin/activate + ``` + + If you are in Windows platform, you would activate the virtualenv like this: + + ``` + % .venv\Scripts\activate.bat + ``` + +1. Install python modules: + ```bash + python3 -m pip install -r requirements.txt + ``` +1. From the command line, use CDK to synthesize the CloudFormation template and check for errors: + + ```bash + cdk synth + ``` + NOTE: You may need to perform a one time cdk bootstrapping using the following command. See [CDK Bootstrapping](https://docs.aws.amazon.com/cdk/v2/guide/bootstrapping.html) for more details. + ```bash + cdk bootstrap aws:/// + ``` + +1. From the command line, use CDK to deploy the stack: + + ```bash + cdk deploy + ``` + + Expected result: + + ```bash + ✅ CdkStepfunctionDurableLambdaFunctionStack + + Outputs: + CdkStepfunctionDurableLambdaFunctionStack.AsyncDurableFunctionName = sfn-dfn-async-durable-fn + CdkStepfunctionDurableLambdaFunctionStack.StepFunctionDFArn = arn:aws:states:us-east-1:XXXXXXXXXXXX:stateMachine:sfn-dfn-integration-pattern-cdk + CdkStepfunctionDurableLambdaFunctionStack.SyncDurableFunctionName = sfn-dfn-sync-durable-fn + Stack ARN: + arn:aws:cloudformation:us-east-1:XXXXXXXXXXXX:stack/CdkStepfunctionDurableLambdaFunctionStack/e4d30000-0000-0000-0000-000000007503 + + ``` + +1. Note the outputs from the CDK deployment process. These contain the resource names and/or ARNs which are used for testing. + + + +## How it works + +Once the CDK stack is deployed successfully, a Step Function workflow is created along with two durable Lambda functions in the account & region provided during the bootstrap step. Go to AWS Step Function Console to understand the basic state machine created. + +- The `sfn-dfn-async-durable-fn` durable Lambda function simulates a long running task that takes more than 15 mins (using a Wait condition). To avoid hitting Lambda function's 15 mins timeout, the function is configured with a durable execution timeout of 1 hr. As a result, this Lambda function can only be invoked asynchronously by setting the [InvocationType](https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html#lambda-Invoke-request-InvocationType) parameter to `Event`. +- The `sfn-dfn-sync-durable-fn` durable Lambda function simulates a short running task that completes within the 15 mins timeout. It is configured with a durable execution timeout of 15 mins which matches the standard Lambda function timeout. This Lambda function can be invoked synchronously without specifying any InvocationType parameter (or using `RequestResponse` value, which is also the default). + +See AWS documentation for more details on [Invoking durable Lambda functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking.html). + +#### Step Functions State Machine +![image](./resources/stepfunctions_graph.png) + +The state machine invokes these 2 durable Lambda functions in the following pattern: +1. When the state machine starts, it first executes the 'Async Durable Lambda Fn Invoke' task, which invokes the `sfn-dfn-async-durable-fn` Lambda function. Since Step Functions' default `LambdaInvoke` uses synchronous invocation, we need to change to the '[Wait for Callback with Task Token](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token)' integtation pattern with asynchronous invocation, otherwise Step Function task will throw an error - +``` +Lambda.InvalidParameterValueException: You cannot synchronously invoke a durable function with an executionTimeout greater than 15 minutes. +``` +The below state machine ASL snippet shows this configuration: +```bash + "Async Durable Lambda Fn Invoke": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken", # wait for callback integration pattern + "InvocationType": "Event", # set InvocationType = 'Event' for async Lambda invocation + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:XXXXXXXXXXXX:function:sfn-dfn-async-durable-fn:1", # durable Lambda functions must be invoked with a qualified ARN (version or alias) + "Payload": { + "TaskToken": "{% $states.context.Task.Token %}", # pass the task-token to Lambda for a callback later. + "minutes_to_wait": "{% $states.input.minutes_to_wait %}" + }, + "HeartbeatSeconds": 3600, # set a heartbeat timeout of 1 hr before task is considered failed + }, + "Output": "{% $states.result %}" + } +``` +> Note: Durable functions require qualified identifiers for invocation. You must invoke durable functions using a version number, alias, or $LATEST. You can use either a full qualified ARN or a function name with version/alias suffix. You cannot use an unqualified identifier (without a version or alias suffix). See AWS Documentation for more details on [Qualified ARNs requirement](https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking.html#durable-invoking-qualified-arns). + +Since this durable Lambda function has an artificial wait time of X mins (specified as a Step Function input), both the Step Functions execution and durable Lambda function execution will pause, without consuming any CPU. Once the wait timer expires, durable Lambda function will resume execution from this point, having checkpointed the previous steps. Since this Lambda was invoked asynchronously, we need to call Step Functions' `send_task_success` or `send_task_failure` API and pass the task-token that was sent as an event parameter to the Lambda from Step Function. This will enable the Step Functions to resume its state machine. + +IMPORTANT: When using Step Function WAIT_FOR_TASK_TOKEN pattern, wrap SendTaskSuccess in context.step() in your Lambda code to make it durable. If placed outside context.step(), it will execute on every replay causing duplicate callbacks, or may never execute if Lambda is interrupted, leaving Step Functions waiting indefinitely. Also, send callback as the FINAL durable step. + +2. The state machine then executes the 'synchronous Durable Lambda Fn Invoke' task which invokes the `sfn-dfn-sync-durable-fn` Lambda function. Since this function can be invoked synchronously, we use the default Step Function task configuration, as shown below - +```bash +"Synchronous Durable Lambda Fn Invoke": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", # default request-response pattern to invoke Lambda synchronously + "Arguments": { + "FunctionName": "arn:aws:lambda:us-east-1:XXXXXXXXXXXX:function:sfn-dfn-sync-durable-fn:1", # durable Lambda functions must be invoked with a qualified ARN (version or alias) + "Payload": "{% $states.input %}" + }, + "Output": "{% $states.result.Payload %}", + "End": true + } + }, +``` +Once the Lambda function completes its execution and returns a response, Step Functions completes the task execution and end the state machine flow. + +## Testing + +Go to the AWS Step Functions Console and select the Step Function created by CDK (look for a name starting with `sfn-dfn-integration-pattern-cdk`). Execute the step function workflow and provide the input parameters as described below - this makes the Lambda durable function wait for 20 mins, which is more than the standard Lambda execution timeout. Since the durable execution configuration is set at 1 hr, Lambda will pause and resume execution after 20 mins, instead of timing out. +```bash +{ + "minutes_to_wait": 20 +} +``` + +Wait for the Step Function workflow to complete. You can check the progress of the execution steps under the Executions section. + +> NOTE: Since we have artificially added a wait condition in the `sfn-dfn-async-durable-fn` durable Lambda function which will wait for the duration specified in the state machine execution input parameters, the function will pause until the timer expires. For testing purposes, change the timeout to a smaller value + +You can check the Durable executions section on the AWS Lambda service console for the `sfn-dfn-async-durable-fn` durable Lambda function to see how the various steps are checkpointed. + +#### Durable execution in the Lambda console +![durable](./resources/Lambda-durable-execution.png) + +## Best practices for Lambda durable functions and Step Functions integration +Durable functions use a replay-based execution model that requires different patterns than traditional Lambda functions. Follow these best practices to build reliable, cost-effective workflows. Please see AWS documentation for more details on [Best practices for Lambda durable functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-best-practices.html). + +- Synchronous invocation is not supported for durable functions with execution_timeout > 15 minutes. Always use WAIT_FOR_TASK_TOKEN + invocation_type=EVENT. +- `SendTaskSuccess` must be a durable step. Placing it outside context.step() risks duplicate callbacks on replay or missed callbacks on interruption. +- Durable and standard Lambdas can coexist in the same workflow. + + +## Cleanup +1. Delete the stack + ```bash + cdk destroy + ``` + +## Tutorial + +See [this useful workshop](https://cdkworkshop.com/30-python.html) on working with the AWS CDK for Python projects. + +## Useful commands + + * `cdk ls` list all stacks in the app + * `cdk synth` emits the synthesized CloudFormation template + * `cdk deploy` deploy this stack to your default AWS account/region + * `cdk diff` compare deployed stack with current state + * `cdk docs` open CDK documentation + +---- +Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/cdk-stepfunction-durable-lambda-function/app.py b/cdk-stepfunction-durable-lambda-function/app.py new file mode 100644 index 000000000..19b0d79ce --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/app.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +import os + +import aws_cdk as cdk + +from cdk_stepfunction_durable_lambda_function.cdk_stepfunction_durable_lambda_function_stack import CdkStepfunctionDurableLambdaFunctionStack + + +app = cdk.App() +CdkStepfunctionDurableLambdaFunctionStack(app, "CdkStepfunctionDurableLambdaFunctionStack", + # If you don't specify 'env', this stack will be environment-agnostic. + # Account/Region-dependent features and context lookups will not work, + # but a single synthesized template can be deployed anywhere. + + # Uncomment the next line to specialize this stack for the AWS Account + # and Region that are implied by the current CLI configuration. + + #env=cdk.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')), + + # Uncomment the next line if you know exactly what Account and Region you + # want to deploy the stack to. */ + + #env=cdk.Environment(account='123456789012', region='us-east-1'), + + # For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html + ) + +app.synth() diff --git a/cdk-stepfunction-durable-lambda-function/async-durable-lambda/async_durable_function_invocation.py b/cdk-stepfunction-durable-lambda-function/async-durable-lambda/async_durable_function_invocation.py new file mode 100644 index 000000000..c746dc508 --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/async-durable-lambda/async_durable_function_invocation.py @@ -0,0 +1,74 @@ +from aws_durable_execution_sdk_python.config import Duration +from aws_durable_execution_sdk_python.context import DurableContext, StepContext, durable_step +from aws_durable_execution_sdk_python.execution import durable_execution +import random +import datetime +import boto3 +import json + +@durable_step +def create_order(context: StepContext): + order_id = f"order-{random.randint(1, 100)}" + context.logger.info(f"Creating order... : {order_id}") + return { + "order_id": order_id, + "total": 50.00, + "status": "Created" + } + +@durable_step +def send_notification(context: StepContext, order_id: str): + context.logger.info(f"Sending notification...") + return { + "sent": True, + "order_id": order_id, + "recipient": "customer@example.com", + "timestamp": datetime.datetime.now().isoformat() + } + +@durable_step +def send_sfn_task_success(context: StepContext, task_token: str, response: dict): + sfn_client = boto3.client("stepfunctions") + sfn_client.send_task_success( + taskToken=task_token, + output=json.dumps(response, default=str), + ) + +@durable_execution +def lambda_handler(event: dict, context: DurableContext) -> dict: + context.logger.info(f"Async Durable Lambda Event: {event}") + + # Extract Step Function Task Token outside durable step + # Only deterministic operations like event.pop("TaskToken") are safe outside steps. + task_token = event.pop("TaskToken", None) + minutes_to_wait = event.pop("minutes_to_wait", 1) + + # Step 1: Create the order + order_details = context.step(create_order()) + context.logger.info(f"Order created: {order_details['order_id']}") + + # Step 2: Wait X minutes - simulate a long running task + context.logger.info(f"Waiting {minutes_to_wait} minutes before sending notification...") + context.wait(Duration.from_minutes(minutes_to_wait)) + + # Step 3: Send notification + context.logger.info(f"Waited for {minutes_to_wait} minutes without consuming CPU.") + notification_details = context.step(send_notification(order_details['order_id'])) + context.logger.info("Notification sent successfully...") + + response = { + "success": True, + "notification": notification_details + } + + # IMPORTANT: When using Step Function WAIT_FOR_TASK_TOKEN pattern, + # wrap SendTaskSuccess in context.step() to make it durable. + # If placed outside context.step(), it will execute on every + # replay causing duplicate callbacks, or may never execute if + # Lambda is interrupted, leaving Step Functions waiting indefinitely. + # Send callback as the FINAL durable step + if task_token: + context.logger.info("Resuming Step Function by calling send_task_success with task_token") + context.step(send_sfn_task_success(task_token, response)) + + return response diff --git a/cdk-stepfunction-durable-lambda-function/cdk.json b/cdk-stepfunction-durable-lambda-function/cdk.json new file mode 100644 index 000000000..747d6d1bd --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/cdk.json @@ -0,0 +1,97 @@ +{ + "app": "python3 app.py", + "watch": { + "include": [ + "**" + ], + "exclude": [ + "README.md", + "cdk*.json", + "requirements*.txt", + "source.bat", + "**/__init__.py", + "**/__pycache__", + "tests" + ] + }, + "context": { + "@aws-cdk/aws-lambda:recognizeLayerVersion": true, + "@aws-cdk/core:checkSecretUsage": true, + "@aws-cdk/core:target-partitions": [ + "aws", + "aws-cn" + ], + "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true, + "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true, + "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true, + "@aws-cdk/aws-iam:minimizePolicies": true, + "@aws-cdk/core:validateSnapshotRemovalPolicy": true, + "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true, + "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true, + "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true, + "@aws-cdk/aws-apigateway:disableCloudWatchRole": true, + "@aws-cdk/core:enablePartitionLiterals": true, + "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true, + "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true, + "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true, + "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true, + "@aws-cdk/aws-route53-patters:useCertificate": true, + "@aws-cdk/customresources:installLatestAwsSdkDefault": false, + "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true, + "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true, + "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true, + "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true, + "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true, + "@aws-cdk/aws-redshift:columnId": true, + "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true, + "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true, + "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true, + "@aws-cdk/aws-kms:aliasNameRef": true, + "@aws-cdk/aws-kms:applyImportedAliasPermissionsToPrincipal": true, + "@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true, + "@aws-cdk/core:includePrefixInUniqueNameGeneration": true, + "@aws-cdk/aws-efs:denyAnonymousAccess": true, + "@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true, + "@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true, + "@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true, + "@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true, + "@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true, + "@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true, + "@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true, + "@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true, + "@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true, + "@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true, + "@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true, + "@aws-cdk/aws-eks:nodegroupNameAttribute": true, + "@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true, + "@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true, + "@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false, + "@aws-cdk/aws-s3:keepNotificationInImportedBucket": false, + "@aws-cdk/core:explicitStackTags": true, + "@aws-cdk/aws-ecs:enableImdsBlockingDeprecatedFeature": false, + "@aws-cdk/aws-ecs:disableEcsImdsBlocking": true, + "@aws-cdk/aws-ecs:reduceEc2FargateCloudWatchPermissions": true, + "@aws-cdk/aws-dynamodb:resourcePolicyPerReplica": true, + "@aws-cdk/aws-ec2:ec2SumTImeoutEnabled": true, + "@aws-cdk/aws-appsync:appSyncGraphQLAPIScopeLambdaPermission": true, + "@aws-cdk/aws-rds:setCorrectValueForDatabaseInstanceReadReplicaInstanceResourceId": true, + "@aws-cdk/core:cfnIncludeRejectComplexResourceUpdateCreatePolicyIntrinsics": true, + "@aws-cdk/aws-lambda-nodejs:sdkV3ExcludeSmithyPackages": true, + "@aws-cdk/aws-stepfunctions-tasks:fixRunEcsTaskPolicy": true, + "@aws-cdk/aws-ec2:bastionHostUseAmazonLinux2023ByDefault": true, + "@aws-cdk/aws-route53-targets:userPoolDomainNameMethodWithoutCustomResource": true, + "@aws-cdk/aws-elasticloadbalancingV2:albDualstackWithoutPublicIpv4SecurityGroupRulesDefault": true, + "@aws-cdk/aws-iam:oidcRejectUnauthorizedConnections": true, + "@aws-cdk/core:enableAdditionalMetadataCollection": true, + "@aws-cdk/aws-lambda:createNewPoliciesWithAddToRolePolicy": false, + "@aws-cdk/aws-s3:setUniqueReplicationRoleName": true, + "@aws-cdk/aws-events:requireEventBusPolicySid": true, + "@aws-cdk/core:aspectPrioritiesMutating": true, + "@aws-cdk/aws-dynamodb:retainTableReplica": true, + "@aws-cdk/aws-stepfunctions:useDistributedMapResultWriterV2": true, + "@aws-cdk/s3-notifications:addS3TrustKeyPolicyForSnsSubscriptions": true, + "@aws-cdk/aws-ec2:requirePrivateSubnetsForEgressOnlyInternetGateway": true, + "@aws-cdk/aws-s3:publicAccessBlockedByDefault": true, + "@aws-cdk/aws-lambda:useCdkManagedLogGroup": true + } +} diff --git a/cdk-stepfunction-durable-lambda-function/cdk_stepfunction_durable_lambda_function/__init__.py b/cdk-stepfunction-durable-lambda-function/cdk_stepfunction_durable_lambda_function/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cdk-stepfunction-durable-lambda-function/cdk_stepfunction_durable_lambda_function/cdk_stepfunction_durable_lambda_function_stack.py b/cdk-stepfunction-durable-lambda-function/cdk_stepfunction_durable_lambda_function/cdk_stepfunction_durable_lambda_function_stack.py new file mode 100644 index 000000000..c7912848c --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/cdk_stepfunction_durable_lambda_function/cdk_stepfunction_durable_lambda_function_stack.py @@ -0,0 +1,116 @@ +from aws_cdk import ( + Duration, + Stack, + CfnOutput, + aws_lambda as _lambda, + aws_stepfunctions as sfn, + aws_stepfunctions_tasks as tasks, + aws_iam as iam +) +from constructs import Construct + +class CdkStepfunctionDurableLambdaFunctionStack(Stack): + + def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: + super().__init__(scope, construct_id, **kwargs) + + resource_prefix = "sfn-dfn" + + # Create a durable Lambda function with execution timeout > 15 mins + async_durable_lambda_fn = _lambda.Function(self, "asyncDurableFunctionInvocation", + description="Durable Lambda function with execution timeout > 15 mins, invoked asynchronously from Step Function", + function_name=f"{resource_prefix}-async-durable-fn", + runtime=_lambda.Runtime.PYTHON_3_14, + handler="async_durable_function_invocation.lambda_handler", + architecture=_lambda.Architecture.ARM_64, + code=_lambda.Code.from_asset("./async-durable-lambda/"), + timeout=Duration.minutes(15), + # DurableConfig is what makes a Lambda function "durable" + durable_config=_lambda.DurableConfig( + execution_timeout=Duration.hours(1), # Durable Function duration set to 1 hour, greater than lambda function's 15 mins timeout + retention_period=Duration.days(3), + ), + ) + + # Create version for async durable function (required for durable functions) + async_durable_lambda_version = async_durable_lambda_fn.current_version + + # Create a durable Lambda function with execution timeout < 15 mins + synchronous_durable_lambda_fn = _lambda.Function(self, "synchronousDurableFunctionInvocation", + description="Durable Lambda function with execution timeout < 15 mins, invoked synchronously from Step Function", + function_name=f"{resource_prefix}-sync-durable-fn", + runtime=_lambda.Runtime.PYTHON_3_14, + handler="synchronous_durable_function_invocation.lambda_handler", + architecture=_lambda.Architecture.ARM_64, + code=_lambda.Code.from_asset("./sync-durable-lambda/"), + timeout=Duration.minutes(15), + # DurableConfig is what makes a Lambda function "durable" + durable_config=_lambda.DurableConfig( + execution_timeout=Duration.minutes(15), # Durable Function duration set to 15 mins, equal to lambda function's 15 mins timeout + retention_period=Duration.days(3), + ), + ) + + # Create version for async durable function (required for durable functions) + sync_durable_lambda_version = synchronous_durable_lambda_fn.current_version + + invoke_async_durable_function_task = tasks.LambdaInvoke( + self, "Async Durable Lambda Fn Invoke", + lambda_function=async_durable_lambda_version, + integration_pattern=sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN, # use this pattern to pass a task token to durable fn invoked asynchronously + invocation_type=tasks.LambdaInvocationType.EVENT, # set Lambda invocaton type = Event for async invoke + payload=sfn.TaskInput.from_object({ + "TaskToken": "{% $states.context.Task.Token %}", + "minutes_to_wait": "{% $states.input.minutes_to_wait %}" + }), + outputs="{% $states.result %}", + heartbeat_timeout=sfn.Timeout.duration(Duration.hours(1)), # setting Step Function task heartbeat timeout to 1 hour to match durable function's execution timeout + ) + + invoke_sync_durable_function_task = tasks.LambdaInvoke( + self, "Synchronous Durable Lambda Fn Invoke", + lambda_function=sync_durable_lambda_version, + payload=sfn.TaskInput.from_text("{% $states.input %}"), + outputs="{% $states.result.Payload %}" + ) + + chain = invoke_async_durable_function_task\ + .next(invoke_sync_durable_function_task + ) + + state_machine = sfn.StateMachine(self, "SFnDurableFunctionIntegration", + definition_body = sfn.DefinitionBody.from_chainable(chain), + query_language=sfn.QueryLanguage.JSONATA, + state_machine_name=resource_prefix + "-integration-pattern-cdk", + timeout=Duration.hours(2) + ) + + # Grant permission to allow durable Lambda function to send the token to + # the Step Function using send_task_success or send_task_failure API for + # WaitForTaskToken pattern + async_durable_lambda_fn.add_to_role_policy( + iam.PolicyStatement( + actions=[ + "states:SendTaskSuccess", + "states:SendTaskFailure", + "states:SendTaskHeartbeat" + ], + resources=[f"arn:aws:states:{self.region}:{self.account}:stateMachine:*"] + ) + ) + + CfnOutput(self, "StepFunctionDFArn", + value = state_machine.state_machine_arn, + export_name = 'StepFunctionDFArn', + description = 'Step Function arn') + + CfnOutput(self, "AsyncDurableFunctionName", + value = async_durable_lambda_fn.function_name, + export_name = 'AsyncDurableFunctionName', + description = 'Async durable Lambda function name') + + CfnOutput(self, "SyncDurableFunctionName", + value = synchronous_durable_lambda_fn.function_name, + export_name = 'SyncDurableFunctionName', + description = 'Synchronous durable Lambda function name') + diff --git a/cdk-stepfunction-durable-lambda-function/requirements-dev.txt b/cdk-stepfunction-durable-lambda-function/requirements-dev.txt new file mode 100644 index 000000000..927094516 --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/requirements-dev.txt @@ -0,0 +1 @@ +pytest==6.2.5 diff --git a/cdk-stepfunction-durable-lambda-function/requirements.txt b/cdk-stepfunction-durable-lambda-function/requirements.txt new file mode 100644 index 000000000..cb083ffb5 --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/requirements.txt @@ -0,0 +1,2 @@ +aws-cdk-lib==2.240.0 +constructs>=10.0.0,<11.0.0 diff --git a/cdk-stepfunction-durable-lambda-function/resources/Lambda-durable-execution.png b/cdk-stepfunction-durable-lambda-function/resources/Lambda-durable-execution.png new file mode 100644 index 000000000..3919ea491 Binary files /dev/null and b/cdk-stepfunction-durable-lambda-function/resources/Lambda-durable-execution.png differ diff --git a/cdk-stepfunction-durable-lambda-function/resources/stepfunctions_graph.png b/cdk-stepfunction-durable-lambda-function/resources/stepfunctions_graph.png new file mode 100644 index 000000000..8d77464a8 Binary files /dev/null and b/cdk-stepfunction-durable-lambda-function/resources/stepfunctions_graph.png differ diff --git a/cdk-stepfunction-durable-lambda-function/source.bat b/cdk-stepfunction-durable-lambda-function/source.bat new file mode 100644 index 000000000..9e1a83442 --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/source.bat @@ -0,0 +1,13 @@ +@echo off + +rem The sole purpose of this script is to make the command +rem +rem source .venv/bin/activate +rem +rem (which activates a Python virtualenv on Linux or Mac OS X) work on Windows. +rem On Windows, this command just runs this batch file (the argument is ignored). +rem +rem Now we don't need to document a Windows command for activating a virtualenv. + +echo Executing .venv\Scripts\activate.bat for you +.venv\Scripts\activate.bat diff --git a/cdk-stepfunction-durable-lambda-function/stepfunction-durablelambdafunction-pattern.json b/cdk-stepfunction-durable-lambda-function/stepfunction-durablelambdafunction-pattern.json new file mode 100644 index 000000000..15ceb0d0f --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/stepfunction-durablelambdafunction-pattern.json @@ -0,0 +1,68 @@ +{ + "title": "Step Functions to Lambda durable functions", + "description": "Create a Step Functions workflow to invoke Lambda durable functions synchronously and asynchronously.", + "language": "Python", + "level": "200", + "framework": "AWS CDK", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to integrate AWS Lambda Durable Functions into an AWS Step Functions workflow. ", + "This pattern covers both the synchronous invocation (using default Request Response pattern) and asynchronous invocation (using the Step Function Wait for Callback with Task Token integration pattern) of the durable Lambda function.", + "It addresses the challenge of running long-running Lambda functions (beyond 15 minutes) within a Step Functions orchestration, using asynchronous invocation and durable checkpointing.", + "This pattern deploys one Step Functions and two Lambda functions along with required IAM policies." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/cdk-stepfunction-durable-lambda-function", + "templateURL": "serverless-patterns/cdk-stepfunction-durable-lambda-function", + "projectFolder": "cdk-stepfunction-durable-lambda-function", + "templateFile": "cdk_stepfunction_durable_lambda_function_stack.py" + } + }, + "resources": { + "bullets": [ + { + "text": "Step Functions Integration Patterns", + "link": "https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html" + }, + { + "text": "AWS Lambda durable functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Invoking durable Lambda functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking.html" + }, + { + "text": "Durable functions or Step Functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-step-functions.html" + } + ] + }, + "deploy": { + "text": [ + "cdk deploy" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: cdk destroy." + ] + }, + "authors": [ + { + "name": "Akshay Singhal", + "image": "https://avatars.githubusercontent.com/u/13421773?v=4", + "bio": "Akshay Singhal is a Principal Technical Account Manager (TAM) for AWS Enterprise Support focusing on the Security ISV (Independent Software Vendors) segment and loves everything serverless..", + "linkedin": "https://www.linkedin.com/in/singhalakshay", + "twitter": "@_sin_ak" + } + ] +} diff --git a/cdk-stepfunction-durable-lambda-function/sync-durable-lambda/synchronous_durable_function_invocation.py b/cdk-stepfunction-durable-lambda-function/sync-durable-lambda/synchronous_durable_function_invocation.py new file mode 100644 index 000000000..e4f715e14 --- /dev/null +++ b/cdk-stepfunction-durable-lambda-function/sync-durable-lambda/synchronous_durable_function_invocation.py @@ -0,0 +1,47 @@ +from aws_durable_execution_sdk_python.config import Duration +from aws_durable_execution_sdk_python.context import DurableContext, StepContext, durable_step +from aws_durable_execution_sdk_python.execution import durable_execution +import random +import datetime + +@durable_step +def create_order(context: StepContext): + order_id = f"order-{random.randint(1, 100)}" + context.logger.info(f"Creating order... : {order_id}") + return { + "order_id": order_id, + "total": 50.00, + "status": "Created" + } + +@durable_step +def send_notification(context: StepContext, order_id: str): + context.logger.info(f"Sending notification...") + return { + "sent": True, + "order_id": order_id, + "recipient": "customer@example.com", + "timestamp": datetime.datetime.now().isoformat() + } + +@durable_execution +def lambda_handler(event: dict, context: DurableContext) -> dict: + + context.logger.info(f"Lambda Event: {event}") + # Step 1: Create the order + order_details = context.step(create_order()) + context.logger.info(f"Order created: {order_details['order_id']}") + + # Step 2: Wait 10 seconds - simulate a short running process + context.logger.info("Waiting 10 seconds before sending notification...") + context.wait(Duration.from_seconds(10)) + + # Step 3: Send notification + context.logger.info("Waited for 10 seconds without consuming CPU.") + notification_details = context.step(send_notification(order_details['order_id'])) + context.logger.info("Notification sent successfully...") + + return { + "success": True, + "notification": notification_details + }