Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
e1f721d
trace and span creation with context propagation
xBis7 Nov 12, 2024
fdb92df
cleanup
xBis7 Nov 12, 2024
4b3c93e
revert the metrics name length change
xBis7 Nov 14, 2024
939511b
cleanup
xBis7 Nov 15, 2024
bae8bca
move the active span dictionaries to the scheduler class
xBis7 Nov 16, 2024
c841f17
fix variable name + comment
xBis7 Nov 16, 2024
0266f27
initial impl: handling scheduler ha
xBis7 Nov 26, 2024
9af9c6b
fix span timings
xBis7 Nov 26, 2024
526d50e
refactor test_otel.py
xBis7 Nov 26, 2024
07076ee
handle scheduler forceful exit
xBis7 Dec 2, 2024
cf290d6
cleanup old dagrun and ti spans
xBis7 Dec 2, 2024
691865b
remove CTX_PROP_SUFFIX
xBis7 Dec 2, 2024
87d6efb
cleaning up comments
xBis7 Dec 2, 2024
8329733
remove context propagation config flag
xBis7 Dec 2, 2024
2af6df5
resolve conflicts and finish merging with main
xBis7 Dec 2, 2024
2ad643b
add attributes while cleaning up ended spans
xBis7 Dec 2, 2024
03e1383
fix integration and system test failures
xBis7 Dec 3, 2024
2042334
fix unit test failures
xBis7 Dec 4, 2024
5178e14
cleanup
xBis7 Dec 4, 2024
8f8d212
Merge remote-tracking branch 'origin/main' into ctx_prop_final
xBis7 Dec 4, 2024
ea3fbc1
fix sqlite migration failure
xBis7 Dec 4, 2024
ce0c356
Merge branch 'main' into ctx_prop_final
xBis7 Dec 4, 2024
e3be7e3
task output not captured by tests - fixed
xBis7 Dec 13, 2024
975f533
cleanup inaccurate comment
xBis7 Dec 13, 2024
e7c4cfd
use one common dictionary for active spans
xBis7 Dec 13, 2024
48928f9
rephrase comment about otel spans
xBis7 Dec 13, 2024
3943ffc
scheduler cleanup
xBis7 Dec 13, 2024
7f122f6
scheduler unit tests
xBis7 Dec 15, 2024
8511bd7
revert changes in old migration file
xBis7 Dec 15, 2024
08de46f
move passing reference to active_spans dict out of the scheduler loop
xBis7 Dec 15, 2024
ec32152
make set_dagrun_span_attrs not a static method
xBis7 Dec 15, 2024
5a234be
remove set methods from dagrun and ti
xBis7 Dec 15, 2024
54161f9
finish merging with main
xBis7 Dec 16, 2024
b3f544a
add a migration file
xBis7 Dec 16, 2024
0c5e45d
fix failing tests
xBis7 Dec 16, 2024
db481a1
fix discrepancies between model and migration file
xBis7 Dec 16, 2024
6744115
fix include_dag_run on ti refresh_from_db
xBis7 Dec 17, 2024
8434b02
fix issue with recreated spans + fix integration tests
xBis7 Dec 18, 2024
e96806b
resolve conflicts with main
xBis7 Dec 18, 2024
b562d87
trigger er diagram generation
xBis7 Dec 18, 2024
1d19534
trigger er diagram generation
xBis7 Dec 18, 2024
67e85af
refactor dagrun to improve readability
xBis7 Dec 18, 2024
82c159b
unit tests for dag_run span changes
xBis7 Dec 18, 2024
537dec0
fix refresh issue with loading dag_run when task_instance isn't bound…
xBis7 Dec 18, 2024
48374d5
resolve merge conflicts + new migration file
xBis7 Dec 30, 2024
11d5a6c
fix test_scheduler_job failures after merge
xBis7 Dec 31, 2024
09ed2e9
fix test_dagrun failures after merge
xBis7 Dec 31, 2024
cbfd68a
resolve conflicts after merge with main
xBis7 Jan 15, 2025
c5a71b6
fix test failures after merge
xBis7 Jan 15, 2025
6d5d361
fix test_exceptions.py
xBis7 Jan 15, 2025
cdc760d
merge with main + resolve conflicts
xBis7 Jan 17, 2025
cb96bf7
trigger ER diagram creation
xBis7 Jan 17, 2025
86b3cc4
trigger ER diagram creation
xBis7 Jan 17, 2025
4004ca5
trigger ER diagram creation
xBis7 Jan 17, 2025
261807d
merge with main and resolve conflicts
xBis7 Feb 10, 2025
d86a7bf
fix failures after merge with main
xBis7 Feb 11, 2025
cdaf4e8
merge with main
xBis7 Feb 11, 2025
8ce2f7c
finish merge with main
xBis7 Mar 9, 2025
4d74c93
fix mypy error
xBis7 Mar 9, 2025
f7af6f9
fix more mypy errors
xBis7 Mar 9, 2025
ed1f566
remove leftover dagrun field external_trigger
xBis7 Mar 9, 2025
4a99dd7
fix test failures after merge
xBis7 Mar 9, 2025
8b90db6
set the dr context on the workload ti
xBis7 Mar 12, 2025
1c80870
resolve conflicts after merge with main
xBis7 Mar 19, 2025
bbb3705
remove given/when/then comments from tests
xBis7 Mar 19, 2025
ef7969b
increase length for metric names
xBis7 Mar 19, 2025
4dd237a
cleanup
xBis7 Mar 19, 2025
041bf84
assign active_ti_span in place
xBis7 Mar 19, 2025
5642e5c
update otel processor init comments
xBis7 Mar 19, 2025
eaada53
test for running just a dag to make sure it succeeds in an integratio…
xBis7 Mar 21, 2025
4356819
remove redundant logs
xBis7 Mar 22, 2025
6e1f997
replace pendulum with timezone.utcnow() in otel_tracer.py
xBis7 Mar 22, 2025
f1b4fdf
replace PythonOperator with TaskFlow in test dags
xBis7 Mar 22, 2025
1a9c08d
replace time.time() with timezone.utcnow().timestamp()
xBis7 Mar 22, 2025
db5c49c
add ids to parameterized tests
xBis7 Mar 22, 2025
774bc82
merge with main and resolve conflicts
xBis7 Mar 22, 2025
1972540
restructure tests according to new directory structure
xBis7 Mar 22, 2025
90b3737
fix test_otel_logger.py
xBis7 Mar 22, 2025
fcf5233
test setup fixed, all pass with actual otel
xBis7 Mar 29, 2025
4857709
test_otel.py - print ti output fixed
xBis7 Mar 30, 2025
036f648
fixed tests with pause freezing due to capfd and flakiness due to cpu…
xBis7 Mar 30, 2025
3e04a1f
process new format of task line output
xBis7 Apr 1, 2025
91faa3f
fix bug with task output processing
xBis7 Apr 1, 2025
66dd719
fix otel assertion errors caused by airflow 3 changes
xBis7 Apr 2, 2025
53d30fc
add a deadline while waiting for the control_file
xBis7 Apr 2, 2025
12a4376
base_executor cleanup
xBis7 Apr 2, 2025
14c6306
merge with main and resolve conflicts
xBis7 Apr 2, 2025
8324022
merge with main and resolve conflicts
xBis7 Apr 2, 2025
f36c2e3
trigger diagram generation
xBis7 Apr 2, 2025
c18f4bf
trigger diagram generation
xBis7 Apr 2, 2025
6f03891
trigger diagram generation
xBis7 Apr 2, 2025
4c62c3c
fix failing ci test
xBis7 Apr 2, 2025
eea16a4
modify the initial file for datamoels generation
xBis7 Apr 3, 2025
81a3c1d
Apply suggestions from code review
ashb Apr 3, 2025
7de85bd
Update airflow-core/src/airflow/jobs/scheduler_job_runner.py
ashb Apr 3, 2025
a5dd674
Update airflow-core/src/airflow/migrations/versions/0065_3_0_0_add_ne…
ashb Apr 3, 2025
b3b29c9
Static fixes
ashb Apr 3, 2025
b234572
Static fixes.
ashb Apr 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
040d02de3ade319dccca8e797c88a2d118b3f231e293fbac9b7cd22c40dee94b
6d34a69fdd22fb001604f5fdbc5d456cf7c25d5657536a74df8d0ac8ac63bd9a
1,125 changes: 578 additions & 547 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``be2cc2f742cf`` (head) | ``d469d27e2a64`` | ``3.0.0`` | Support bundles in DagPriorityParsingRequest. |
| ``ec62e120484d`` (head) | ``be2cc2f742cf`` | ``3.0.0`` | Add new otel span fields. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``be2cc2f742cf`` | ``d469d27e2a64`` | ``3.0.0`` | Support bundles in DagPriorityParsingRequest. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``d469d27e2a64`` | ``16f7f5ee874e`` | ``3.0.0`` | Use ti_id as FK to TaskReschedule. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class TaskInstance(StrictBaseModel):
try_number: int
map_index: int = -1
hostname: str | None = None
context_carrier: dict | None = None


