From 8c93311a0c2a52f1f46f48b30b37e317c85fad63 Mon Sep 17 00:00:00 2001 From: Venkat VJ Date: Thu, 17 Oct 2024 15:46:36 -0700 Subject: [PATCH] Fix broken stat scheduler_loop_duration (#42886) * wip * wip * fix lint err --------- Co-authored-by: venkat (cherry picked from commit 60b8056616e94f987c3096dff3b59211d649b4b0) --- airflow/jobs/scheduler_job_runner.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index dd8ff4377f592..4f8e900df13b4 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1105,16 +1105,18 @@ def _run_scheduler_loop(self) -> None: ) for loop_count in itertools.count(start=1): - with Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span: + with Trace.start_span( + span_name="scheduler_job_loop", component="SchedulerJobRunner" + ) as span, Stats.timer("scheduler.scheduler_loop_duration") as timer: span.set_attribute("category", "scheduler") span.set_attribute("loop_count", loop_count) - with Stats.timer("scheduler.scheduler_loop_duration") as timer: - if self.using_sqlite and self.processor_agent: - self.processor_agent.run_single_parsing_loop() - # For the sqlite case w/ 1 thread, wait until the processor - # is finished to avoid concurrent access to the DB. - self.log.debug("Waiting for processors to finish since we're using sqlite") - self.processor_agent.wait_until_finished() + + if self.using_sqlite and self.processor_agent: + self.processor_agent.run_single_parsing_loop() + # For the sqlite case w/ 1 thread, wait until the processor + # is finished to avoid concurrent access to the DB. + self.log.debug("Waiting for processors to finish since we're using sqlite") + self.processor_agent.wait_until_finished() with create_session() as session: # This will schedule for as many executors as possible.