diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index cebe9a829ea5d..39d0f072a8acd 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -22,7 +22,7 @@ import shutil from functools import cached_property from pathlib import Path -from typing import Collection +from typing import TYPE_CHECKING, Collection # not sure why but mypy complains on missing `storage` but it is clearly there and is importable from google.cloud import storage # type: ignore[attr-defined] @@ -36,6 +36,9 @@ from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +if TYPE_CHECKING: + from airflow.models.taskinstance import TaskInstance + _DEFAULT_SCOPESS = frozenset( [ "https://www.googleapis.com/auth/devstorage.read_write", @@ -96,6 +99,7 @@ def __init__( **kwargs, ): super().__init__(base_log_folder, filename_template) + self.handler: logging.FileHandler | None = None self.remote_base = gcs_log_folder self.log_relative_path = "" self.closed = False @@ -137,15 +141,21 @@ def client(self) -> storage.Client: project=self.project_id if self.project_id else project_id, ) - def set_context(self, ti): - super().set_context(ti) + def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: + if getattr(self, "supports_task_context_logging", False): + super().set_context(ti, identifier=identifier) + else: + super().set_context(ti) # Log relative path is used to construct local and remote # log path to upload log files into GCS and read from the # remote location. + if TYPE_CHECKING: + assert self.handler is not None + full_path = self.handler.baseFilename self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix() is_trigger_log_context = getattr(ti, "is_trigger_log_context", False) - self.upload_on_close = is_trigger_log_context or not ti.raw + self.upload_on_close = is_trigger_log_context or not getattr(ti, "raw", None) def close(self): """Close and upload local log file to remote storage GCS."""