class DagRun(StrictBaseModel):
Expand Down
45 changes: 41 additions & 4 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
from airflow.utils.thread_safe_dict import ThreadSafeDict

PARALLELISM: int = conf.getint("core", "PARALLELISM")

Expand Down Expand Up @@ -116,6 +117,8 @@ class BaseExecutor(LoggingMixin):
:param parallelism: how many jobs should run at one time.
"""

active_spans = ThreadSafeDict()

supports_ad_hoc_ti_run: bool = False
supports_sentry: bool = False

Expand Down Expand Up @@ -178,6 +181,10 @@ def __init__(self, parallelism: int = PARALLELISM, team_id: str | None = None):
def __repr__(self):
return f"{self.__class__.__name__}(parallelism={self.parallelism})"

@classmethod
def set_active_spans(cls, active_spans: ThreadSafeDict):
cls.active_spans = active_spans

def start(self): # pragma: no cover
"""Executors may need to get things started."""

Expand Down Expand Up @@ -375,7 +382,7 @@ def trigger_tasks(self, open_slots: int) -> None:
"""
sorted_queue = self.order_queued_tasks_by_priority()
task_tuples = []
workloads = []
workload_list = []

for _ in range(min((open_slots, len(self.queued_tasks)))):
key, item = sorted_queue.pop(0)
Expand Down Expand Up @@ -419,16 +426,46 @@ def trigger_tasks(self, open_slots: int) -> None:
# TODO: TaskSDK: Compat, remove when KubeExecutor is fully moved over to TaskSDK too.
# TODO: TaskSDK: We need to minimum version requirements on executors with Airflow 3.
# How/where do we do that? Executor loader?
from airflow.executors import workloads

