Skip to content

Commit d5b6d44

Browse files
ashbvlieven
authored andcommitted
Split providers out of the main "airflow/" tree into a UV workspace project (#42505)
This is only a partial split so far. It moves all the code and tests, but leaves the creation of `core/` to a separate PR as this is already large enough. In addition to the straight file rename the other changes I had to make here are: - Some mypy/typing fixes. Mypy can be fragile about what it picks up when, so maybe some of those changes were caused by that. But the typing changes aren't large. - Improve typing in common.sql type stub Again, likely a mypy file oddity, but the types should be safe - Removed the `check-providers-init-file-missing` check This isn't needed now that airflow/providers shouldn't exist at all in the main tree. - Create a "dev.tests_common" package that contains helper files and common pytest fixtures Since the provider tests are no longer under tests/ they don't automatically share the fixtures from the parent `tests/conftest.py` so they needed extracted. Ditto for `tests.test_utils` -- they can't be easily imported in provider tests anymore, so they are moved to a more explicit shared location. In future we should switch how the CI image is built to make better use of UV caching than our own approach as that would remvoe a lot of custom code.
1 parent efcb554 commit d5b6d44

3 files changed

Lines changed: 16 additions & 18 deletions

File tree

providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ def _change_state(
482482
self.running.remove(key)
483483
except KeyError:
484484
self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
485+
return
485486

486487
# If we don't have a TI state, look it up from the db. event_buffer expects the TI state
487488
if state is None:

providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -273,16 +273,6 @@ def process_status(
273273
(pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version)
274274
)
275275
elif status == "Succeeded":
276-
# We get multiple events once the pod hits a terminal state, and we only want to
277-
# send it along to the scheduler once.
278-
# If our event type is DELETED, or the pod has a deletion timestamp, we've already
279-
# seen the initial Succeeded event and sent it along to the scheduler.
280-
if event["type"] == "DELETED" or pod.metadata.deletion_timestamp:
281-
self.log.info(
282-
"Skipping event for Succeeded pod %s - event for this pod already sent to executor",
283-
pod_name,
284-
)
285-
return
286276
self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string)
287277
self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
288278
elif status == "Running":

providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,21 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_
784784
finally:
785785
executor.end()
786786

787+
@pytest.mark.db_test
788+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
789+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
790+
def test_change_state_key_not_in_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
791+
executor = self.kubernetes_executor
792+
executor.start()
793+
try:
794+
key = ("dag_id", "task_id", "run_id", "try_number1")
795+
executor.running = set()
796+
executor._change_state(key, State.SUCCESS, "pod_name", "default")
797+
assert executor.event_buffer.get(key) is None
798+
assert executor.running == set()
799+
finally:
800+
executor.end()
801+
787802
@pytest.mark.db_test
788803
@pytest.mark.parametrize(
789804
"multi_namespace_mode_namespace_list, watchers_keys",
@@ -1858,14 +1873,6 @@ def test_process_status_succeeded(self):
18581873
# We don't know the TI state, so we send in None
18591874
self.assert_watcher_queue_called_once_with_state(None)
18601875

1861-
def test_process_status_succeeded_dedup_timestamp(self):
1862-
self.pod.status.phase = "Succeeded"
1863-
self.pod.metadata.deletion_timestamp = timezone.utcnow()
1864-
self.events.append({"type": "MODIFIED", "object": self.pod})
1865-
1866-
self._run()
1867-
self.watcher.watcher_queue.put.assert_not_called()
1868-
18691876
@pytest.mark.parametrize(
18701877
"ti_state",
18711878
[

0 commit comments

Comments
 (0)