diff --git a/airflow/operators/python.py b/airflow/operators/python.py index d2f8b7dee1710..4158db2895c58 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -400,6 +400,7 @@ def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Pat output_path = tmp_dir / "script.out" string_args_path = tmp_dir / "string_args.txt" script_path = tmp_dir / "script.py" + termination_log_path = tmp_dir / "termination.log" self._write_args(input_path) self._write_string_args(string_args_path) write_python_script( @@ -423,11 +424,17 @@ def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Pat os.fspath(input_path), os.fspath(output_path), os.fspath(string_args_path), + os.fspath(termination_log_path), ] ) except subprocess.CalledProcessError as e: if e.returncode in self.skip_on_exit_code: raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.") + elif termination_log_path.exists() and termination_log_path.stat().st_size > 0: + error_msg = f"Process returned non-zero exit status {e.returncode}.\n" + with open(termination_log_path) as file: + error_msg += file.read() + raise AirflowException(error_msg) from None else: raise diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2 index 60c96d34819df..7bbf6a953193b 100644 --- a/airflow/utils/python_virtualenv_script.jinja2 +++ b/airflow/utils/python_virtualenv_script.jinja2 @@ -49,7 +49,12 @@ with open(sys.argv[3], "r") as file: # Script {{ python_callable_source }} -res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"]) +try: + res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"]) +except Exception as e: + with open(sys.argv[4], "w") as file: + file.write(str(e)) + raise # Write output with open(sys.argv[2], "wb") as file: diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 2089ad0b79bf3..498be2eb526dc 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -681,6 +681,14 @@ def f(): with pytest.raises(CalledProcessError): self.run_as_task(f) + def test_fail_with_message(self): + def f(): + raise Exception("Custom error message") + + with pytest.raises(AirflowException) as e: + self.run_as_task(f) + assert "Custom error message" in str(e) + def test_string_args(self): def f(): global virtualenv_string_args