Skip to content

Commit f4542a3

Browse files
Fix waiting the base container when reading the logs of other containers (#33127)
* Revert "Revert "Fix waiting the base container when reading the logs of other containers (#33092)" (#33125)" This reverts commit 4fbbdbc. * patch sidecar methods
1 parent cfac7d3 commit f4542a3

2 files changed

Lines changed: 45 additions & 1 deletion

File tree

  • airflow/providers/cncf/kubernetes/operators
  • tests/providers/cncf/kubernetes/operators

airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,9 @@ def execute_sync(self, context: Context):
590590
container_logs=self.container_logs,
591591
follow_logs=True,
592592
)
593-
else:
593+
if not self.get_logs or (
594+
self.container_logs is not True and self.base_container_name not in self.container_logs
595+
):
594596
self.pod_manager.await_container_completion(
595597
pod=self.pod, container_name=self.base_container_name
596598
)

tests/providers/cncf/kubernetes/operators/test_pod.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,6 +1242,48 @@ def test_task_skip_when_pod_exit_with_certain_code(
12421242
with pytest.raises(expected_exc):
12431243
self.run_pod(k)
12441244

1245+
@patch(f"{POD_MANAGER_CLASS}.extract_xcom")
1246+
@patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
1247+
@patch(f"{POD_MANAGER_CLASS}.await_container_completion")
1248+
@patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs")
1249+
@patch(HOOK_CLASS)
1250+
def test_get_logs_but_not_for_base_container(
1251+
self,
1252+
hook_mock,
1253+
mock_fetch_log,
1254+
mock_await_container_completion,
1255+
mock_await_xcom_sidecar,
1256+
mock_extract_xcom,
1257+
):
1258+
hook_mock.return_value.get_xcom_sidecar_container_image.return_value = None
1259+
hook_mock.return_value.get_xcom_sidecar_container_resources.return_value = None
1260+
k = KubernetesPodOperator(
1261+
namespace="default",
1262+
image="ubuntu:16.04",
1263+
cmds=["bash", "-cx"],
1264+
arguments=["echo 10"],
1265+
labels={"foo": "bar"},
1266+
name="test",
1267+
task_id="task",
1268+
do_xcom_push=True,
1269+
container_logs=["some_init_container"],
1270+
get_logs=True,
1271+
)
1272+
mock_extract_xcom.return_value = "{}"
1273+
remote_pod_mock = MagicMock()
1274+
remote_pod_mock.status.phase = "Succeeded"
1275+
self.await_pod_mock.return_value = remote_pod_mock
1276+
pod = self.run_pod(k)
1277+
1278+
# check that the base container is not included in the logs
1279+
mock_fetch_log.assert_called_once_with(
1280+
pod=pod, container_logs=["some_init_container"], follow_logs=True
1281+
)
1282+
# check that KPO waits for the base container to complete before proceeding to extract XCom
1283+
mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base")
1284+
# check that we wait for the xcom sidecar to start before extracting XCom
1285+
mock_await_xcom_sidecar.assert_called_once_with(pod=pod)
1286+
12451287

12461288
class TestSuppress:
12471289
def test__suppress(self, caplog):

0 commit comments

Comments
 (0)