Skip to content

Commit e63e32f

Browse files
committed
Group id is mandatory configurtion option for confluent_kafka 2.4.0+
The group.id parameter has been optional before 2.4.0 version of confluent_kafka relased on 7th of May. This started to fail our integration tests (cool). This PR adds "group.id" as extra field and sets the defaults for integration testing.
1 parent ca058a6 commit e63e32f

4 files changed

Lines changed: 5 additions & 3 deletions

File tree

airflow/providers/apache/kafka/hooks/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
4949
"hidden_fields": ["schema", "login", "password", "port", "host"],
5050
"relabeling": {"extra": "Config Dict"},
5151
"placeholders": {
52-
"extra": '{"bootstrap.servers": "localhost:9092"}',
52+
"extra": '{"bootstrap.servers": "localhost:9092", "group.id": "my-group"}',
5353
},
5454
}
5555

airflow/utils/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ def create_default_connections(session: Session = NEW_SESSION):
424424
Connection(
425425
conn_id="kafka_default",
426426
conn_type="kafka",
427-
extra=json.dumps({"bootstrap.servers": "broker:29092"}),
427+
extra=json.dumps({"bootstrap.servers": "broker:29092", "group.id": "my-group"}),
428428
),
429429
session,
430430
)

tests/integration/providers/apache/kafka/hooks/test_admin_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
2626
from airflow.utils import db
2727

28-
client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092"}
28+
client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092", "group.id": "my-group"}
2929

3030

3131
@pytest.mark.integration("kafka")

tests/integration/providers/apache/kafka/operators/test_produce.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TestProduceToTopic:
4141
"""
4242

4343
def setup_method(self):
44+
GROUP = "operator.producer.test.integration.test_1"
4445
db.merge_conn(
4546
Connection(
4647
conn_id="kafka_default",
@@ -50,6 +51,7 @@ def setup_method(self):
5051
"socket.timeout.ms": 10,
5152
"message.timeout.ms": 10,
5253
"bootstrap.servers": "broker:29092",
54+
"group.id": GROUP,
5355
}
5456
),
5557
)

0 commit comments

Comments
 (0)