diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 55d74c0962078..0c29714e56228 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -56,6 +56,10 @@ "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION" ) +DAG_PROCESSOR_MANAGER_LOG_STDOUT: str = conf.get_mandatory_value( + "logging", "DAG_PROCESSOR_MANAGER_LOG_STDOUT" +) + # FILENAME_TEMPLATE only uses in Remote Logging Handlers since Airflow 2.3.3 # All of these handlers inherited from FileTaskHandler and providing any value rather than None # would raise deprecation warning. @@ -171,6 +175,19 @@ }, } +if DAG_PROCESSOR_MANAGER_LOG_STDOUT == "True": + DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"].update( + { + "console": { + "class": "airflow.utils.log.logging_mixin.RedirectStdHandler", + "formatter": "airflow", + "stream": "sys.stdout", + "filters": ["mask_secrets"], + } + } + ) + DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"]["airflow.processor_manager"]["handlers"].append("console") + # Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set. # This is to avoid exceptions when initializing RotatingFileHandler multiple times # in multiple processes. diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 07deb7c1d73d6..4db178cd8f1fb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -857,6 +857,13 @@ logging: type: string example: ~ default: "{AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log" + dag_processor_manager_log_stdout: + description: | + Whether the DAG processor manager will write logs to stdout + version_added: 2.9.0 + type: boolean + example: ~ + default: "False" task_log_reader: description: | Name of handler to read task instance logs. 
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 33d45dfb1272f..494765d57901f 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -1632,3 +1632,37 @@ def test_heartbeat_manager_end_no_process(self): processor_agent.end() processor_agent.log.warning.assert_called_with("Ending without manager process.") processor_agent._process.join.assert_not_called() + + @conf_vars({("logging", "dag_processor_manager_log_stdout"): "True"}) + def test_log_to_stdout(self, capfd): + test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" + async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") + + # Starting dag processing with 0 max_runs to avoid redundant operations. + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode) + processor_agent.start() + if not async_mode: + processor_agent.run_single_parsing_loop() + + processor_agent._process.join() + + # Capture the stdout and stderr + out, _ = capfd.readouterr() + assert "DAG File Processing Stats" in out + + @conf_vars({("logging", "dag_processor_manager_log_stdout"): "False"}) + def test_not_log_to_stdout(self, capfd): + test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" + async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") + + # Starting dag processing with 0 max_runs to avoid redundant operations. + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode) + processor_agent.start() + if not async_mode: + processor_agent.run_single_parsing_loop() + + processor_agent._process.join() + + # Capture the stdout and stderr + out, _ = capfd.readouterr() + assert "DAG File Processing Stats" not in out