Skip to content

Commit 9173b7a

Browse files
committed
add basic system tests for OpenLineage
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
1 parent e964c64 commit 9173b7a

12 files changed

Lines changed: 588 additions & 2 deletions

File tree

airflow/models/variable.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ def get(
152152
mask_secret(var_val, key)
153153
return var_val
154154

155+
@staticmethod
156+
@provide_session
157+
def list(session: Session = None) -> list[Variable]:
158+
return session.query(Variable).all()
159+
155160
@staticmethod
156161
@provide_session
157162
def set(

docs/apache-airflow-providers-openlineage/index.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@
5656
PyPI Repository <https://pypi.org/project/apache-airflow-providers-openlineage/>
5757
Installing from sources <installing-providers-from-sources>
5858

59+
.. toctree::
60+
:hidden:
61+
:maxdepth: 1
62+
:caption: System tests
63+
64+
System Tests <_api/tests/system/openlineage/index>
65+
66+
5967
.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
6068
6169

providers/src/airflow/providers/openlineage/plugins/adapter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ def emit(self, event: RunEvent):
156156
stack.enter_context(Stats.timer("ol.emit.attempts"))
157157
self._client.emit(redacted_event)
158158
self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
159-
except Exception as e:
159+
except Exception:
160160
Stats.incr("ol.emit.failed")
161161
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
162-
self.log.debug("OpenLineage emission failure: %s", e)
162+
self.log.debug("OpenLineage emission failure: %s", exc_info=True)
163163

164164
return redacted_event
165165

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from typing import TYPE_CHECKING
20+
21+
from openlineage.client.serde import Serde
22+
from openlineage.client.transport import Transport
23+
24+
from airflow.models.variable import Variable
25+
from airflow.utils.log.logging_mixin import LoggingMixin
26+
27+
if TYPE_CHECKING:
28+
from openlineage.client.client import Event
29+
30+
31+
class VariableTransport(Transport, LoggingMixin):
32+
"""
33+
This transport sends OpenLineage events to Variables.
34+
35+
Key schema is <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>.
36+
It's made to be used in system tests, stored data read by OpenLineageTestOperator.
37+
"""
38+
39+
def __init__(self, *args, **kwargs) -> None:
40+
super().__init__(*args, **kwargs)
41+
42+
def emit(self, event: Event) -> None:
43+
key = f"{event.job.name}.event.{event.eventType.value.lower()}" # type: ignore[union-attr]
44+
event_str = Serde.to_json(event)
45+
if (var := Variable.get(key=key, default_var=None, deserialize_json=True)) is not None:
46+
Variable.set(key=key, value=var + [event_str], serialize_json=True)
47+
else:
48+
Variable.set(key=key, value=[event_str], serialize_json=True)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
import pytest
20+
21+
from airflow.listeners.listener import get_listener_manager
22+
from airflow.providers.openlineage.plugins.listener import OpenLineageListener
23+
from airflow.providers.openlineage.transport.variable import VariableTransport
24+
25+
26+
@pytest.fixture(autouse=True)
27+
def set_transport_variable():
28+
lm = get_listener_manager()
29+
lm.clear()
30+
listener = OpenLineageListener()
31+
listener.adapter._client = listener.adapter.get_or_create_openlineage_client()
32+
listener.adapter._client.transport = VariableTransport()
33+
lm.add_listener(listener)
34+
yield
35+
lm.clear()
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
[
2+
{
3+
"eventType": "START",
4+
"eventTime": "{{ is_datetime(result) }}",
5+
"run": {
6+
"runId": "{{ is_uuid(result) }}"
7+
},
8+
"job": {
9+
"namespace": "default",
10+
"name": "openlineage_basic_dag.do_nothing_task",
11+
"facets": {
12+
"jobType": {
13+
"integration": "AIRFLOW",
14+
"jobType": "TASK",
15+
"processingType": "BATCH"
16+
}
17+
}
18+
}
19+
},
20+
{
21+
"eventType": "COMPLETE",
22+
"eventTime": "{{ is_datetime(result) }}",
23+
"run": {
24+
"runId": "{{ is_uuid(result) }}"
25+
},
26+
"job": {
27+
"namespace": "default",
28+
"name": "openlineage_basic_dag.do_nothing_task",
29+
"facets": {
30+
"jobType": {
31+
"integration": "AIRFLOW",
32+
"jobType": "TASK",
33+
"processingType": "BATCH"
34+
}
35+
}
36+
}
37+
}
38+
]
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from datetime import datetime
20+
21+
from airflow import DAG
22+
from airflow.providers.standard.operators.python import PythonOperator
23+
24+
from providers.tests.system.openlineage.operator import OpenLineageTestOperator
25+
26+
27+
def do_nothing():
28+
pass
29+
30+
31+
default_args = {"start_date": datetime(2021, 1, 1), "retries": 1}
32+
33+
# Instantiate the DAG
34+
with DAG(
35+
"openlineage_basic_dag",
36+
default_args=default_args,
37+
start_date=datetime(2021, 1, 1),
38+
schedule=None,
39+
catchup=False,
40+
) as dag:
41+
nothing_task = PythonOperator(task_id="do_nothing_task", python_callable=do_nothing)
42+
43+
check_events = OpenLineageTestOperator(
44+
task_id="check_events", file_path="providers/tests/system/openlineage/example_openlineage.json"
45+
)
46+
47+
nothing_task >> check_events
48+
49+
50+
from tests_common.test_utils.system_tests import get_test_run # noqa: E402
51+
52+
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
53+
test_run = get_test_run(dag)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
[
2+
{
3+
"eventType": "START",
4+
"eventTime": "{{ is_datetime(result) }}",
5+
"run": {
6+
"runId": "{{ is_uuid(result) }}"
7+
},
8+
"job": {
9+
"namespace": "default",
10+
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
11+
"facets": {
12+
"jobType": {
13+
"integration": "AIRFLOW",
14+
"jobType": "TASK",
15+
"processingType": "BATCH"
16+
}
17+
}
18+
}
19+
},
20+
{
21+
"eventType": "COMPLETE",
22+
"eventTime": "{{ is_datetime(result) }}",
23+
"run": {
24+
"runId": "{{ is_uuid(result) }}"
25+
},
26+
"job": {
27+
"namespace": "default",
28+
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
29+
"facets": {
30+
"jobType": {
31+
"integration": "AIRFLOW",
32+
"jobType": "TASK",
33+
"processingType": "BATCH"
34+
}
35+
}
36+
}
37+
},
38+
{
39+
"eventType": "START",
40+
"eventTime": "{{ is_datetime(result) }}",
41+
"run": {
42+
"runId": "{{ is_uuid(result) }}"
43+
},
44+
"job": {
45+
"namespace": "default",
46+
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
47+
"facets": {
48+
"jobType": {
49+
"integration": "AIRFLOW",
50+
"jobType": "TASK",
51+
"processingType": "BATCH"
52+
}
53+
}
54+
}
55+
},
56+
57+
{
58+
"eventType": "COMPLETE",
59+
"eventTime": "{{ is_datetime(result) }}",
60+
"run": {
61+
"runId": "{{ is_uuid(result) }}"
62+
},
63+
"job": {
64+
"namespace": "default",
65+
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
66+
"facets": {
67+
"jobType": {
68+
"integration": "AIRFLOW",
69+
"jobType": "TASK",
70+
"processingType": "BATCH"
71+
}
72+
}
73+
}
74+
}
75+
]

0 commit comments

Comments
 (0)