diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index f358dd9ee1bb0..57858fc4270ac 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -53,7 +53,9 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.secrets.cache import SecretCache from airflow.stats import Stats +from airflow.traces.tracer import Trace, span from airflow.utils import timezone +from airflow.utils.dates import datetime_to_nano from airflow.utils.file import list_py_file_paths, might_contain_dag from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.mixins import MultiprocessingStartMethodMixin @@ -228,7 +230,9 @@ def _run_processor_manager( # to kill all sub-process of this at the OS-level, rather than having # to iterate the child processes set_new_process_group() - + span = Trace.get_current_span() + span.set_attribute("dag_directory", str(dag_directory)) + span.set_attribute("dag_ids", str(dag_ids)) setproctitle("airflow scheduler -- DagFileProcessorManager") reload_configuration_for_dag_processing() processor_manager = DagFileProcessorManager( @@ -258,8 +262,10 @@ def heartbeat(self) -> None: self._heartbeat_manager() def _process_message(self, message): + span = Trace.get_current_span() self.log.debug("Received message of type %s", type(message).__name__) if isinstance(message, DagParsingStat): + span.set_attribute("all_files_processed", str(message.all_files_processed)) self._sync_metadata(message) else: raise RuntimeError(f"Unexpected message received of type {type(message).__name__}") @@ -562,118 +568,144 @@ def _run_parsing_loop(self): # in sync mode we need to be told to start a "loop" self.start_new_processes() while True: - loop_start_time = time.monotonic() - ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time) - self.heartbeat() - if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready: - agent_signal = 
self._direct_scheduler_conn.recv() - - self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal) - if agent_signal == DagParsingSignal.TERMINATE_MANAGER: - self.terminate() - break - elif agent_signal == DagParsingSignal.END_MANAGER: - self.end() - sys.exit(os.EX_OK) - elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE: - # continue the loop to parse dags - pass - elif isinstance(agent_signal, CallbackRequest): - self._add_callback_to_queue(agent_signal) - else: - raise ValueError(f"Invalid message {type(agent_signal)}") - - if not ready and not self._async_mode: - # In "sync" mode we don't want to parse the DAGs until we - # are told to (as that would open another connection to the - # SQLite DB which isn't a good practice - - # This shouldn't happen, as in sync mode poll should block for - # ever. Lets be defensive about that. - self.log.warning( - "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time - ) - - continue - - for sentinel in ready: - if sentinel is not self._direct_scheduler_conn: - processor = self.waitables.get(sentinel) - if processor: - self._collect_results_from_processor(processor) - self.waitables.pop(sentinel) - self._processors.pop(processor.file_path) - - if self.standalone_dag_processor: - self._fetch_callbacks(max_callbacks_per_loop) - self._scan_stale_dags() - DagWarning.purge_inactive_dag_warnings() - refreshed_dag_dir = self._refresh_dag_dir() - - self._kill_timed_out_processors() - - # Generate more file paths to process if we processed all the files already. 
Note for this - # to clear down, we must have cleared all files found from scanning the dags dir _and_ have - # cleared all files added as a result of callbacks - if not self._file_path_queue: - self.emit_metrics() - self.prepare_file_path_queue() - - # if new files found in dag dir, add them - elif refreshed_dag_dir: - self.add_new_file_path_to_queue() - - self._refresh_requested_filelocs() - self.start_new_processes() - - # Update number of loop iteration. - self._num_run += 1 - - if not self._async_mode: - self.log.debug("Waiting for processors to finish since we're using sqlite") - # Wait until the running DAG processors are finished before - # sending a DagParsingStat message back. This means the Agent - # can tell we've got to the end of this iteration when it sees - # this type of message - self.wait_until_finished() - - # Collect anything else that has finished, but don't kick off any more processors - self.collect_results() + with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span: + loop_start_time = time.monotonic() + ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time) + if span.is_recording(): + span.add_event(name="heartbeat") + self.heartbeat() + if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready: + agent_signal = self._direct_scheduler_conn.recv() + + self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal) + if agent_signal == DagParsingSignal.TERMINATE_MANAGER: + if span.is_recording(): + span.add_event(name="terminate") + self.terminate() + break + elif agent_signal == DagParsingSignal.END_MANAGER: + if span.is_recording(): + span.add_event(name="end") + self.end() + sys.exit(os.EX_OK) + elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE: + # continue the loop to parse dags + pass + elif isinstance(agent_signal, CallbackRequest): + self._add_callback_to_queue(agent_signal) + else: + raise ValueError(f"Invalid message 
{type(agent_signal)}") + + if not ready and not self._async_mode: + # In "sync" mode we don't want to parse the DAGs until we + # are told to (as that would open another connection to the + # SQLite DB which isn't a good practice + + # This shouldn't happen, as in sync mode poll should block for + # ever. Lets be defensive about that. + self.log.warning( + "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time + ) - self._print_stat() + continue - all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths) - max_runs_reached = self.max_runs_reached() + for sentinel in ready: + if sentinel is not self._direct_scheduler_conn: + processor = self.waitables.get(sentinel) + if processor: + self._collect_results_from_processor(processor) + self.waitables.pop(sentinel) + self._processors.pop(processor.file_path) + + if self.standalone_dag_processor: + self._fetch_callbacks(max_callbacks_per_loop) + self._scan_stale_dags() + DagWarning.purge_inactive_dag_warnings() + refreshed_dag_dir = self._refresh_dag_dir() + + if span.is_recording(): + span.add_event(name="_kill_timed_out_processors") + self._kill_timed_out_processors() + + # Generate more file paths to process if we processed all the files already. Note for this + # to clear down, we must have cleared all files found from scanning the dags dir _and_ have + # cleared all files added as a result of callbacks + if not self._file_path_queue: + self.emit_metrics() + if span.is_recording(): + span.add_event(name="prepare_file_path_queue") + self.prepare_file_path_queue() + + # if new files found in dag dir, add them + elif refreshed_dag_dir: + if span.is_recording(): + span.add_event(name="add_new_file_path_to_queue") + self.add_new_file_path_to_queue() + + self._refresh_requested_filelocs() + if span.is_recording(): + span.add_event(name="start_new_processes") + self.start_new_processes() + + # Update number of loop iteration. 
+ self._num_run += 1 + + if not self._async_mode: + self.log.debug("Waiting for processors to finish since we're using sqlite") + # Wait until the running DAG processors are finished before + # sending a DagParsingStat message back. This means the Agent + # can tell we've got to the end of this iteration when it sees + # this type of message + self.wait_until_finished() + + # Collect anything else that has finished, but don't kick off any more processors + if span.is_recording(): + span.add_event(name="collect_results") + self.collect_results() + + if span.is_recording(): + span.add_event(name="print_stat") + self._print_stat() + + all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths) + max_runs_reached = self.max_runs_reached() - try: - if self._direct_scheduler_conn: - self._direct_scheduler_conn.send( - DagParsingStat( - max_runs_reached, - all_files_processed, + try: + if self._direct_scheduler_conn: + self._direct_scheduler_conn.send( + DagParsingStat( + max_runs_reached, + all_files_processed, + ) ) + except BlockingIOError: + # Try again next time around the loop! + + # It is better to fail, than it is deadlock. This should + # "almost never happen" since the DagParsingStat object is + # small, and in async mode this stat is not actually _required_ + # for normal operation (It only drives "max runs") + self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring") + + if max_runs_reached: + self.log.info( + "Exiting dag parsing loop as all files have been processed %s times", self._max_runs ) - except BlockingIOError: - # Try again next time around the loop! - - # It is better to fail, than it is deadlock. 
This should - # "almost never happen" since the DagParsingStat object is - # small, and in async mode this stat is not actually _required_ - # for normal operation (It only drives "max runs") - self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring") - - if max_runs_reached: - self.log.info( - "Exiting dag parsing loop as all files have been processed %s times", self._max_runs - ) - break + if span.is_recording(): + span.add_event( + name="info", + attributes={ + "message": "Exiting dag parsing loop as all files have been processed {self._max_runs} times" + }, + ) + break - if self._async_mode: - loop_duration = time.monotonic() - loop_start_time - if loop_duration < 1: - poll_time = 1 - loop_duration - else: - poll_time = 0.0 + if self._async_mode: + loop_duration = time.monotonic() - loop_start_time + if loop_duration < 1: + poll_time = 1 - loop_duration + else: + poll_time = 0.0 @provide_session def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION): @@ -1103,6 +1135,24 @@ def _collect_results_from_processor(self, processor) -> None: ) self._file_stats[processor.file_path] = stat file_name = Path(processor.file_path).stem + """crude exposure of instrumentation code which may need to be furnished""" + span = Trace.get_tracer("DagFileProcessorManager").start_span( + "dag_processing", start_time=datetime_to_nano(processor.start_time) + ) + span.set_attribute("file_path", processor.file_path) + span.set_attribute("run_count", self.get_run_count(processor.file_path) + 1) + + if processor.result is None: + span.set_attribute("error", True) + span.set_attribute("processor.exit_code", processor.exit_code) + else: + span.set_attribute("num_dags", num_dags) + span.set_attribute("import_errors", count_import_errors) + if count_import_errors > 0: + span.set_attribute("error", True) + + span.end(end_time=datetime_to_nano(last_finish_time)) + Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration) 
Stats.timing("dag_processing.last_duration", last_duration, tags={"file_name": file_name}) @@ -1134,6 +1184,7 @@ def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_req callback_requests=callback_requests, ) + @span def start_new_processes(self): """Start more processors if we have enough slots and files to process.""" # initialize cache to mutualize calls to Variable.get in DAGs @@ -1157,14 +1208,21 @@ def start_new_processes(self): del self._callback_to_execute[file_path] Stats.incr("dag_processing.processes", tags={"file_path": file_path, "action": "start"}) - + span = Trace.get_current_span() + span.set_attribute("category", "processing") processor.start() self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path) + if span.is_recording(): + span.add_event( + name="dag_processing processor started", + attributes={"file_path": file_path, "pid": processor.pid}, + ) self._processors[file_path] = processor self.waitables[processor.waitable_handle] = processor Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue)) + @span def add_new_file_path_to_queue(self): for file_path in self.file_paths: if file_path not in self._file_stats: @@ -1172,6 +1230,11 @@ def add_new_file_path_to_queue(self): self.log.info("Adding new file %s to parsing queue", file_path) self._file_stats[file_path] = DagFileProcessorManager.DEFAULT_FILE_STAT self._file_path_queue.appendleft(file_path) + span = Trace.get_current_span() + if span.is_recording(): + span.add_event( + name="adding new file to parsing queue", attributes={"file_path": file_path} + ) def prepare_file_path_queue(self): """ @@ -1285,6 +1348,13 @@ def _kill_timed_out_processors(self): # Deprecated; may be removed in a future Airflow release. 
Stats.incr("dag_file_processor_timeouts") processor.kill() + span = Trace.get_current_span() + span.set_attribute("category", "processing") + if span.is_recording(): + span.add_event( + name="dag processing killed processor", + attributes={"file_path": file_path, "action": "timeout"}, + ) # Clean up processor references self.waitables.pop(processor.waitable_handle) @@ -1345,12 +1415,16 @@ def emit_metrics(self): This is called once every time around the parsing "loop" - i.e. after all files have been parsed. """ - parse_time = time.perf_counter() - self._parsing_start_time - Stats.gauge("dag_processing.total_parse_time", parse_time) - Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values())) - Stats.gauge( - "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values()) - ) + with Trace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span: + parse_time = time.perf_counter() - self._parsing_start_time + Stats.gauge("dag_processing.total_parse_time", parse_time) + Stats.gauge("dagbag_size", sum(stat.num_dags for stat in self._file_stats.values())) + Stats.gauge( + "dag_processing.import_errors", sum(stat.import_errors for stat in self._file_stats.values()) + ) + span.set_attribute("total_parse_time", parse_time) + span.set_attribute("dag_bag_size", sum(stat.num_dags for stat in self._file_stats.values())) + span.set_attribute("import_errors", sum(stat.import_errors for stat in self._file_stats.values())) @property def file_paths(self): diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 1a13877f889d0..bd2dd40e84eff 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -32,6 +32,9 @@ from airflow.configuration import conf from airflow.exceptions import RemovedInAirflow3Warning from airflow.stats import Stats +from airflow.traces import NO_TRACE_ID +from airflow.traces.tracer import Trace, gen_context, span 
+from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.task_context_logger import TaskContextLogger from airflow.utils.state import TaskInstanceState @@ -211,6 +214,7 @@ def sync(self) -> None: Executors should override this to perform gather statuses. """ + @span def heartbeat(self) -> None: """Heartbeat sent to trigger new jobs.""" if not self.parallelism: @@ -228,6 +232,17 @@ def heartbeat(self) -> None: else: self.log.debug("%s open slots", open_slots) + span = Trace.get_current_span() + if span.is_recording(): + span.add_event( + name="executor", + attributes={ + "executor.open_slots": open_slots, + "executor.queued_tasks": num_queued_tasks, + "executor.running_tasks": num_running_tasks, + }, + ) + Stats.gauge( "executor.open_slots", value=open_slots, tags={"status": "open", "name": self.__class__.__name__} ) @@ -260,12 +275,14 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTa reverse=True, ) + @span def trigger_tasks(self, open_slots: int) -> None: """ Initiate async execution of the queued tasks, up to the number of available slots. 
:param open_slots: Number of open slots """ + span = Trace.get_current_span() sorted_queue = self.order_queued_tasks_by_priority() task_tuples = [] @@ -302,15 +319,40 @@ def trigger_tasks(self, open_slots: int) -> None: if key in self.attempts: del self.attempts[key] task_tuples.append((key, command, queue, ti.executor_config)) + if span.is_recording(): + span.add_event( + name="task to trigger", + attributes={"command": str(command), "conf": str(ti.executor_config)}, + ) if task_tuples: self._process_tasks(task_tuples) + @span def _process_tasks(self, task_tuples: list[TaskTuple]) -> None: for key, command, queue, executor_config in task_tuples: - del self.queued_tasks[key] - self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) - self.running.add(key) + task_instance = self.queued_tasks[key][3] # TaskInstance in fourth element + trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True)) + span_id = int(gen_span_id_from_ti_key(key, as_int=True)) + links = [{"trace_id": trace_id, "span_id": span_id}] + + # assuming that the span_id will very likely be unique inside the trace + with Trace.start_span( + span_name=f"{key.dag_id}.{key.task_id}", + component="BaseExecutor", + span_id=span_id, + links=links, + ) as span: + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number) + span.set_attribute("command", str(command)) + span.set_attribute("queue", str(queue)) + span.set_attribute("executor_config", str(executor_config)) + del self.queued_tasks[key] + self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) + self.running.add(key) def change_state( self, key: TaskInstanceKey, state: TaskInstanceState, info=None, remove_running=True @@ -338,6 +380,20 @@ def fail(self, key: TaskInstanceKey, info=None) -> None: :param info: Executor information for the task instance :param 
key: Unique key for the task instance """ + trace_id = Trace.get_current_span().get_span_context().trace_id + if trace_id != NO_TRACE_ID: + span_id = int(gen_span_id_from_ti_key(key, as_int=True)) + with Trace.start_span( + span_name="fail", + component="BaseExecutor", + parent_sc=gen_context(trace_id=trace_id, span_id=span_id), + ) as span: + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number) + span.set_attribute("error", True) + self.change_state(key, TaskInstanceState.FAILED, info) def success(self, key: TaskInstanceKey, info=None) -> None: @@ -347,6 +403,19 @@ def success(self, key: TaskInstanceKey, info=None) -> None: :param info: Executor information for the task instance :param key: Unique key for the task instance """ + trace_id = Trace.get_current_span().get_span_context().trace_id + if trace_id != NO_TRACE_ID: + span_id = int(gen_span_id_from_ti_key(key, as_int=True)) + with Trace.start_span( + span_name="success", + component="BaseExecutor", + parent_sc=gen_context(trace_id=trace_id, span_id=span_id), + ) as span: + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number - 1) + self.change_state(key, TaskInstanceState.SUCCESS, info) def queued(self, key: TaskInstanceKey, info=None) -> None: diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 3b2670e75592c..90cedf7dbd6d3 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -39,6 +39,7 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.executors.base_executor import PARALLELISM, BaseExecutor +from airflow.traces.tracer import Trace, span from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import 
TaskInstanceState @@ -77,6 +78,7 @@ def run(self): setproctitle("airflow worker -- LocalExecutor") return super().run() + @span def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None: """ Execute command received and stores result state in queue. @@ -98,6 +100,7 @@ def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None: # Remove the command since the worker is done executing the task setproctitle("airflow worker -- LocalExecutor") + @span def _execute_work_in_subprocess(self, command: CommandType) -> TaskInstanceState: try: subprocess.check_call(command, close_fds=True) @@ -106,6 +109,7 @@ def _execute_work_in_subprocess(self, command: CommandType) -> TaskInstanceState self.log.error("Failed to execute task %s.", e) return TaskInstanceState.FAILED + @span def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState: pid = os.fork() if pid: @@ -165,6 +169,7 @@ def __init__( self.key: TaskInstanceKey = key self.command: CommandType = command + @span def do_work(self) -> None: self.execute_work(key=self.key, command=self.command) @@ -184,6 +189,7 @@ def __init__(self, task_queue: Queue[ExecutorWorkType], result_queue: Queue[Task super().__init__(result_queue=result_queue) self.task_queue = task_queue + @span def do_work(self) -> None: while True: try: @@ -244,6 +250,7 @@ def start(self) -> None: self.executor.workers_used = 0 self.executor.workers_active = 0 + @span def execute_async( self, key: TaskInstanceKey, @@ -262,6 +269,14 @@ def execute_async( if TYPE_CHECKING: assert self.executor.result_queue + span = Trace.get_current_span() + if span.is_recording(): + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("commands_to_run", str(command)) + local_worker = LocalWorker(self.executor.result_queue, key=key, command=command) self.executor.workers_used += 1 
self.executor.workers_active += 1 @@ -311,6 +326,7 @@ def start(self) -> None: for worker in self.executor.workers: worker.start() + @span def execute_async( self, key: TaskInstanceKey, @@ -372,6 +388,7 @@ def start(self) -> None: self.impl.start() + @span def execute_async( self, key: TaskInstanceKey, diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 41b8ae9ddc244..1b145892ebc7e 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -29,6 +29,7 @@ from typing import TYPE_CHECKING, Any from airflow.executors.base_executor import BaseExecutor +from airflow.traces.tracer import Trace, span if TYPE_CHECKING: from airflow.executors.base_executor import CommandType @@ -59,6 +60,7 @@ def __init__(self): super().__init__() self.commands_to_run = [] + @span def execute_async( self, key: TaskInstanceKey, @@ -69,6 +71,14 @@ def execute_async( self.validate_airflow_tasks_run_command(command) self.commands_to_run.append((key, command)) + span = Trace.get_current_span() + if span.is_recording(): + span.set_attribute("dag_id", key.dag_id) + span.set_attribute("run_id", key.run_id) + span.set_attribute("task_id", key.task_id) + span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("commands_to_run", str(self.commands_to_run)) + def sync(self) -> None: for key, command in self.commands_to_run: self.log.info("Executing command: %s", command) diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index 4273f1d3345b5..9384821807283 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -34,6 +34,7 @@ from airflow.models.base import ID_LEN, Base from airflow.serialization.pydantic.job import JobPydantic from airflow.stats import Stats +from airflow.traces.tracer import Trace, span from airflow.utils import timezone from airflow.utils.helpers import convert_camel_to_snake from airflow.utils.log.logging_mixin import LoggingMixin @@ -199,52 +200,62 @@ def 
heartbeat( :param session to use for saving the job """ previous_heartbeat = self.latest_heartbeat - - try: - # This will cause it to load from the db - self._merge_from(Job._fetch_from_db(self, session)) - previous_heartbeat = self.latest_heartbeat - - if self.state == JobState.RESTARTING: - self.kill() - - # Figure out how long to sleep for - sleep_for = 0 - if self.latest_heartbeat: - seconds_remaining = ( - self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds() - ) - sleep_for = max(0, seconds_remaining) - sleep(sleep_for) - - job = Job._update_heartbeat(job=self, session=session) - self._merge_from(job) - time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds() - health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate) - if time_since_last_heartbeat > health_check_threshold_value: - self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat) - # At this point, the DB has updated. - previous_heartbeat = self.latest_heartbeat - - heartbeat_callback(session) - self.log.debug("[heartbeat]") - self.heartbeat_failed = False - except OperationalError: - Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1) - if not self.heartbeat_failed: - self.log.exception("%s heartbeat failed with error", self.__class__.__name__) - self.heartbeat_failed = True - if self.is_alive(): - self.log.error( - "%s heartbeat failed with error. Scheduler may go into unhealthy state", - self.__class__.__name__, - ) - else: - self.log.error( - "%s heartbeat failed with error. 
Scheduler is in unhealthy state", self.__class__.__name__ - ) - # We didn't manage to heartbeat, so make sure that the timestamp isn't updated - self.latest_heartbeat = previous_heartbeat + with Trace.start_span(span_name="heartbeat", component="Job") as span: + try: + span.set_attribute("heartbeat", str(self.latest_heartbeat)) + # This will cause it to load from the db + self._merge_from(Job._fetch_from_db(self, session)) + previous_heartbeat = self.latest_heartbeat + + if self.state == JobState.RESTARTING: + self.kill() + + # Figure out how long to sleep for + sleep_for = 0 + if self.latest_heartbeat: + seconds_remaining = ( + self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds() + ) + sleep_for = max(0, seconds_remaining) + if span.is_recording(): + span.add_event(name="sleep", attributes={"sleep_for": sleep_for}) + sleep(sleep_for) + + job = Job._update_heartbeat(job=self, session=session) + self._merge_from(job) + time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds() + health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate) + if time_since_last_heartbeat > health_check_threshold_value: + self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat) + # At this point, the DB has updated. + previous_heartbeat = self.latest_heartbeat + + heartbeat_callback(session) + self.log.debug("[heartbeat]") + self.heartbeat_failed = False + except OperationalError: + Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1) + if not self.heartbeat_failed: + self.log.exception("%s heartbeat failed with error", self.__class__.__name__) + self.heartbeat_failed = True + msg = f"{self.__class__.__name__} heartbeat got an exception" + if span.is_recording(): + span.add_event(name="error", attributes={"message": msg}) + if self.is_alive(): + self.log.error( + "%s heartbeat failed with error. 
Scheduler may go into unhealthy state", + self.__class__.__name__, + ) + msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state" + if span.is_recording(): + span.add_event(name="error", attributes={"message": msg}) + else: + msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state" + self.log.error(msg) + if span.is_recording(): + span.add_event(name="error", attributes={"message": msg}) + # We didn't manage to heartbeat, so make sure that the timestamp isn't updated + self.latest_heartbeat = previous_heartbeat @provide_session def prepare_for_execution(self, session: Session = NEW_SESSION): @@ -448,6 +459,7 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N return ret +@span def perform_heartbeat( job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool ) -> None: diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index 96e36bcfe7d4b..a6a1f0ac8fa23 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -28,6 +28,7 @@ from airflow.jobs.job import perform_heartbeat from airflow.models.taskinstance import TaskReturnCode from airflow.stats import Stats +from airflow.traces.tracer import Trace from airflow.utils import timezone from airflow.utils.log.file_task_handler import _set_task_deferred_context_var from airflow.utils.log.logging_mixin import LoggingMixin @@ -184,39 +185,55 @@ def sigusr2_debug_handler(signum, frame): # If LocalTaskJob receives SIGTERM, LocalTaskJob passes SIGTERM to _run_raw_task # If the state of task_instance is changed, LocalTaskJob sends SIGTERM to _run_raw_task while not self.terminating: - # Monitor the task to see if it's done. 
Wait in a syscall - # (`os.wait`) for as long as possible so we notice the - # subprocess finishing as quick as we can - max_wait_time = max( - 0, # Make sure this value is never negative, - min( - ( - heartbeat_time_limit - - (timezone.utcnow() - self.job.latest_heartbeat).total_seconds() * 0.75 + with Trace.start_span( + span_name="local_task_job_loop", component="LocalTaskJobRunner" + ) as span: + # Monitor the task to see if it's done. Wait in a syscall + # (`os.wait`) for as long as possible so we notice the + # subprocess finishing as quick as we can + max_wait_time = max( + 0, # Make sure this value is never negative, + min( + ( + heartbeat_time_limit + - (timezone.utcnow() - self.job.latest_heartbeat).total_seconds() * 0.75 + ), + self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit, ), - self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit, - ), - ) - return_code = self.task_runner.return_code(timeout=max_wait_time) - if return_code is not None: - self.handle_task_exit(return_code) - return return_code - - perform_heartbeat( - job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=False - ) - - # If it's been too long since we've heartbeat, then it's possible that - # the scheduler rescheduled this task, so kill launched processes. - # This can only really happen if the worker can't read the DB for a long time - time_since_last_heartbeat = (timezone.utcnow() - self.job.latest_heartbeat).total_seconds() - if time_since_last_heartbeat > heartbeat_time_limit: - Stats.incr("local_task_job_prolonged_heartbeat_failure", 1, 1) - self.log.error("Heartbeat time limit exceeded!") - raise AirflowException( - f"Time since last heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit " - f"({heartbeat_time_limit}s)." 
) + return_code = self.task_runner.return_code(timeout=max_wait_time) + if return_code is not None: + self.handle_task_exit(return_code) + return return_code + + if span.is_recording(): + span.add_event(name="perform_heartbeat") + perform_heartbeat( + job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=False + ) + + # If it's been too long since we've heartbeat, then it's possible that + # the scheduler rescheduled this task, so kill launched processes. + # This can only really happen if the worker can't read the DB for a long time + time_since_last_heartbeat = ( + timezone.utcnow() - self.job.latest_heartbeat + ).total_seconds() + if time_since_last_heartbeat > heartbeat_time_limit: + Stats.incr("local_task_job_prolonged_heartbeat_failure", 1, 1) + self.log.error("Heartbeat time limit exceeded!") + if span.is_recording(): + span.add_event( + name="error", + attributes={ + "message": "Heartbeat time limit exceeded", + "heartbeat_time_limit(s)": heartbeat_time_limit, + "time_since_last_heartbeat(s)": time_since_last_heartbeat, + }, + ) + raise AirflowException( + f"Time since last heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit " + f"({heartbeat_time_limit}s)." 
+ ) return return_code finally: # Print a marker for log grouping of details before task execution diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 1d49f412b0d7f..7c2f21eb72425 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -59,7 +59,10 @@ from airflow.stats import Stats from airflow.ti_deps.dependencies_states import EXECUTION_STATES from airflow.timetables.simple import DatasetTriggeredTimetable +from airflow.traces import utils as trace_utils +from airflow.traces.tracer import Trace, span from airflow.utils import timezone +from airflow.utils.dates import datetime_to_nano from airflow.utils.event_scheduler import EventScheduler from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.task_context_logger import TaskContextLogger @@ -814,6 +817,60 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) -> ti.pid, ) + with Trace.start_span_from_taskinstance(ti=ti) as span: + span.set_attribute("category", "scheduler") + span.set_attribute("task_id", ti.task_id) + span.set_attribute("dag_id", ti.dag_id) + span.set_attribute("state", ti.state) + if ti.state == TaskInstanceState.FAILED: + span.set_attribute("error", True) + span.set_attribute("start_date", str(ti.start_date)) + span.set_attribute("end_date", str(ti.end_date)) + span.set_attribute("duration", ti.duration) + span.set_attribute("executor_config", str(ti.executor_config)) + span.set_attribute("execution_date", str(ti.execution_date)) + span.set_attribute("hostname", ti.hostname) + span.set_attribute("log_url", ti.log_url) + span.set_attribute("operator", str(ti.operator)) + span.set_attribute("try_number", ti.try_number - 1) + span.set_attribute("executor_state", state) + span.set_attribute("job_id", ti.job_id) + span.set_attribute("pool", ti.pool) + span.set_attribute("queue", ti.queue) + span.set_attribute("priority_weight", ti.priority_weight) + 
span.set_attribute("queued_dttm", str(ti.queued_dttm))
+            span.set_attribute("queued_by_job_id", ti.queued_by_job_id)
+            span.set_attribute("pid", ti.pid)
+            if span.is_recording():
+                span.add_event(name="queued", timestamp=datetime_to_nano(ti.queued_dttm))
+                span.add_event(name="started", timestamp=datetime_to_nano(ti.start_date))
+                span.add_event(name="ended", timestamp=datetime_to_nano(ti.end_date))
+            if conf.has_option("traces", "otel_task_log_event") and conf.getboolean(
+                "traces", "otel_task_log_event"
+            ):
+                from airflow.utils.log.log_reader import TaskLogReader
+
+                task_log_reader = TaskLogReader()
+                if task_log_reader.supports_read:
+                    metadata: dict[str, Any] = {}
+                    logs, metadata = task_log_reader.read_log_chunks(ti, ti.try_number, metadata)
+                    if ti.hostname in dict(logs[0]):
+                        message = str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+                        while metadata["end_of_log"] is False:
+                            logs, metadata = task_log_reader.read_log_chunks(
+                                ti, ti.try_number - 1, metadata
+                            )
+                            if ti.hostname in dict(logs[0]):
+                                message = message + str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+                    if span.is_recording():
+                        span.add_event(
+                            name="task_log",
+                            attributes={
+                                "message": message,
+                                "metadata": str(metadata),
+                            },
+                        )
+
         # There are two scenarios why the same TI with the same try_number is queued
         # after executor is finished with it:
         # 1) the TI was killed externally and it had no time to mark itself failed
@@ -1042,13 +1099,16 @@ def _run_scheduler_loop(self) -> None:
         )

         for loop_count in itertools.count(start=1):
-            with Stats.timer("scheduler.scheduler_loop_duration") as timer:
-                if self.using_sqlite and self.processor_agent:
-                    self.processor_agent.run_single_parsing_loop()
-                    # For the sqlite case w/ 1 thread, wait until the processor
-                    # is finished to avoid concurrent access to the DB.
- self.log.debug("Waiting for processors to finish since we're using sqlite") - self.processor_agent.wait_until_finished() + with Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span: + span.set_attribute("category", "scheduler") + span.set_attribute("loop_count", loop_count) + with Stats.timer("scheduler.scheduler_loop_duration") as timer: + if self.using_sqlite and self.processor_agent: + self.processor_agent.run_single_parsing_loop() + # For the sqlite case w/ 1 thread, wait until the processor + # is finished to avoid concurrent access to the DB. + self.log.debug("Waiting for processors to finish since we're using sqlite") + self.processor_agent.wait_until_finished() with create_session() as session: # This will schedule for as many executors as possible. @@ -1069,16 +1129,23 @@ def _run_scheduler_loop(self) -> None: if self.processor_agent: self.processor_agent.heartbeat() - # Heartbeat the scheduler periodically - perform_heartbeat( - job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True - ) + # Heartbeat the scheduler periodically + perform_heartbeat( + job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True + ) - # Run any pending timed events - next_event = timers.run(blocking=False) - self.log.debug("Next timed event is in %f", next_event) + # Run any pending timed events + next_event = timers.run(blocking=False) + self.log.debug("Next timed event is in %f", next_event) self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration) + if span.is_recording(): + span.add_event( + name="Ran scheduling loop", + attributes={ + "duration in seconds": timer.duration, + }, + ) if not is_unit_test and not num_queued_tis and not num_finished_events: # If the scheduler is doing things, don't sleep. 
This means when there is work to do, the @@ -1092,6 +1159,8 @@ def _run_scheduler_loop(self) -> None: self.num_runs, loop_count, ) + if span.is_recording(): + span.add_event("Exiting scheduler loop as requested number of runs has been reached") break if self.processor_agent and self.processor_agent.done: self.log.info( @@ -1100,6 +1169,8 @@ def _run_scheduler_loop(self) -> None: self.num_times_parse_dags, loop_count, ) + if span.is_recording(): + span.add_event("Exiting scheduler loop as requested DAG parse count has been reached") break def _do_scheduling(self, session: Session) -> int: @@ -1219,6 +1290,7 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio guard.commit() # END: create dagruns + @span def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None: """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created.""" # Bulk Fetch DagRuns with dag_id and execution_date same @@ -1426,6 +1498,7 @@ def _should_update_dag_next_dagruns( return False return True + @span def _start_queued_dagruns(self, session: Session) -> None: """Find DagRuns in queued state and decide moving them to running state.""" # added all() to save runtime, otherwise query is executed more than once @@ -1435,7 +1508,14 @@ def _start_queued_dagruns(self, session: Session) -> None: DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session), ) + @span def _update_state(dag: DAG, dag_run: DagRun): + __span = Trace.get_current_span() + __span.set_attribute("state", str(DagRunState.RUNNING)) + __span.set_attribute("run_id", dag_run.run_id) + __span.set_attribute("type", dag_run.run_type) + __span.set_attribute("dag_id", dag_run.dag_id) + dag_run.state = DagRunState.RUNNING dag_run.start_date = timezone.utcnow() if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1: @@ -1455,12 +1535,18 @@ def _update_state(dag: DAG, dag_run: 
DagRun): schedule_delay, tags={"dag_id": dag.dag_id}, ) + if __span.is_recording(): + __span.add_event( + name="schedule_delay", + attributes={"dag_id": dag.dag_id, "schedule_delay": str(schedule_delay)}, + ) # cache saves time during scheduling of many dag_runs for same dag cached_get_dag: Callable[[str], DAG | None] = lru_cache()( partial(self.dagbag.get_dag, session=session) ) + _span = Trace.get_current_span() for dag_run in dag_runs: dag = dag_run.dag = cached_get_dag(dag_run.dag_id) @@ -1477,6 +1563,15 @@ def _update_state(dag: DAG, dag_run: DagRun): dag_run.execution_date, ) else: + if _span.is_recording(): + _span.add_event( + name="dag_run", + attributes={ + "run_id": dag_run.run_id, + "dag_id": dag_run.dag_id, + "conf": str(dag_run.conf), + }, + ) active_runs_of_dags[dag_run.dag_id] += 1 _update_state(dag, dag_run) dag_run.notify_dagrun_state_changed() @@ -1504,70 +1599,101 @@ def _schedule_dag_run( :param dag_run: The DagRun to schedule :return: Callback that needs to be executed """ - callback: DagCallbackRequest | None = None + trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run, as_int=True)) + span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run, as_int=True)) + links = [{"trace_id": trace_id, "span_id": span_id}] + + with Trace.start_span( + span_name="_schedule_dag_run", component="SchedulerJobRunner", links=links + ) as span: + span.set_attribute("dag_id", dag_run.dag_id) + span.set_attribute("run_id", dag_run.run_id) + span.set_attribute("run_type", dag_run.run_type) + callback: DagCallbackRequest | None = None + + dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) + dag_model = DM.get_dagmodel(dag_run.dag_id, session) + + if not dag or not dag_model: + self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id) + return callback + + if ( + dag_run.start_date + and dag.dagrun_timeout + and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout + ): + dag_run.set_state(DagRunState.FAILED) + 
unfinished_task_instances = session.scalars( + select(TI) + .where(TI.dag_id == dag_run.dag_id) + .where(TI.run_id == dag_run.run_id) + .where(TI.state.in_(State.unfinished)) + ) + for task_instance in unfinished_task_instances: + task_instance.state = TaskInstanceState.SKIPPED + session.merge(task_instance) + session.flush() + self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) + + if self._should_update_dag_next_dagruns( + dag, dag_model, last_dag_run=dag_run, session=session + ): + dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) + + callback_to_execute = DagCallbackRequest( + full_filepath=dag.fileloc, + dag_id=dag.dag_id, + run_id=dag_run.run_id, + is_failure_callback=True, + processor_subdir=dag_model.processor_subdir, + msg="timed_out", + ) - dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) - dag_model = DM.get_dagmodel(dag_run.dag_id, session) + dag_run.notify_dagrun_state_changed() + duration = dag_run.end_date - dag_run.start_date + Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration) + Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id}) + span.set_attribute("error", True) + if span.is_recording(): + span.add_event( + name="error", + attributes={ + "message": f"Run {dag_run.run_id} of {dag_run.dag_id} has timed-out", + "duration": str(duration), + }, + ) + return callback_to_execute - if not dag or not dag_model: - self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id) - return callback + if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates: + self.log.error("Execution date is in future: %s", dag_run.execution_date) + return callback - if ( - dag_run.start_date - and dag.dagrun_timeout - and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout - ): - dag_run.set_state(DagRunState.FAILED) - unfinished_task_instances = session.scalars( - select(TI) - .where(TI.dag_id == 
dag_run.dag_id) - .where(TI.run_id == dag_run.run_id) - .where(TI.state.in_(State.unfinished)) - ) - for task_instance in unfinished_task_instances: - task_instance.state = TaskInstanceState.SKIPPED - session.merge(task_instance) - session.flush() - self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) + if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session): + self.log.warning( + "The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id + ) + return callback + # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? + schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) + # This will do one query per dag run. We "could" build up a complex + # query to update all the TIs across all the execution dates and dag + # IDs in a single query, but it turns out that can be _very very slow_ + # see #11147/commit ee90807ac for more details + if span.is_recording(): + span.add_event( + name="schedule_tis", + attributes={ + "message": "dag_run scheduling its tis", + "schedulable_tis": [_ti.task_id for _ti in schedulable_tis], + }, + ) + dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query) - callback_to_execute = DagCallbackRequest( - full_filepath=dag.fileloc, - dag_id=dag.dag_id, - run_id=dag_run.run_id, - is_failure_callback=True, - processor_subdir=dag_model.processor_subdir, - msg="timed_out", - ) - - dag_run.notify_dagrun_state_changed() - duration = dag_run.end_date - dag_run.start_date - Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration) - Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id}) - return callback_to_execute - - if dag_run.execution_date > timezone.utcnow() and not 
dag.allow_future_exec_dates: - self.log.error("Execution date is in future: %s", dag_run.execution_date) - return callback - - if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session): - self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id) - return callback - # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? - schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) - - if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): - dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) - # This will do one query per dag run. We "could" build up a complex - # query to update all the TIs across all the execution dates and dag - # IDs in a single query, but it turns out that can be _very very slow_ - # see #11147/commit ee90807ac for more details - dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query) - - return callback_to_run + return callback_to_run def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool: """ @@ -1662,20 +1788,27 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None: from airflow.models.pool import Pool - pools = Pool.slots_stats(session=session) - for pool_name, slot_stats in pools.items(): - Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"]) - Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"]) - Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"]) - Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"]) - Stats.gauge(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"]) - - # Same metrics with tagging - Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name}) - Stats.gauge("pool.queued_slots", 
slot_stats["queued"], tags={"pool_name": pool_name}) - Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name}) - Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name}) - Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name}) + with Trace.start_span(span_name="emit_pool_metrics", component="SchedulerJobRunner") as span: + pools = Pool.slots_stats(session=session) + for pool_name, slot_stats in pools.items(): + Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"]) + Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"]) + Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"]) + Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"]) + Stats.gauge(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"]) + + # Same metrics with tagging + Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name}) + Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name}) + Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name}) + Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name}) + Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name}) + + span.set_attribute("category", "scheduler") + span.set_attribute(f"pool.open_slots.{pool_name}", slot_stats["open"]) + span.set_attribute(f"pool.queued_slots.{pool_name}", slot_stats["queued"]) + span.set_attribute(f"pool.running_slots.{pool_name}", slot_stats["running"]) + span.set_attribute(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"]) @provide_session def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int: diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py index e1736ae8e5baa..080323a1d1617 100644 --- a/airflow/jobs/triggerer_job_runner.py +++ 
b/airflow/jobs/triggerer_job_runner.py @@ -37,6 +37,7 @@ from airflow.jobs.job import perform_heartbeat from airflow.models.trigger import Trigger from airflow.stats import Stats +from airflow.traces.tracer import Trace, span from airflow.triggers.base import TriggerEvent from airflow.typing_compat import TypedDict from airflow.utils import timezone @@ -362,26 +363,43 @@ def _run_trigger_loop(self) -> None: if not self.trigger_runner.is_alive(): self.log.error("Trigger runner thread has died! Exiting.") break - # Clean out unused triggers - Trigger.clean_unused() - # Load/delete triggers - self.load_triggers() - # Handle events - self.handle_events() - # Handle failed triggers - self.handle_failed_triggers() - perform_heartbeat(self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True) - # Collect stats - self.emit_metrics() + with Trace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner") as span: + # Clean out unused triggers + if span.is_recording(): + span.add_event(name="Trigger.clean_unused") + Trigger.clean_unused() + # Load/delete triggers + if span.is_recording(): + span.add_event(name="load_triggers") + self.load_triggers() + # Handle events + if span.is_recording(): + span.add_event(name="handle_events") + self.handle_events() + # Handle failed triggers + if span.is_recording(): + span.add_event(name="handle_failed_triggers") + self.handle_failed_triggers() + if span.is_recording(): + span.add_event(name="perform_heartbeat") + perform_heartbeat( + self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True + ) + # Collect stats + if span.is_recording(): + span.add_event(name="emit_metrics") + self.emit_metrics() # Idle sleep time.sleep(1) + @span def load_triggers(self): """Query the database for the triggers we're supposed to be running and update the runner.""" Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold) ids = Trigger.ids_for_triggerer(self.job.id) 
self.trigger_runner.update_triggers(set(ids)) + @span def handle_events(self): """Dispatch outbound events to the Trigger model which pushes them to the relevant task instances.""" while self.trigger_runner.events: @@ -392,6 +410,7 @@ def handle_events(self): # Emit stat event Stats.incr("triggers.succeeded") + @span def handle_failed_triggers(self): """ Handle "failed" triggers. - ones that errored or exited before they sent an event. @@ -405,11 +424,15 @@ def handle_failed_triggers(self): # Emit stat event Stats.incr("triggers.failed") + @span def emit_metrics(self): Stats.gauge(f"triggers.running.{self.job.hostname}", len(self.trigger_runner.triggers)) Stats.gauge( "triggers.running", len(self.trigger_runner.triggers), tags={"hostname": self.job.hostname} ) + span = Trace.get_current_span() + span.set_attribute("trigger host", self.job.hostname) + span.set_attribute("triggers running", len(self.trigger_runner.triggers)) class TriggerDetails(TypedDict): diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 6c3d0715b9fbd..d4ef937e9d797 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -62,7 +62,9 @@ from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES +from airflow.traces.tracer import Trace from airflow.utils import timezone +from airflow.utils.dates import datetime_to_nano from airflow.utils.helpers import chunks, is_container, prune_dict from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -919,6 +921,37 @@ def recalculate(self) -> _UnfinishedStates: self.data_interval_end, self.dag_hash, ) + + with Trace.start_span_from_dagrun(dagrun=self) as span: + if self._state is DagRunState.FAILED: + span.set_attribute("error", True) + attributes = { + "category": "DAG runs", + "dag_id": str(self.dag_id), + "execution_date": str(self.execution_date), + "run_id": 
str(self.run_id), + "queued_at": str(self.queued_at), + "run_start_date": str(self.start_date), + "run_end_date": str(self.end_date), + "run_duration": str( + (self.end_date - self.start_date).total_seconds() + if self.start_date and self.end_date + else 0 + ), + "state": str(self._state), + "external_trigger": str(self.external_trigger), + "run_type": str(self.run_type), + "data_interval_start": str(self.data_interval_start), + "data_interval_end": str(self.data_interval_end), + "dag_hash": str(self.dag_hash), + "conf": str(self.conf), + } + if span.is_recording(): + span.add_event(name="queued", timestamp=datetime_to_nano(self.queued_at)) + span.add_event(name="started", timestamp=datetime_to_nano(self.start_date)) + span.add_event(name="ended", timestamp=datetime_to_nano(self.end_date)) + span.set_attributes(attributes) + session.flush() self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 1dacbe7525ded..27eb5c26c2fc4 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -108,6 +108,7 @@ from airflow.templates import SandboxedEnvironment from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS +from airflow.traces.tracer import Trace from airflow.utils import timezone from airflow.utils.context import ( ConnectionAccessor, @@ -1211,6 +1212,27 @@ def _handle_failure( if not test_mode: TaskInstance.save_to_db(failure_context["ti"], session) + with Trace.start_span_from_taskinstance(ti=task_instance) as span: + # ---- error info ---- + span.set_attribute("error", "true") + span.set_attribute("error_msg", str(error)) + span.set_attribute("context", context) + span.set_attribute("force_fail", force_fail) + # ---- common info ---- + span.set_attribute("category", "DAG runs") + span.set_attribute("task_id", task_instance.task_id) + span.set_attribute("dag_id", 
task_instance.dag_id) + span.set_attribute("state", task_instance.state) + span.set_attribute("start_date", str(task_instance.start_date)) + span.set_attribute("end_date", str(task_instance.end_date)) + span.set_attribute("duration", task_instance.duration) + span.set_attribute("executor_config", str(task_instance.executor_config)) + span.set_attribute("execution_date", str(task_instance.execution_date)) + span.set_attribute("hostname", task_instance.hostname) + if isinstance(task_instance, TaskInstance): + span.set_attribute("log_url", task_instance.log_url) + span.set_attribute("operator", str(task_instance.operator)) + def _refresh_from_task( *, task_instance: TaskInstance | TaskInstancePydantic, task: Operator, pool_override: str | None = None diff --git a/airflow/traces/__init__.py b/airflow/traces/__init__.py index abe55b510359f..7b2f416872e98 100644 --- a/airflow/traces/__init__.py +++ b/airflow/traces/__init__.py @@ -18,3 +18,4 @@ TRACEPARENT = "traceparent" TRACESTATE = "tracestate" +NO_TRACE_ID = 1 diff --git a/airflow/traces/tracer.py b/airflow/traces/tracer.py index 88999abe2bd14..1d58717287c73 100644 --- a/airflow/traces/tracer.py +++ b/airflow/traces/tracer.py @@ -96,6 +96,9 @@ def set_attributes(self, attributes) -> None: """Set multiple attributes at once.""" pass + def is_recording(self): + return False + def add_event( self, name: str, diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py index eaf3c1c0655d5..afab2591d5146 100644 --- a/airflow/traces/utils.py +++ b/airflow/traces/utils.py @@ -20,6 +20,7 @@ import logging from typing import TYPE_CHECKING +from airflow.traces import NO_TRACE_ID from airflow.utils.hashlib_wrapper import md5 from airflow.utils.state import TaskInstanceState @@ -40,9 +41,12 @@ def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int: + if dag_run.start_date is None: + return NO_TRACE_ID + """Generate trace id from 
DagRun.""" return _gen_id( - [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], + [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())], as_int, ) @@ -50,7 +54,7 @@ def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int: def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int: """Generate span id from TI key.""" return _gen_id( - [ti_key.dag_id, ti_key.run_id, ti_key.task_id, str(ti_key.try_number)], + [ti_key.dag_id, str(ti_key.run_id), ti_key.task_id, str(ti_key.try_number)], as_int, SPAN_ID, ) @@ -58,8 +62,11 @@ def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> st def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int: """Generate dag's root span id using dag_run.""" + if dag_run.start_date is None: + return NO_TRACE_ID + return _gen_id( - [dag_run.dag_id, dag_run.run_id, str(dag_run.start_date.timestamp())], + [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())], as_int, SPAN_ID, ) diff --git a/scripts/ci/docker-compose/integration-otel.yml b/scripts/ci/docker-compose/integration-otel.yml index 6573709bc3ccb..7a635c17c7d22 100644 --- a/scripts/ci/docker-compose/integration-otel.yml +++ b/scripts/ci/docker-compose/integration-otel.yml @@ -54,7 +54,7 @@ services: - ./grafana/volume/provisioning:/grafana/provisioning jaeger: - image: jaegertracing/all-in-one + image: jaegertracing/all-in-one:1.57 container_name: "breeze-jaeger" environment: COLLECTOR_OTLP_ENABLED: true