Skip to content

Commit 0a11b78

Browse files
committed
Release metadata locks in BaseDBManager
1 parent cb59532 commit 0a11b78

1 file changed

Lines changed: 33 additions & 0 deletions

File tree

airflow-core/src/airflow/utils/db_manager.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,34 @@ def __init__(self, session):
4848
super().__init__()
4949
self.session = session
5050

51+
def _is_mysql(self) -> bool:
52+
"""Check if the database is MySQL."""
53+
return self.session.get_bind().dialect.name == "mysql"
54+
55+
def _release_metadata_locks(self) -> None:
56+
"""
57+
Release MySQL metadata locks by committing the underlying Connection.
58+
59+
In SQLAlchemy 2.0, session.commit() may NOT commit the underlying
60+
Connection transaction due to join_transaction_mode behavior.
61+
We must commit the Connection directly.
62+
"""
63+
if not self._is_mysql():
64+
return
65+
66+
self.log.debug("MySQL: Releasing metadata locks for DDL operations")
67+
68+
# Get the Connection from the Session
69+
connection = self.session.connection()
70+
71+
# Check if we're in an active transaction
72+
if connection.in_transaction():
73+
self.log.debug("MySQL: Connection is in transaction, committing directly")
74+
# Commit the Connection directly - this WILL release the transaction
75+
# regardless of join_transaction_mode
76+
connection.commit()
77+
self.log.debug("MySQL: Connection committed, metadata locks released")
78+
5179
def get_alembic_config(self):
5280
from alembic.config import Config
5381

@@ -107,6 +135,8 @@ def drop_tables(self, connection):
107135
def resetdb(self, skip_init=False):
108136
from airflow.utils.db import DBLocks, create_global_lock
109137

138+
self._release_metadata_locks()
139+
110140
connection = settings.engine.connect()
111141

112142
with create_global_lock(self.session, lock=DBLocks.MIGRATIONS), connection.begin():
@@ -117,6 +147,7 @@ def resetdb(self, skip_init=False):
117147

118148
def initdb(self):
119149
"""Initialize the database."""
150+
self._release_metadata_locks()
120151
db_exists = self.get_current_revision()
121152
if db_exists:
122153
self.upgradedb()
@@ -127,6 +158,8 @@ def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False):
127158
"""Upgrade the database."""
128159
self.log.info("Upgrading the %s database", self.__class__.__name__)
129160

161+
self._release_metadata_locks()
162+
130163
config = self.get_alembic_config()
131164
command.upgrade(config, revision=to_revision or "heads", sql=show_sql_only)
132165
self.log.info("Migrated the %s database", self.__class__.__name__)

0 commit comments

Comments
 (0)