Skip to content

Commit 58e5d02

Browse files
authored
Improve debuggability of SQS, Lambda, EC2, and RDS hooks (#64661)
- Add debug logging before/after API calls in all four hooks, following the pattern from the Airbyte provider (#51503) and DynamoDB hook (#64629)
- Fix `raise e` → bare `raise` in RDS hook (get_export_task_state, get_event_subscription_state) to avoid misleading extra traceback frames
1 parent 9d877b4 commit 58e5d02

4 files changed

Lines changed: 96 additions & 23 deletions

File tree

providers/amazon/src/airflow/providers/amazon/aws/hooks/ec2.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def get_instance(self, instance_id: str, filters: list | None = None):
8989
:param filters: List of filters to specify instances to get
9090
:return: Instance object
9191
"""
92+
self.log.debug("Getting EC2 instance %s with filters %s", instance_id, filters)
9293
if self._api_type == "client_type":
9394
return self.get_instances(filters=filters, instance_ids=[instance_id])[0]
9495

@@ -104,7 +105,9 @@ def stop_instances(self, instance_ids: list) -> dict:
104105
"""
105106
self.log.info("Stopping instances: %s", instance_ids)
106107

107-
return self.conn.stop_instances(InstanceIds=instance_ids)
108+
result = self.conn.stop_instances(InstanceIds=instance_ids)
109+
self.log.debug("stop_instances response: %s", result.get("StoppingInstances"))
110+
return result
108111

109112
@only_client_type
110113
def start_instances(self, instance_ids: list) -> dict:
@@ -116,7 +119,9 @@ def start_instances(self, instance_ids: list) -> dict:
116119
"""
117120
self.log.info("Starting instances: %s", instance_ids)
118121

119-
return self.conn.start_instances(InstanceIds=instance_ids)
122+
result = self.conn.start_instances(InstanceIds=instance_ids)
123+
self.log.debug("start_instances response: %s", result.get("StartingInstances"))
124+
return result
120125

121126
@only_client_type
122127
def terminate_instances(self, instance_ids: list) -> dict:
@@ -128,7 +133,9 @@ def terminate_instances(self, instance_ids: list) -> dict:
128133
"""
129134
self.log.info("Terminating instances: %s", instance_ids)
130135

131-
return self.conn.terminate_instances(InstanceIds=instance_ids)
136+
result = self.conn.terminate_instances(InstanceIds=instance_ids)
137+
self.log.debug("terminate_instances response: %s", result.get("TerminatingInstances"))
138+
return result
132139

133140
@only_client_type
134141
def describe_instances(self, filters: list | None = None, instance_ids: list | None = None):
@@ -173,9 +180,12 @@ def get_instance_ids(self, filters: list | None = None) -> list:
173180
return [instance["InstanceId"] for instance in self.get_instances(filters=filters)]
174181

175182
async def get_instance_state_async(self, instance_id: str) -> str:
183+
self.log.debug("Getting instance state (async) for %s", instance_id)
176184
async with await self.get_async_conn() as client:
177185
response = await client.describe_instances(InstanceIds=[instance_id])
178-
return response["Reservations"][0]["Instances"][0]["State"]["Name"]
186+
state = response["Reservations"][0]["Instances"][0]["State"]["Name"]
187+
self.log.debug("Instance %s state (async): %s", instance_id, state)
188+
return state
179189

180190
def get_instance_state(self, instance_id: str) -> str:
181191
"""
@@ -200,8 +210,15 @@ def wait_for_state(self, instance_id: str, target_state: str, check_interval: fl
200210
:return: None
201211
"""
202212
instance_state = self.get_instance_state(instance_id=instance_id)
213+
self.log.debug(
214+
"Waiting for instance %s to reach state '%s', current state: '%s'",
215+
instance_id,
216+
target_state,
217+
instance_state,
218+
)
203219

204220
while instance_state != target_state:
221+
self.log.debug("Sleeping %ss before next state check", check_interval)
205222
time.sleep(check_interval)
206223
instance_state = self.get_instance_state(instance_id=instance_id)
207224

providers/amazon/src/airflow/providers/amazon/aws/hooks/lambda_function.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,19 @@ def invoke_lambda(
8080
"Payload": payload,
8181
"Qualifier": qualifier,
8282
}
83-
return self.conn.invoke(**trim_none_values(invoke_args))
83+
self.log.debug(
84+
"Invoking Lambda function %s with invocation type %s, qualifier %s",
85+
function_name,
86+
invocation_type,
87+
qualifier,
88+
)
89+
response = self.conn.invoke(**trim_none_values(invoke_args))
90+
self.log.debug(
91+
"Lambda invoke response: StatusCode=%s, FunctionError=%s",
92+
response.get("StatusCode"),
93+
response.get("FunctionError"),
94+
)
95+
return response
8496

8597
def create_lambda(
8698
self,
@@ -192,7 +204,20 @@ def create_lambda(
192204
"SnapStart": snap_start,
193205
"LoggingConfig": logging_config,
194206
}
195-
return self.conn.create_function(**trim_none_values(create_function_args))
207+
self.log.debug(
208+
"Creating Lambda function %s with runtime %s, handler %s, package type %s",
209+
function_name,
210+
runtime,
211+
handler,
212+
package_type,
213+
)
214+
response = self.conn.create_function(**trim_none_values(create_function_args))
215+
self.log.debug(
216+
"Lambda function created: ARN=%s, State=%s",
217+
response.get("FunctionArn"),
218+
response.get("State"),
219+
)
220+
return response
196221

197222
@staticmethod
198223
@return_on_error(None)

providers/amazon/src/airflow/providers/amazon/aws/hooks/rds.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,14 @@ def get_db_snapshot_state(self, snapshot_id: str) -> str:
6161
:return: Returns the status of the DB snapshot as a string (eg. "available")
6262
:raises AirflowNotFoundException: If the DB instance snapshot does not exist.
6363
"""
64+
self.log.debug("Retrieving state for DB snapshot %s", snapshot_id)
6465
try:
6566
response = self.conn.describe_db_snapshots(DBSnapshotIdentifier=snapshot_id)
6667
except self.conn.exceptions.DBSnapshotNotFoundFault as e:
67-
raise AirflowNotFoundException(e)
68-
return response["DBSnapshots"][0]["Status"].lower()
68+
raise AirflowNotFoundException(e) from e
69+
state = response["DBSnapshots"][0]["Status"].lower()
70+
self.log.debug("DB snapshot %s state: %s", snapshot_id, state)
71+
return state
6972

7073
def wait_for_db_snapshot_state(
7174
self, snapshot_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40
@@ -107,11 +110,14 @@ def get_db_cluster_snapshot_state(self, snapshot_id: str) -> str:
107110
:return: Returns the status of the DB cluster snapshot as a string (eg. "available")
108111
:raises AirflowNotFoundException: If the DB cluster snapshot does not exist.
109112
"""
113+
self.log.debug("Retrieving state for DB cluster snapshot %s", snapshot_id)
110114
try:
111115
response = self.conn.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier=snapshot_id)
112116
except self.conn.exceptions.DBClusterSnapshotNotFoundFault as e:
113-
raise AirflowNotFoundException(e)
114-
return response["DBClusterSnapshots"][0]["Status"].lower()
117+
raise AirflowNotFoundException(e) from e
118+
state = response["DBClusterSnapshots"][0]["Status"].lower()
119+
self.log.debug("DB cluster snapshot %s state: %s", snapshot_id, state)
120+
return state
115121

116122
def wait_for_db_cluster_snapshot_state(
117123
self, snapshot_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40
@@ -153,13 +159,16 @@ def get_export_task_state(self, export_task_id: str) -> str:
153159
:return: Returns the status of the snapshot export task as a string (eg. "canceled")
154160
:raises AirflowNotFoundException: If the export task does not exist.
155161
"""
162+
self.log.debug("Retrieving state for export task %s", export_task_id)
156163
try:
157164
response = self.conn.describe_export_tasks(ExportTaskIdentifier=export_task_id)
158165
except self.conn.exceptions.ClientError as e:
159166
if e.response["Error"]["Code"] in ("ExportTaskNotFound", "ExportTaskNotFoundFault"):
160-
raise AirflowNotFoundException(e)
161-
raise e
162-
return response["ExportTasks"][0]["Status"].lower()
167+
raise AirflowNotFoundException(e) from e
168+
raise
169+
state = response["ExportTasks"][0]["Status"].lower()
170+
self.log.debug("Export task %s state: %s", export_task_id, state)
171+
return state
163172

164173
def wait_for_export_task_state(
165174
self, export_task_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40
@@ -194,13 +203,16 @@ def get_event_subscription_state(self, subscription_name: str) -> str:
194203
:return: Returns the status of the event subscription as a string (eg. "active")
195204
:raises AirflowNotFoundException: If the event subscription does not exist.
196205
"""
206+
self.log.debug("Retrieving state for event subscription %s", subscription_name)
197207
try:
198208
response = self.conn.describe_event_subscriptions(SubscriptionName=subscription_name)
199209
except self.conn.exceptions.ClientError as e:
200210
if e.response["Error"]["Code"] in ("SubscriptionNotFoundFault", "SubscriptionNotFound"):
201-
raise AirflowNotFoundException(e)
202-
raise e
203-
return response["EventSubscriptionsList"][0]["Status"].lower()
211+
raise AirflowNotFoundException(e) from e
212+
raise
213+
state = response["EventSubscriptionsList"][0]["Status"].lower()
214+
self.log.debug("Event subscription %s state: %s", subscription_name, state)
215+
return state
204216

205217
def wait_for_event_subscription_state(
206218
self, subscription_name: str, target_state: str, check_interval: int = 30, max_attempts: int = 40
@@ -235,11 +247,14 @@ def get_db_instance_state(self, db_instance_id: str) -> str:
235247
:return: Returns the status of the DB instance as a string (eg. "available")
236248
:raises AirflowNotFoundException: If the DB instance does not exist.
237249
"""
250+
self.log.debug("Retrieving state for DB instance %s", db_instance_id)
238251
try:
239252
response = self.conn.describe_db_instances(DBInstanceIdentifier=db_instance_id)
240253
except self.conn.exceptions.DBInstanceNotFoundFault as e:
241-
raise AirflowNotFoundException(e)
242-
return response["DBInstances"][0]["DBInstanceStatus"].lower()
254+
raise AirflowNotFoundException(e) from e
255+
state = response["DBInstances"][0]["DBInstanceStatus"].lower()
256+
self.log.debug("DB instance %s state: %s", db_instance_id, state)
257+
return state
243258

244259
def wait_for_db_instance_state(
245260
self, db_instance_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40
@@ -286,11 +301,14 @@ def get_db_cluster_state(self, db_cluster_id: str) -> str:
286301
:return: Returns the status of the DB cluster as a string (eg. "available")
287302
:raises AirflowNotFoundException: If the DB cluster does not exist.
288303
"""
304+
self.log.debug("Retrieving state for DB cluster %s", db_cluster_id)
289305
try:
290306
response = self.conn.describe_db_clusters(DBClusterIdentifier=db_cluster_id)
291307
except self.conn.exceptions.DBClusterNotFoundFault as e:
292-
raise AirflowNotFoundException(e)
293-
return response["DBClusters"][0]["Status"].lower()
308+
raise AirflowNotFoundException(e) from e
309+
state = response["DBClusters"][0]["Status"].lower()
310+
self.log.debug("DB cluster %s state: %s", db_cluster_id, state)
311+
return state
294312

295313
def wait_for_db_cluster_state(
296314
self, db_cluster_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40

providers/amazon/src/airflow/providers/amazon/aws/hooks/sqs.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ def create_queue(self, queue_name: str, attributes: dict | None = None) -> dict:
5151
:param attributes: additional attributes for the queue (default: None)
5252
:return: dict with the information about the queue.
5353
"""
54-
return self.get_conn().create_queue(QueueName=queue_name, Attributes=attributes or {})
54+
self.log.debug("Creating SQS queue %s with attributes %s", queue_name, attributes)
55+
result = self.get_conn().create_queue(QueueName=queue_name, Attributes=attributes or {})
56+
self.log.debug("Created SQS queue %s, response: %s", queue_name, result.get("QueueUrl"))
57+
return result
5558

5659
@staticmethod
5760
def _build_msg_params(
@@ -104,7 +107,10 @@ def send_message(
104107
message_group_id=message_group_id,
105108
message_deduplication_id=message_deduplication_id,
106109
)
107-
return self.get_conn().send_message(**params)
110+
self.log.debug("Sending message to SQS queue %s with delay %ds", queue_url, delay_seconds)
111+
result = self.get_conn().send_message(**params)
112+
self.log.debug("Message sent to %s, MessageId: %s", queue_url, result.get("MessageId"))
113+
return result
108114

109115
async def asend_message(
110116
self,
@@ -138,5 +144,12 @@ async def asend_message(
138144
message_deduplication_id=message_deduplication_id,
139145
)
140146

147+
self.log.debug(
148+
"Sending message (async) to SQS queue %s with delay %ds",
149+
queue_url,
150+
delay_seconds,
151+
)
141152
async with await self.get_async_conn() as async_conn:
142-
return await async_conn.send_message(**params)
153+
result = await async_conn.send_message(**params)
154+
self.log.debug("Message sent (async) to %s, MessageId: %s", queue_url, result.get("MessageId"))
155+
return result

0 commit comments

Comments (0)