
Commit b583752 (parent: 63d3602)

tests: Add OpenLineage to system tests in Google provider

Signed-off-by: Kacper Muda <mudakacper@gmail.com>

49 files changed: 4,175 additions and 258 deletions
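
The change repeats one small pattern across the touched example DAGs: import OpenLineageTestOperator, point it at an expected-events JSON under the DAG's resources/openlineage/ directory, and chain it after the teardown tasks. Condensed, the recurring addition looks like this (names taken directly from the diffs below):

    from pathlib import Path

    from providers.openlineage.tests.system.openlineage.operator import OpenLineageTestOperator

    check_openlineage_events = OpenLineageTestOperator(
        task_id="check_openlineage_events",
        file_path=str(Path(__file__).parent / "resources" / "openlineage" / "bigquery_dts.json"),
    )
    # ...then appended to the teardown chain, e.g.:
    # delete_dataset >> delete_bucket >> check_openlineage_events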


providers/tests/system/google/cloud/bigquery/example_bigquery_dts.py

Lines changed: 7 additions & 0 deletions
@@ -44,6 +44,7 @@
 from airflow.providers.google.cloud.sensors.bigquery_dts import BigQueryDataTransferServiceTransferRunSensor
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
+from providers.openlineage.tests.system.openlineage.operator import OpenLineageTestOperator
 
 from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
@@ -158,6 +159,11 @@
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
 
+    check_openlineage_events = OpenLineageTestOperator(
+        task_id="check_openlineage_events",
+        file_path=str(Path(__file__).parent / "resources" / "openlineage" / "bigquery_dts.json"),
+    )
+
     # Task dependencies created via `XComArgs`:
     # gcp_bigquery_create_transfer >> gcp_bigquery_start_transfer
     # gcp_bigquery_create_transfer >> gcp_run_sensor
@@ -178,6 +184,7 @@
         # TEST TEARDOWN
         delete_dataset,
         delete_bucket,
+        check_openlineage_events,
     )
 
     from tests_common.test_utils.watcher import watcher
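
The check tasks reference JSON files under resources/openlineage/ that this commit also adds but that are not expanded in this capture. As a rough, hypothetical sketch of the shape such an expected-events file might take — the key scheme and the matching semantics below are assumptions for illustration, not the operator's documented contract:

    # Hypothetical sketch only: writes a minimal expected-events file for
    # OpenLineageTestOperator. The key naming and matching rules are assumed
    # here, not taken from the operator's actual contract.
    import json
    from pathlib import Path

    expected_events = {
        # Assumed key scheme: "<dag_id>.<task_id>.event.<event_type>"
        "bigquery_dts.gcp_run_sensor.event.COMPLETE": {
            "eventType": "COMPLETE",
            "job": {"namespace": "default", "name": "bigquery_dts.gcp_run_sensor"},
        },
    }

    target = Path("resources") / "openlineage" / "bigquery_dts.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(expected_events, indent=2))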
Lines changed: 197 additions & 0 deletions
@@ -0,0 +1,197 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google BigQuery service.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateEmptyTableOperator,
+    BigQueryDeleteDatasetOperator,
+    BigQueryInsertJobOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+from providers.openlineage.tests.system.openlineage.operator import OpenLineageTestOperator
+
+from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+LOCATION = "us-east1"
+
+TABLE_1 = "table1"
+TABLE_2 = "table2"
+TABLE_3 = "table3"
+TABLE_4 = "table4"
+
+SCHEMA = [
+    {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
+    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+    {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
+]
+
+DAG_ID = "bigquery_jobs"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET = f"{DATASET_NAME}1"
+INSERT_DATE = "2030-01-01"
+INSERT_ROWS_QUERY = (
+    f"INSERT {DATASET}.{TABLE_1} VALUES "
+    f"(42, 'monty python', '{INSERT_DATE}'), "
+    f"(42, 'fishy fish', '{INSERT_DATE}');"
+)
+
+CTAS_QUERY = (
+    f"CREATE OR REPLACE TABLE {DATASET}.{TABLE_3} AS SELECT value, name, ds FROM {DATASET}.{TABLE_1};"
+)
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "bigquery"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID, storage_class="STANDARD"
+    )
+
+    create_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_dataset",
+        dataset_id=DATASET,
+    )
+
+    create_table = BigQueryCreateEmptyTableOperator(
+        task_id="create_table",
+        dataset_id=DATASET,
+        table_id=TABLE_1,
+        schema_fields=SCHEMA,
+    )
+
+    insert_query_job = BigQueryInsertJobOperator(
+        task_id="insert_query_job",
+        configuration={
+            "query": {
+                "query": INSERT_ROWS_QUERY,
+                "useLegacySql": False,
+                "priority": "BATCH",
+            }
+        },
+    )
+
+    execute_copy_job = BigQueryInsertJobOperator(
+        task_id="execute_copy_job",
+        configuration={
+            "copy": {
+                "sourceTables": [{"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE_1}],
+                "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE_2},
+                "writeDisposition": "WRITE_TRUNCATE",
+                "operationType": "COPY",
+            }
+        },
+    )
+
+    execute_ctas_query = BigQueryInsertJobOperator(
+        task_id="execute_ctas_query",
+        configuration={
+            "query": {
+                "query": CTAS_QUERY,
+                "useLegacySql": False,
+            }
+        },
+    )
+
+    execute_load_job = BigQueryInsertJobOperator(
+        task_id="execute_load_job",
+        configuration={
+            "load": {
+                "sourceUris": ["gs://cloud-samples-data/bigquery/us-states/us-states.csv"],
+                "destinationTable": {
+                    "projectId": PROJECT_ID,
+                    "datasetId": DATASET,
+                    "tableId": TABLE_4,
+                },
+                "schema": {
+                    "fields": [
+                        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
+                    ]
+                },
"write_disposition": "WRITE_TRUNCATE",
144+
"sourceFormat": "CSV",
145+
}
146+
},
147+
)
148+
149+
execute_extract_job = BigQueryInsertJobOperator(
150+
task_id="execute_extract_job",
151+
configuration={
152+
"extract": {
153+
"sourceTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE_4},
154+
"destinationUris": [
155+
f"gs://{BUCKET_NAME}/extract.csv",
156+
],
157+
"destinationFormat": "CSV",
158+
}
159+
},
160+
)
161+
162+
delete_dataset = BigQueryDeleteDatasetOperator(
163+
task_id="delete_dataset",
164+
dataset_id=DATASET,
165+
delete_contents=True,
166+
trigger_rule=TriggerRule.ALL_DONE,
167+
)
168+
169+
delete_bucket = GCSDeleteBucketOperator(
170+
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
171+
)
172+
173+
+    check_openlineage_events = OpenLineageTestOperator(
+        task_id="check_openlineage_events",
+        file_path=str(Path(__file__).parent / "resources" / "openlineage" / "bigquery_jobs.json"),
+    )
+    delete_dataset >> check_openlineage_events
+
+    # TEST SETUP
+    create_dataset >> create_table
+    # TEST BODY
+    create_table >> insert_query_job >> execute_copy_job >> execute_ctas_query >> delete_dataset
+    create_dataset >> execute_load_job >> execute_extract_job >> delete_dataset
+    # TEST TEARDOWN
+    delete_dataset >> delete_bucket
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
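
One harness detail worth spelling out: the teardown tasks (delete_dataset, delete_bucket) use TriggerRule.ALL_DONE, so they run, and can succeed, even when a test-body task fails; without intervention the DAG run could still end green. Routing every task into watcher() fails the run whenever any upstream task fails. A minimal, self-contained sketch of the same pattern (EmptyOperator stands in for real work; importing watcher assumes the Airflow repository's test layout is on the path):

    # Minimal sketch of the watcher pattern used above (illustrative task names).
    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.trigger_rule import TriggerRule

    with DAG("watcher_sketch", schedule="@once", start_date=datetime(2021, 1, 1), catchup=False) as dag:
        body = EmptyOperator(task_id="body")
        # ALL_DONE makes teardown run regardless of body's outcome, which would
        # otherwise let the DAG run finish "success" after a body failure.
        teardown = EmptyOperator(task_id="teardown", trigger_rule=TriggerRule.ALL_DONE)
        body >> teardown

        from tests_common.test_utils.watcher import watcher  # assumes Airflow repo layout

        # watcher() is triggered when any upstream task fails, marking the run failed.
        list(dag.tasks) >> watcher()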

providers/tests/system/google/cloud/bigquery/example_bigquery_operations.py

Lines changed: 7 additions & 0 deletions
@@ -34,6 +34,7 @@
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
+from providers.openlineage.tests.system.openlineage.operator import OpenLineageTestOperator
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 DAG_ID = "bigquery_operations"
@@ -86,6 +87,11 @@
         task_id="delete_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
 
+    check_openlineage_events = OpenLineageTestOperator(
+        task_id="check_openlineage_events",
+        file_path=str(Path(__file__).parent / "resources" / "openlineage" / "bigquery_operations.json"),
+    )
+
     (
         # TEST SETUP
         [create_bucket, create_dataset]
@@ -95,6 +101,7 @@
         # TEST TEARDOWN
         >> delete_dataset
         >> delete_bucket
+        >> check_openlineage_events
     )
 
     from tests_common.test_utils.watcher import watcher
