From c5a9e2341fefc9bfe302b664a1126583825c5648 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 19 Jan 2024 16:34:12 +0800 Subject: [PATCH 1/2] style(providers/google): improve BigQueryInsertJobOperator type hinting --- airflow/providers/google/cloud/operators/bigquery.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index deadf0d193604..6b3cbf53b4bd8 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2810,7 +2810,7 @@ def execute(self, context: Any): job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id) except Conflict: # If the job already exists retrieve it - job = hook.get_job( + job: CopyJob | QueryJob | LoadJob | ExtractJob = hook.get_job( project_id=self.project_id, location=self.location, job_id=self.job_id, @@ -2852,6 +2852,7 @@ def execute(self, context: Any): persist_kwargs["dataset_id"] = table["datasetId"] persist_kwargs["project_id"] = table["projectId"] BigQueryTableLink.persist(**persist_kwargs) + self.job_id = job.job_id if self.project_id: @@ -2861,6 +2862,7 @@ def execute(self, context: Any): location=self.location, ) context["ti"].xcom_push(key="job_id_path", value=job_id_path) + # Wait for the job to complete if not self.deferrable: job.result(timeout=self.result_timeout, retry=self.result_retry) @@ -2882,7 +2884,7 @@ def execute(self, context: Any): self.log.info("Current state of job %s is %s", job.job_id, job.state) self._handle_job_error(job) - def execute_complete(self, context: Context, event: dict[str, Any]): + def execute_complete(self, context: Context, event: dict[str, Any]) -> str: """Callback for when the trigger fires. This returns immediately. It relies on trigger to throw an exception, From b45943d6117c596d8bf1c85c763a13e73c389940 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Sat, 20 Jan 2024 15:55:28 +0800 Subject: [PATCH 2/2] style: resolve mypy failure --- airflow/providers/google/cloud/operators/bigquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 6b3cbf53b4bd8..f10e5fbf5c2e6 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2810,7 +2810,7 @@ def execute(self, context: Any): job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id) except Conflict: # If the job already exists retrieve it - job: CopyJob | QueryJob | LoadJob | ExtractJob = hook.get_job( + job = hook.get_job( project_id=self.project_id, location=self.location, job_id=self.job_id, @@ -2884,7 +2884,7 @@ def execute(self, context: Any): self.log.info("Current state of job %s is %s", job.job_id, job.state) self._handle_job_error(job) - def execute_complete(self, context: Context, event: dict[str, Any]) -> str: + def execute_complete(self, context: Context, event: dict[str, Any]) -> str | None: """Callback for when the trigger fires. This returns immediately. It relies on trigger to throw an exception,