Skip to content

Commit 6b92145

Browse files
authored
Send important executor logs to task logs in AwsBatchExecutor (#40698)
1 parent 97b88fd commit 6b92145

2 files changed

Lines changed: 42 additions & 40 deletions

File tree

airflow/providers/amazon/aws/executors/batch/batch_executor.py

Lines changed: 19 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,9 @@
2020
from __future__ import annotations
2121

2222
import contextlib
23+
import logging
2324
import time
24-
from collections import defaultdict, deque
25+
from collections import deque
2526
from copy import deepcopy
2627
from typing import TYPE_CHECKING, Any, Dict, List, Sequence
2728

@@ -264,15 +265,14 @@ def attempt_submit_jobs(self):
264265
in the next iteration of the sync() method, unless it has exceeded the maximum number of
265266
attempts. If a job exceeds the maximum number of attempts, it is removed from the queue.
266267
"""
267-
failure_reasons = defaultdict(int)
268268
for _ in range(len(self.pending_jobs)):
269269
batch_job = self.pending_jobs.popleft()
270270
key = batch_job.key
271271
cmd = batch_job.command
272272
queue = batch_job.queue
273273
exec_config = batch_job.executor_config
274274
attempt_number = batch_job.attempt_number
275-
_failure_reason = []
275+
failure_reason: str | None = None
276276
if timezone.utcnow() < batch_job.next_attempt_time:
277277
self.pending_jobs.append(batch_job)
278278
continue
@@ -286,18 +286,18 @@ def attempt_submit_jobs(self):
286286
if error_code in INVALID_CREDENTIALS_EXCEPTIONS:
287287
self.pending_jobs.append(batch_job)
288288
raise
289-
_failure_reason.append(str(e))
289+
failure_reason = str(e)
290290
except Exception as e:
291-
_failure_reason.append(str(e))
292-
293-
if _failure_reason:
294-
for reason in _failure_reason:
295-
failure_reasons[reason] += 1
291+
failure_reason = str(e)
296292

293+
if failure_reason:
297294
if attempt_number >= int(self.__class__.MAX_SUBMIT_JOB_ATTEMPTS):
298-
self.log.error(
299-
"This job has been unsuccessfully attempted too many times (%s). Dropping the task.",
295+
self.send_message_to_task_logs(
296+
logging.ERROR,
297+
"This job has been unsuccessfully attempted too many times (%s). Dropping the task. Reason: %s",
300298
attempt_number,
299+
failure_reason,
300+
ti=key,
301301
)
302302
self.fail(key=key)
303303
else:
@@ -322,11 +322,6 @@ def attempt_submit_jobs(self):
322322
# running_state is added in Airflow 2.10 and only needed to support task adoption
323323
# (an optional executor feature).
324324
self.running_state(key, job_id)
325-
if failure_reasons:
326-
self.log.error(
327-
"Pending Batch jobs failed to launch for the following reasons: %s. Retrying later.",
328-
dict(failure_reasons),
329-
)
330325

331326
def _describe_jobs(self, job_ids) -> list[BatchJob]:
332327
all_jobs = []
@@ -462,3 +457,11 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
462457

463458
not_adopted_tis = [ti for ti in tis if ti not in adopted_tis]
464459
return not_adopted_tis
460+
461+
def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey):
462+
# TODO: remove this method when min_airflow_version is set to higher than 2.10.0
463+
try:
464+
super().send_message_to_task_logs(level, msg, *args, ti=ti)
465+
except AttributeError:
466+
# ``send_message_to_task_logs`` is added in 2.10.0
467+
self.log.error(msg, *args)

tests/providers/amazon/aws/executors/batch/test_batch_executor.py

Lines changed: 23 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import logging
2222
import os
2323
from unittest import mock
24+
from unittest.mock import call
2425

2526
import pytest
2627
import yaml
@@ -194,8 +195,9 @@ def test_execute(self, mock_executor):
194195
mock_executor.batch.submit_job.assert_called_once()
195196
assert len(mock_executor.active_workers) == 1
196197

198+
@mock.patch.object(AwsBatchExecutor, "send_message_to_task_logs")
197199
@mock.patch.object(batch_executor, "calculate_next_attempt_delay", return_value=dt.timedelta(seconds=0))
198-
def test_attempt_all_jobs_when_some_jobs_fail(self, _, mock_executor, caplog):
200+
def test_attempt_all_jobs_when_some_jobs_fail(self, _, mock_send_message_to_task_logs, mock_executor):
199201
"""
200202
Test how jobs are tried when one job fails, but others pass.
201203
@@ -206,7 +208,6 @@ def test_attempt_all_jobs_when_some_jobs_fail(self, _, mock_executor, caplog):
206208
airflow_key = mock.Mock(spec=tuple)
207209
airflow_cmd1 = mock.Mock(spec=list)
208210
airflow_cmd2 = mock.Mock(spec=list)
209-
caplog.set_level("ERROR")
210211
airflow_commands = [airflow_cmd1, airflow_cmd2]
211212
responses = [Exception("Failure 1"), {"jobId": "job-2"}]
212213

@@ -229,13 +230,10 @@ def test_attempt_all_jobs_when_some_jobs_fail(self, _, mock_executor, caplog):
229230
for i in range(2):
230231
submit_job_args["containerOverrides"]["command"] = airflow_commands[i]
231232
assert mock_executor.batch.submit_job.call_args_list[i].kwargs == submit_job_args
232-
assert "Pending Batch jobs failed to launch for the following reasons" in caplog.messages[0]
233233
assert len(mock_executor.pending_jobs) == 1
234234
mock_executor.pending_jobs[0].command == airflow_cmd1
235235
assert len(mock_executor.active_workers.get_all_jobs()) == 1
236236

237-
caplog.clear()
238-
239237
# Add more tasks to pending_jobs. This simulates tasks being scheduled by Airflow
240238
airflow_cmd3 = mock.Mock(spec=list)
241239
airflow_cmd4 = mock.Mock(spec=list)
@@ -252,26 +250,27 @@ def test_attempt_all_jobs_when_some_jobs_fail(self, _, mock_executor, caplog):
252250
for i in range(2, 5):
253251
submit_job_args["containerOverrides"]["command"] = airflow_commands[i]
254252
assert mock_executor.batch.submit_job.call_args_list[i].kwargs == submit_job_args
255-
assert "Pending Batch jobs failed to launch for the following reasons" in caplog.messages[0]
256253
assert len(mock_executor.pending_jobs) == 1
257254
mock_executor.pending_jobs[0].command == airflow_cmd1
258255
assert len(mock_executor.active_workers.get_all_jobs()) == 3
259256

260-
caplog.clear()
261-
262257
airflow_commands.append(airflow_cmd1)
263258
responses.append(Exception("Failure 1"))
264259

265260
mock_executor.attempt_submit_jobs()
266261
submit_job_args["containerOverrides"]["command"] = airflow_commands[0]
267262
assert mock_executor.batch.submit_job.call_args_list[5].kwargs == submit_job_args
268-
assert (
269-
"This job has been unsuccessfully attempted too many times (3). Dropping the task."
270-
== caplog.messages[0]
263+
mock_send_message_to_task_logs.assert_called_once_with(
264+
logging.ERROR,
265+
"This job has been unsuccessfully attempted too many times (%s). Dropping the task. Reason: %s",
266+
3,
267+
"Failure 1",
268+
ti=airflow_key,
271269
)
272270

271+
@mock.patch.object(AwsBatchExecutor, "send_message_to_task_logs")
273272
@mock.patch.object(batch_executor, "calculate_next_attempt_delay", return_value=dt.timedelta(seconds=0))
274-
def test_attempt_all_jobs_when_jobs_fail(self, _, mock_executor, caplog):
273+
def test_attempt_all_jobs_when_jobs_fail(self, _, mock_send_message_to_task_logs, mock_executor):
275274
"""
276275
Test job retry behaviour when jobs fail validation.
277276
@@ -282,7 +281,6 @@ def test_attempt_all_jobs_when_jobs_fail(self, _, mock_executor, caplog):
282281
airflow_key = mock.Mock(spec=tuple)
283282
airflow_cmd1 = mock.Mock(spec=list)
284283
airflow_cmd2 = mock.Mock(spec=list)
285-
caplog.set_level("ERROR")
286284
commands = [airflow_cmd1, airflow_cmd2]
287285
failures = [Exception("Failure 1"), Exception("Failure 2")]
288286
submit_job_args = {
@@ -304,29 +302,29 @@ def test_attempt_all_jobs_when_jobs_fail(self, _, mock_executor, caplog):
304302
for i in range(2):
305303
submit_job_args["containerOverrides"]["command"] = commands[i]
306304
assert mock_executor.batch.submit_job.call_args_list[i].kwargs == submit_job_args
307-
assert "Pending Batch jobs failed to launch for the following reasons" in caplog.messages[0]
308305
assert len(mock_executor.pending_jobs) == 2
309306

310-
caplog.clear()
311-
312307
mock_executor.batch.submit_job.side_effect = failures
313308
mock_executor.attempt_submit_jobs()
314309
for i in range(2):
315310
submit_job_args["containerOverrides"]["command"] = commands[i]
316311
assert mock_executor.batch.submit_job.call_args_list[i].kwargs == submit_job_args
317-
assert "Pending Batch jobs failed to launch for the following reasons" in caplog.messages[0]
318312
assert len(mock_executor.pending_jobs) == 2
319313

320-
caplog.clear()
321-
322314
mock_executor.batch.submit_job.side_effect = failures
323315
mock_executor.attempt_submit_jobs()
324-
assert len(caplog.messages) == 3
316+
calls = []
325317
for i in range(2):
326-
assert (
327-
"This job has been unsuccessfully attempted too many times (3). Dropping the task."
328-
== caplog.messages[i]
318+
calls.append(
319+
call(
320+
logging.ERROR,
321+
"This job has been unsuccessfully attempted too many times (%s). Dropping the task. Reason: %s",
322+
3,
323+
f"Failure {i + 1}",
324+
ti=airflow_key,
325+
)
329326
)
327+
mock_send_message_to_task_logs.assert_has_calls(calls)
330328

331329
def test_attempt_submit_jobs_failure(self, mock_executor):
332330
mock_executor.batch.submit_job.side_effect = NoCredentialsError()
@@ -467,8 +465,9 @@ def test_sync(self, success_mock, fail_mock, mock_airflow_key, mock_executor):
467465

468466
@mock.patch.object(BaseExecutor, "fail")
469467
@mock.patch.object(BaseExecutor, "success")
468+
@mock.patch.object(AwsBatchExecutor, "send_message_to_task_logs")
470469
@mock.patch.object(batch_executor, "calculate_next_attempt_delay", return_value=dt.timedelta(seconds=0))
471-
def test_failed_sync(self, _, success_mock, fail_mock, mock_airflow_key, mock_executor):
470+
def test_failed_sync(self, _, _2, success_mock, fail_mock, mock_airflow_key, mock_executor):
472471
"""Test failure states"""
473472
self._mock_sync(
474473
executor=mock_executor, airflow_key=mock_airflow_key(), status="FAILED", attempt_number=2

Commit comments (0)