Skip to content

Commit c089cbd

Browse files
committed
Fix Neptune operator execute_complete missing error handling
Both NeptuneStartDbClusterOperator and NeptuneStopDbClusterOperator silently succeeded when the trigger reported a failure — no status validation, so the task was marked success and downstream tasks would start against a cluster that wasn't actually running. Also fixes wrong XCom key (cluster_id → db_cluster_id) that caused empty cluster ID in return value, and removes debug self.log.info(event).
1 parent ac6fb17 commit c089cbd

1 file changed

Lines changed: 9 additions & 15 deletions

File tree

  • providers/amazon/src/airflow/providers/amazon/aws/operators

providers/amazon/src/airflow/providers/amazon/aws/operators/neptune.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -187,14 +187,11 @@ def execute(self, context: Context, event: dict[str, Any] | None = None, **kwarg
187187
return {"db_cluster_id": self.cluster_id}
188188

189189
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> dict[str, str]:
190-
status = ""
191-
cluster_id = ""
190+
if event and event.get("status") != "success":
191+
raise AirflowException(f"Error starting Neptune cluster: {event}")
192192

193-
if event:
194-
status = event.get("status", "")
195-
cluster_id = event.get("cluster_id", "")
196-
197-
self.log.info("Neptune cluster %s available with status: %s", cluster_id, status)
193+
cluster_id = event.get("db_cluster_id", "") if event else ""
194+
self.log.info("Neptune cluster %s available.", cluster_id)
198195

199196
return {"db_cluster_id": cluster_id}
200197

@@ -314,13 +311,10 @@ def execute(self, context: Context, event: dict[str, Any] | None = None, **kwarg
314311
return {"db_cluster_id": self.cluster_id}
315312

316313
def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> dict[str, str]:
317-
status = ""
318-
cluster_id = ""
319-
self.log.info(event)
320-
if event:
321-
status = event.get("status", "")
322-
cluster_id = event.get("cluster_id", "")
323-
324-
self.log.info("Neptune cluster %s stopped with status: %s", cluster_id, status)
314+
if event and event.get("status") != "success":
315+
raise AirflowException(f"Error stopping Neptune cluster: {event}")
316+
317+
cluster_id = event.get("db_cluster_id", "") if event else ""
318+
self.log.info("Neptune cluster %s stopped.", cluster_id)
325319

326320
return {"db_cluster_id": cluster_id}

0 commit comments

Comments
 (0)