Skip to content

Commit 1455a3b

Browse files
authored
Use base aws classes in AWS CloudFormation Operators/Sensors (#36771)
1 parent c2d02b4 commit 1455a3b

5 files changed

Lines changed: 194 additions & 57 deletions

File tree

airflow/providers/amazon/aws/operators/cloud_formation.py

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,66 +15,79 @@
1515
# KIND, either express or implied. See the License for the
1616
# specific language governing permissions and limitations
1717
# under the License.
18-
"""This module contains CloudFormation create/delete stack operators."""
18+
"""This module contains AWS CloudFormation create/delete stack operators."""
1919
from __future__ import annotations
2020

2121
from typing import TYPE_CHECKING, Sequence
2222

23-
from airflow.models import BaseOperator
2423
from airflow.providers.amazon.aws.hooks.cloud_formation import CloudFormationHook
24+
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
25+
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
2526

2627
if TYPE_CHECKING:
2728
from airflow.utils.context import Context
2829

2930

30-
class CloudFormationCreateStackOperator(BaseOperator):
31+
class CloudFormationCreateStackOperator(AwsBaseOperator[CloudFormationHook]):
3132
"""
32-
An operator that creates a CloudFormation stack.
33+
An operator that creates a AWS CloudFormation stack.
3334
3435
.. seealso::
3536
For more information on how to use this operator, take a look at the guide:
3637
:ref:`howto/operator:CloudFormationCreateStackOperator`
3738
3839
:param stack_name: stack name (templated)
39-
:param cloudformation_parameters: parameters to be passed to CloudFormation.
40-
:param aws_conn_id: aws connection to uses
40+
:param cloudformation_parameters: parameters to be passed to AWS CloudFormation.
41+
:param aws_conn_id: The Airflow connection used for AWS credentials.
42+
If this is ``None`` or empty then the default boto3 behaviour is used. If
43+
running Airflow in a distributed manner and aws_conn_id is None or
44+
empty, then default boto3 configuration would be used (and must be
45+
maintained on each worker node).
46+
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
47+
:param verify: Whether or not to verify SSL certificates. See:
48+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
49+
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
50+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
4151
"""
4252

43-
template_fields: Sequence[str] = ("stack_name", "cloudformation_parameters")
44-
template_ext: Sequence[str] = ()
53+
aws_hook_class = CloudFormationHook
54+
template_fields: Sequence[str] = aws_template_fields("stack_name", "cloudformation_parameters")
4555
ui_color = "#6b9659"
4656

47-
def __init__(
48-
self, *, stack_name: str, cloudformation_parameters: dict, aws_conn_id: str = "aws_default", **kwargs
49-
):
57+
def __init__(self, *, stack_name: str, cloudformation_parameters: dict, **kwargs):
5058
super().__init__(**kwargs)
5159
self.stack_name = stack_name
5260
self.cloudformation_parameters = cloudformation_parameters
53-
self.aws_conn_id = aws_conn_id
5461

5562
def execute(self, context: Context):
5663
self.log.info("CloudFormation parameters: %s", self.cloudformation_parameters)
57-
58-
cloudformation_hook = CloudFormationHook(aws_conn_id=self.aws_conn_id)
59-
cloudformation_hook.create_stack(self.stack_name, self.cloudformation_parameters)
64+
self.hook.create_stack(self.stack_name, self.cloudformation_parameters)
6065

6166

62-
class CloudFormationDeleteStackOperator(BaseOperator):
67+
class CloudFormationDeleteStackOperator(AwsBaseOperator[CloudFormationHook]):
6368
"""
64-
An operator that deletes a CloudFormation stack.
65-
66-
:param stack_name: stack name (templated)
67-
:param cloudformation_parameters: parameters to be passed to CloudFormation.
69+
An operator that deletes a AWS CloudFormation stack.
6870
6971
.. seealso::
7072
For more information on how to use this operator, take a look at the guide:
7173
:ref:`howto/operator:CloudFormationDeleteStackOperator`
7274
73-
:param aws_conn_id: aws connection to uses
75+
:param stack_name: stack name (templated)
76+
:param cloudformation_parameters: parameters to be passed to CloudFormation.
77+
:param aws_conn_id: The Airflow connection used for AWS credentials.
78+
If this is ``None`` or empty then the default boto3 behaviour is used. If
79+
running Airflow in a distributed manner and aws_conn_id is None or
80+
empty, then default boto3 configuration would be used (and must be
81+
maintained on each worker node).
82+
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
83+
:param verify: Whether or not to verify SSL certificates. See:
84+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
85+
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
86+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
7487
"""
7588

76-
template_fields: Sequence[str] = ("stack_name",)
77-
template_ext: Sequence[str] = ()
89+
aws_hook_class = CloudFormationHook
90+
template_fields: Sequence[str] = aws_template_fields("stack_name")
7891
ui_color = "#1d472b"
7992
ui_fgcolor = "#FFF"
8093

@@ -93,6 +106,4 @@ def __init__(
93106

94107
def execute(self, context: Context):
95108
self.log.info("CloudFormation Parameters: %s", self.cloudformation_parameters)
96-
97-
cloudformation_hook = CloudFormationHook(aws_conn_id=self.aws_conn_id)
98-
cloudformation_hook.delete_stack(self.stack_name, self.cloudformation_parameters)
109+
self.hook.delete_stack(self.stack_name, self.cloudformation_parameters)

airflow/providers/amazon/aws/sensors/cloud_formation.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818
"""This module contains sensors for AWS CloudFormation."""
1919
from __future__ import annotations
2020

21-
from functools import cached_property
2221
from typing import TYPE_CHECKING, Sequence
2322

23+
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
24+
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
25+
2426
if TYPE_CHECKING:
2527
from airflow.utils.context import Context
2628

2729
from airflow.exceptions import AirflowSkipException
2830
from airflow.providers.amazon.aws.hooks.cloud_formation import CloudFormationHook
29-
from airflow.sensors.base import BaseSensorOperator
3031

3132

32-
class CloudFormationCreateStackSensor(BaseSensorOperator):
33+
class CloudFormationCreateStackSensor(AwsBaseSensor[CloudFormationHook]):
3334
"""
3435
Waits for a stack to be created successfully on AWS CloudFormation.
3536
@@ -38,19 +39,25 @@ class CloudFormationCreateStackSensor(BaseSensorOperator):
3839
:ref:`howto/sensor:CloudFormationCreateStackSensor`
3940
4041
:param stack_name: The name of the stack to wait for (templated)
41-
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
42-
stored
43-
:param poke_interval: Time in seconds that the job should wait between each try
42+
:param aws_conn_id: The Airflow connection used for AWS credentials.
43+
If this is ``None`` or empty then the default boto3 behaviour is used. If
44+
running Airflow in a distributed manner and aws_conn_id is None or
45+
empty, then default boto3 configuration would be used (and must be
46+
maintained on each worker node).
47+
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
48+
:param verify: Whether or not to verify SSL certificates. See:
49+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
50+
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
51+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
4452
"""
4553

46-
template_fields: Sequence[str] = ("stack_name",)
54+
aws_hook_class = CloudFormationHook
55+
template_fields: Sequence[str] = aws_template_fields("stack_name")
4756
ui_color = "#C5CAE9"
4857

49-
def __init__(self, *, stack_name, aws_conn_id="aws_default", region_name=None, **kwargs):
58+
def __init__(self, *, stack_name, **kwargs):
5059
super().__init__(**kwargs)
5160
self.stack_name = stack_name
52-
self.aws_conn_id = aws_conn_id
53-
self.region_name = region_name
5461

5562
def poke(self, context: Context):
5663
stack_status = self.hook.get_stack_status(self.stack_name)
@@ -65,13 +72,8 @@ def poke(self, context: Context):
6572
raise AirflowSkipException(message)
6673
raise ValueError(message)
6774

68-
@cached_property
69-
def hook(self) -> CloudFormationHook:
70-
"""Create and return a CloudFormationHook."""
71-
return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
7275

73-
74-
class CloudFormationDeleteStackSensor(BaseSensorOperator):
76+
class CloudFormationDeleteStackSensor(AwsBaseSensor[CloudFormationHook]):
7577
"""
7678
Waits for a stack to be deleted successfully on AWS CloudFormation.
7779
@@ -80,12 +82,20 @@ class CloudFormationDeleteStackSensor(BaseSensorOperator):
8082
:ref:`howto/sensor:CloudFormationDeleteStackSensor`
8183
8284
:param stack_name: The name of the stack to wait for (templated)
83-
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
84-
stored
85-
:param poke_interval: Time in seconds that the job should wait between each try
85+
:param aws_conn_id: The Airflow connection used for AWS credentials.
86+
If this is ``None`` or empty then the default boto3 behaviour is used. If
87+
running Airflow in a distributed manner and aws_conn_id is None or
88+
empty, then default boto3 configuration would be used (and must be
89+
maintained on each worker node).
90+
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
91+
:param verify: Whether or not to verify SSL certificates. See:
92+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
93+
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
94+
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
8695
"""
8796

88-
template_fields: Sequence[str] = ("stack_name",)
97+
aws_hook_class = CloudFormationHook
98+
template_fields: Sequence[str] = aws_template_fields("stack_name")
8999
ui_color = "#C5CAE9"
90100

91101
def __init__(
@@ -113,8 +123,3 @@ def poke(self, context: Context):
113123
if self.soft_fail:
114124
raise AirflowSkipException(message)
115125
raise ValueError(message)
116-
117-
@cached_property
118-
def hook(self) -> CloudFormationHook:
119-
"""Create and return a CloudFormationHook."""
120-
return CloudFormationHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)

docs/apache-airflow-providers-amazon/operators/cloudformation.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ Prerequisite Tasks
3131

3232
.. include:: ../_partials/prerequisite_tasks.rst
3333

34+
Generic Parameters
35+
------------------
36+
37+
.. include:: ../_partials/generic_parameters.rst
38+
3439
Operators
3540
---------
3641

tests/providers/amazon/aws/operators/test_cloud_formation.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,35 @@ def mocked_hook_client():
4040

4141

4242
class TestCloudFormationCreateStackOperator:
43+
def test_init(self):
44+
op = CloudFormationCreateStackOperator(
45+
task_id="cf_create_stack_init",
46+
stack_name="fake-stack",
47+
cloudformation_parameters={},
48+
# Generic hooks parameters
49+
aws_conn_id="fake-conn-id",
50+
region_name="eu-west-1",
51+
verify=True,
52+
botocore_config={"read_timeout": 42},
53+
)
54+
assert op.hook.client_type == "cloudformation"
55+
assert op.hook.resource_type is None
56+
assert op.hook.aws_conn_id == "fake-conn-id"
57+
assert op.hook._region_name == "eu-west-1"
58+
assert op.hook._verify is True
59+
assert op.hook._config is not None
60+
assert op.hook._config.read_timeout == 42
61+
62+
op = CloudFormationCreateStackOperator(
63+
task_id="cf_create_stack_init",
64+
stack_name="fake-stack",
65+
cloudformation_parameters={},
66+
)
67+
assert op.hook.aws_conn_id == "aws_default"
68+
assert op.hook._region_name is None
69+
assert op.hook._verify is None
70+
assert op.hook._config is None
71+
4372
def test_create_stack(self, mocked_hook_client):
4473
stack_name = "myStack"
4574
timeout = 15
@@ -60,6 +89,30 @@ def test_create_stack(self, mocked_hook_client):
6089

6190

6291
class TestCloudFormationDeleteStackOperator:
92+
def test_init(self):
93+
op = CloudFormationDeleteStackOperator(
94+
task_id="cf_delete_stack_init",
95+
stack_name="fake-stack",
96+
# Generic hooks parameters
97+
aws_conn_id="fake-conn-id",
98+
region_name="us-east-1",
99+
verify=False,
100+
botocore_config={"read_timeout": 42},
101+
)
102+
assert op.hook.client_type == "cloudformation"
103+
assert op.hook.resource_type is None
104+
assert op.hook.aws_conn_id == "fake-conn-id"
105+
assert op.hook._region_name == "us-east-1"
106+
assert op.hook._verify is False
107+
assert op.hook._config is not None
108+
assert op.hook._config.read_timeout == 42
109+
110+
op = CloudFormationDeleteStackOperator(task_id="cf_delete_stack_init", stack_name="fake-stack")
111+
assert op.hook.aws_conn_id == "aws_default"
112+
assert op.hook._region_name is None
113+
assert op.hook._verify is None
114+
assert op.hook._config is None
115+
63116
def test_delete_stack(self, mocked_hook_client):
64117
stack_name = "myStackToBeDeleted"
65118

0 commit comments

Comments
 (0)