if isinstance(item, workloads.ExecuteTask) and hasattr(item, "ti"):
ti = item.ti

# If it's None, then the span for the current TaskInstanceKey hasn't been started.
if self.active_spans is not None and self.active_spans.get(key) is None:
from airflow.models.taskinstance import SimpleTaskInstance

if isinstance(ti, SimpleTaskInstance):
parent_context = Trace.extract(ti.parent_context_carrier)
elif isinstance(ti, workloads.TaskInstance):
parent_context = Trace.extract(ti.parent_context_carrier)
else:
parent_context = Trace.extract(ti.dag_run.context_carrier)
# Start a new span using the context from the parent.
# Attributes will be set once the task has finished so that all
# values will be available (end_time, duration, etc.).

span = Trace.start_child_span(
span_name=f"{ti.task_id}",
parent_context=parent_context,
component="task",
start_as_current=False,
)
self.active_spans.set(key, span)
# Inject the current context into the carrier.
carrier = Trace.inject()
ti.context_carrier = carrier

if hasattr(self, "_process_workloads"):
workloads.append(item)
workload_list.append(item)
else:
(command, _, queue, ti) = item
task_tuples.append((key, command, queue, getattr(ti, "executor_config", None)))

if task_tuples:
self._process_tasks(task_tuples)
elif workloads:
self._process_workloads(workloads) # type: ignore[attr-defined]
elif workload_list:
self._process_workloads(workload_list) # type: ignore[attr-defined]

@add_span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
Expand Down
5 changes: 5 additions & 0 deletions airflow-core/src/airflow/executors/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class TaskInstance(BaseModel):
priority_weight: int
executor_config: dict | None = Field(default=None, exclude=True)

parent_context_carrier: dict | None = None
context_carrier: dict | None = None
queued_dttm: datetime | None = None

# TODO: Task-SDK: Can we replace TastInstanceKey with just the uuid across the codebase?
@property
def key(self) -> TaskInstanceKey:
Expand Down Expand Up @@ -105,6 +109,7 @@ def make(
from airflow.utils.helpers import log_filename_template_renderer

ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
ser_ti.parent_context_carrier = ti.dag_run.context_carrier
bundle_info = BundleInfo(
name=ti.dag_model.bundle_name,
version=ti.dag_run.bundle_version,
Expand Down
Loading
Loading