Skip to content

Commit c10d9c8

Browse files
committed
Handle Serialized DAG Format from v3 to v2 when downgrading Airflow
This migration enables Airflow downgrades by converting v3 serialized DAGs back to v2 format. The `upgrade()` is a no-op since the server handles v1/v2/v3 at runtime, but `downgrade()` removes client_defaults sections and updates version numbers to ensure compatibility with older Airflow versions. closes #55949
1 parent 7b562ba commit c10d9c8

5 files changed

Lines changed: 184 additions & 5 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ebc15a6ad6529f10a59633d8adff009dd3b526e6a97f1b578436c275b98e177b
1+
7863cac09b55cd47e8949ace5abb8bc308aa38eab48dc14cf38c5b707713896a

airflow-core/docs/migrations-ref.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
3939
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4040
| Revision ID | Revises ID | Airflow Version | Description |
4141
+=========================+==================+===================+==============================================================+
42-
| ``15d84ca19038`` (head) | ``eaf332f43c7c`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. |
42+
| ``15d84ca19038`` (head) | ``cc92b33c6709`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. |
43+
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
44+
| ``cc92b33c6709`` | ``eaf332f43c7c`` | ``3.1.0`` | Add backward compatibility for serialized DAG format v3 to |
45+
| | | | v2. |
4346
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4447
| ``eaf332f43c7c`` | ``a3c7f2b18d4e`` | ``3.1.0`` | add last_parse_duration to dag model. |
4548
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Add backward compatibility for serialized DAG format v3 to v2.
21+
22+
Revision ID: cc92b33c6709
23+
Revises: eaf332f43c7c
24+
Create Date: 2025-09-22 22:50:48.035121
25+
26+
"""
27+
28+
from __future__ import annotations
29+
30+
import sqlalchemy as sa
31+
from alembic import op
32+
33+
# revision identifiers, used by Alembic.
34+
revision = "cc92b33c6709"
35+
down_revision = "eaf332f43c7c"
36+
branch_labels = None
37+
depends_on = None
38+
airflow_version = "3.1.0"
39+
40+
41+
def upgrade():
42+
"""Apply Downgrade Serialized Dag version to v2."""
43+
# No-op: Server handles v1/v2/v3 DAGs at runtime via conversion functions
44+
pass
45+
46+
47+
def downgrade():
48+
"""Convert v3 serialized DAGs back to v2 format for compatibility with older Airflow versions."""
49+
import gzip
50+
import json
51+
52+
connection = op.get_bind()
53+
dialect = connection.dialect.name
54+
55+
if dialect == "postgresql":
56+
connection.execute(
57+
sa.text("""
58+
UPDATE serialized_dag
59+
SET data = jsonb_set(
60+
CASE
61+
WHEN data ? 'client_defaults' THEN data - 'client_defaults'
62+
ELSE data
63+
END,
64+
'{__version}',
65+
'2'::jsonb
66+
)
67+
WHERE data->>'__version' = '3'
68+
AND data_compressed IS NULL
69+
""")
70+
)
71+
elif dialect == "mysql":
72+
connection.execute(
73+
sa.text("""
74+
UPDATE serialized_dag
75+
SET data = JSON_SET(
76+
JSON_REMOVE(data, '$.client_defaults'),
77+
'$.__version',
78+
2
79+
)
80+
WHERE JSON_EXTRACT(data, '$.__version') = '3'
81+
AND data_compressed IS NULL
82+
""")
83+
)
84+
else:
85+
json_functions_available = False
86+
try:
87+
connection.execute(sa.text("SELECT JSON_SET('{}', '$.test', 'value')")).fetchone()
88+
json_functions_available = True
89+
print("SQLite JSON functions detected, using optimized SQL approach")
90+
except Exception:
91+
print("SQLite JSON functions not available, using Python fallback for JSON processing")
92+
93+
if json_functions_available:
94+
connection.execute(
95+
sa.text("""
96+
UPDATE serialized_dag
97+
SET data = JSON_SET(
98+
JSON_REMOVE(data, '$.client_defaults'),
99+
'$.__version',
100+
2
101+
)
102+
WHERE JSON_EXTRACT(data, '$.__version') = '3'
103+
AND data_compressed IS NULL
104+
""")
105+
)
106+
else:
107+
result = connection.execute(
108+
sa.text("""
109+
SELECT id, data
110+
FROM serialized_dag
111+
WHERE data_compressed IS NULL
112+
""")
113+
)
114+
115+
for row in result:
116+
dag_id, data_json = row
117+
try:
118+
if data_json is None:
119+
continue
120+
121+
dag_data = json.loads(data_json)
122+
123+
if dag_data.get("__version") != 3:
124+
continue
125+
126+
if "client_defaults" in dag_data:
127+
del dag_data["client_defaults"]
128+
dag_data["__version"] = 2
129+
130+
new_json = json.dumps(dag_data)
131+
connection.execute(
132+
sa.text("UPDATE serialized_dag SET data = :data WHERE id = :id"),
133+
{"data": new_json, "id": dag_id},
134+
)
135+
136+
except Exception as e:
137+
print(f"Failed to downgrade uncompressed DAG {dag_id}: {e}")
138+
continue
139+
try:
140+
result = connection.execute(
141+
sa.text("""
142+
SELECT id, data_compressed
143+
FROM serialized_dag
144+
WHERE data_compressed IS NOT NULL
145+
""")
146+
)
147+
148+
for row in result:
149+
dag_id, compressed_data = row
150+
try:
151+
if compressed_data is None:
152+
continue
153+
154+
decompressed = gzip.decompress(compressed_data)
155+
dag_data = json.loads(decompressed)
156+
157+
if dag_data.get("__version") != 3:
158+
continue
159+
160+
if "client_defaults" in dag_data:
161+
del dag_data["client_defaults"]
162+
dag_data["__version"] = 2
163+
164+
new_compressed = gzip.compress(json.dumps(dag_data).encode("utf-8"))
165+
connection.execute(
166+
sa.text("UPDATE serialized_dag SET data_compressed = :data WHERE id = :id"),
167+
{"data": new_compressed, "id": dag_id},
168+
)
169+
170+
except Exception as e:
171+
print(f"Failed to downgrade compressed DAG {dag_id}: {e}")
172+
continue
173+
174+
except Exception as e:
175+
print(f"Failed to process compressed DAGs during downgrade: {e}")
176+
raise

airflow-core/src/airflow/migrations/versions/0085_3_2_0_replace_asset_trigger_table_with_asset.py renamed to airflow-core/src/airflow/migrations/versions/0086_3_2_0_replace_asset_trigger_table_with_asset.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
replace asset_trigger table with asset_watcher.
2121
2222
Revision ID: 15d84ca19038
23-
Revises: eaf332f43c7c
23+
Revises: cc92b33c6709
2424
Create Date: 2025-09-14 01:34:40.423767
2525
2626
"""
@@ -32,7 +32,7 @@
3232

3333
# revision identifiers, used by Alembic.
3434
revision = "15d84ca19038"
35-
down_revision = "eaf332f43c7c"
35+
down_revision = "cc92b33c6709"
3636
branch_labels = None
3737
depends_on = None
3838
airflow_version = "3.2.0"

airflow-core/src/airflow/utils/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class MappedClassProtocol(Protocol):
110110
"2.10.3": "5f2621c13b39",
111111
"3.0.0": "29ce7909c52b",
112112
"3.0.3": "fe199e1abd77",
113-
"3.1.0": "eaf332f43c7c",
113+
"3.1.0": "cc92b33c6709",
114114
"3.2.0": "15d84ca19038",
115115
}
116116

0 commit comments

Comments
 (0)