|
17 | 17 | # under the License. |
18 | 18 | """ |
19 | 19 | Example Airflow DAG that shows how to use GoogleAdsToGcsOperator. |
| 20 | +
|
| 21 | +In order to run this test, make sure you followed steps: |
| 22 | +1. In your GCP project create a service account that will be used to operate on Google Ads. |
| 23 | +The name should be in format `google-ads-service-account@{PROJECT_ID}.iam.gserviceaccount.com` |
| 24 | +2. Generate a key for this service account and store it in the Secret Manager |
| 25 | +under the name `google_ads_service_account_key`. |
| 26 | +3. Give this service account Editor permissions. |
| 27 | +4. Make sure Google Ads API is enabled in your GCP project. |
| 28 | +5. Login to https://ads.google.com |
| 29 | +6. In the Admin section go to Access and Security and give your GCP service account Admin permissions. |
| 30 | +7. Store values of your developer token and client ID to Secret Manager under names `google_ads_client_id` |
| 31 | +and `google_ads_developer_token`. |
20 | 32 | """ |
21 | 33 |
|
22 | 34 | from __future__ import annotations |
23 | 35 |
|
| 36 | +import json |
| 37 | +import logging |
24 | 38 | import os |
25 | 39 | from datetime import datetime |
26 | 40 |
|
| 41 | +from google.cloud.exceptions import NotFound |
| 42 | + |
| 43 | +from airflow.decorators import task |
27 | 44 | from airflow.models.dag import DAG |
28 | 45 | from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator |
29 | 46 | from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator |
| 47 | +from airflow.providers.google.cloud.hooks.secret_manager import GoogleCloudSecretManagerHook |
30 | 48 | from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator |
31 | 49 | from airflow.utils.trigger_rule import TriggerRule |
32 | 50 |
|
33 | | -from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID |
| 51 | +from tests_common.test_utils.api_client_helpers import create_airflow_connection, delete_airflow_connection |
34 | 52 |
|
35 | 53 | # [START howto_google_ads_env_variables] |
36 | 54 | ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") |
37 | | -PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID |
| 55 | +PROJECT_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") |
38 | 56 | API_VERSION = "v19" |
39 | 57 |
|
40 | | -DAG_ID = "example_google_ads" |
| 58 | +DAG_ID = "google_ads" |
| 59 | + |
| 60 | +GOOGLE_ADS_CLIENT_ID = "google_ads_client_id" |
| 61 | +GOOGLE_ADS_SERVICE_ACCOUNT_KEY = "google_ads_service_account_key" |
| 62 | +GOOGLE_ADS_DEVELOPER_TOKEN = "google_ads_developer_token" |
41 | 63 |
|
42 | 64 | BUCKET_NAME = f"bucket_ads_{ENV_ID}" |
43 | | -CLIENT_IDS = ["1111111111", "2222222222"] |
44 | | -GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv" |
45 | | -GCS_ACCOUNTS_CSV = "folder_name/accounts.csv" |
| 65 | +GCS_OBJ_PATH = f"gs://{BUCKET_NAME}/google-ads-api-results.csv" |
| 66 | +GCS_ACCOUNTS_CSV = "accounts.csv" |
46 | 67 | QUERY = """ |
47 | 68 | SELECT |
48 | 69 | segments.date, |
|
61 | 82 | segments.date >= '2020-02-01' |
62 | 83 | AND segments.date <= '2020-02-29' |
63 | 84 | """ |
| 85 | +CONNECTION_GLOUD_ID = f"connection_cloud_{DAG_ID}_{ENV_ID}" |
| 86 | +CONNECTION_ADS_ID = "google_ads_default" |
| 87 | +CONNECTION_TYPE = "google_cloud_platform" |
64 | 88 |
|
65 | 89 | FIELDS_TO_EXTRACT = [ |
66 | 90 | "segments.date.value", |
|
76 | 100 | ] |
77 | 101 | # [END howto_google_ads_env_variables] |
78 | 102 |
|
| 103 | +log = logging.getLogger(__name__) |
| 104 | + |
| 105 | + |
| 106 | +def get_secret(secret_id: str) -> str: |
| 107 | + hook = GoogleCloudSecretManagerHook() |
| 108 | + if hook.secret_exists(secret_id=secret_id): |
| 109 | + return hook.access_secret(secret_id=secret_id).payload.data.decode() |
| 110 | + raise NotFound("The secret '%s' not found", secret_id) |
| 111 | + |
| 112 | + |
79 | 113 | with DAG( |
80 | 114 | DAG_ID, |
81 | 115 | schedule="@once", |
82 | 116 | start_date=datetime(2021, 1, 1), |
83 | 117 | catchup=False, |
84 | 118 | tags=["example", "ads"], |
| 119 | + render_template_as_native_obj=True, |
85 | 120 | ) as dag: |
| 121 | + |
| 122 | + @task |
| 123 | + def get_google_ads_client_id(): |
| 124 | + return get_secret(secret_id=GOOGLE_ADS_CLIENT_ID).strip() |
| 125 | + |
| 126 | + get_google_ads_client_id_task = get_google_ads_client_id() |
| 127 | + |
| 128 | + @task |
| 129 | + def get_google_ads_service_account_key(): |
| 130 | + return get_secret(secret_id=GOOGLE_ADS_SERVICE_ACCOUNT_KEY) |
| 131 | + |
| 132 | + get_google_ads_service_account_key_task = get_google_ads_service_account_key() |
| 133 | + |
| 134 | + @task |
| 135 | + def get_google_ads_developer_token(): |
| 136 | + return get_secret(secret_id=GOOGLE_ADS_DEVELOPER_TOKEN).strip() |
| 137 | + |
| 138 | + get_google_ads_developer_token_task = get_google_ads_developer_token() |
| 139 | + |
| 140 | + @task |
| 141 | + def create_connection_gcloud_for_ads(connection_id: str, key) -> None: |
| 142 | + conn_extra_json = json.dumps( |
| 143 | + { |
| 144 | + "keyfile_dict": key, |
| 145 | + "project": PROJECT_ID, |
| 146 | + "scope": "https://www.googleapis.com/auth/adwords, https://www.googleapis.com/auth/cloud-platform", |
| 147 | + } |
| 148 | + ) |
| 149 | + create_airflow_connection( |
| 150 | + connection_id=connection_id, |
| 151 | + connection_conf={"conn_type": CONNECTION_TYPE, "extra": conn_extra_json}, |
| 152 | + ) |
| 153 | + |
| 154 | + create_connection_gcloud_for_ads = create_connection_gcloud_for_ads( |
| 155 | + connection_id=CONNECTION_GLOUD_ID, key=get_google_ads_service_account_key_task |
| 156 | + ) |
| 157 | + |
| 158 | + @task |
| 159 | + def create_connection_ads(connection_id: str, token) -> None: |
| 160 | + conn_extra_json = json.dumps( |
| 161 | + { |
| 162 | + "google_ads_client": { |
| 163 | + "developer_token": token, |
| 164 | + # this parameter is required to be not None, but the actual content will be overwritten, so can be some dummy string |
| 165 | + "json_key_file_path": "some_string", |
| 166 | + "impersonated_email": f"google-ads-service-account@{PROJECT_ID}.iam.gserviceaccount.com", |
| 167 | + "use_proto_plus": False, |
| 168 | + }, |
| 169 | + "project": PROJECT_ID, |
| 170 | + "scope": "https://www.googleapis.com/auth/adwords, https://www.googleapis.com/auth/cloud-platform", |
| 171 | + } |
| 172 | + ) |
| 173 | + create_airflow_connection( |
| 174 | + connection_id=connection_id, |
| 175 | + connection_conf={"conn_type": CONNECTION_TYPE, "extra": conn_extra_json}, |
| 176 | + ) |
| 177 | + |
| 178 | + create_connection_ads_task = create_connection_ads( |
| 179 | + connection_id=CONNECTION_ADS_ID, token=get_google_ads_developer_token_task |
| 180 | + ) |
| 181 | + |
86 | 182 | create_bucket = GCSCreateBucketOperator( |
87 | | - task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID |
| 183 | + task_id="create_bucket", |
| 184 | + bucket_name=BUCKET_NAME, |
| 185 | + project_id=PROJECT_ID, |
| 186 | + gcp_conn_id=CONNECTION_GLOUD_ID, |
88 | 187 | ) |
89 | 188 |
|
90 | 189 | # [START howto_google_ads_to_gcs_operator] |
91 | 190 | run_operator = GoogleAdsToGcsOperator( |
92 | | - client_ids=CLIENT_IDS, |
| 191 | + client_ids=[get_google_ads_client_id_task], |
93 | 192 | query=QUERY, |
94 | 193 | attributes=FIELDS_TO_EXTRACT, |
95 | 194 | obj=GCS_OBJ_PATH, |
96 | 195 | bucket=BUCKET_NAME, |
97 | 196 | task_id="run_operator", |
98 | 197 | api_version=API_VERSION, |
| 198 | + gcp_conn_id=CONNECTION_GLOUD_ID, |
99 | 199 | ) |
100 | 200 | # [END howto_google_ads_to_gcs_operator] |
101 | 201 |
|
102 | 202 | # [START howto_ads_list_accounts_operator] |
103 | 203 | list_accounts = GoogleAdsListAccountsOperator( |
104 | | - task_id="list_accounts", bucket=BUCKET_NAME, object_name=GCS_ACCOUNTS_CSV |
| 204 | + task_id="list_accounts", |
| 205 | + bucket=BUCKET_NAME, |
| 206 | + object_name=GCS_ACCOUNTS_CSV, |
| 207 | + api_version=API_VERSION, |
| 208 | + gcp_conn_id=CONNECTION_GLOUD_ID, |
105 | 209 | ) |
106 | 210 | # [END howto_ads_list_accounts_operator] |
107 | 211 |
|
108 | 212 | delete_bucket = GCSDeleteBucketOperator( |
109 | | - task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE |
| 213 | + task_id="delete_bucket", |
| 214 | + bucket_name=BUCKET_NAME, |
| 215 | + gcp_conn_id=CONNECTION_GLOUD_ID, |
| 216 | + trigger_rule=TriggerRule.ALL_DONE, |
110 | 217 | ) |
111 | 218 |
|
| 219 | + @task(task_id="delete_connection_gloud") |
| 220 | + def delete_connection_gloud(connection_id: str) -> None: |
| 221 | + delete_airflow_connection(connection_id=connection_id) |
| 222 | + |
| 223 | + delete_connection_gloud_task = delete_connection_gloud(connection_id=CONNECTION_GLOUD_ID) |
| 224 | + |
| 225 | + @task(task_id="delete_connection_ads") |
| 226 | + def delete_connection_ads(connection_id: str) -> None: |
| 227 | + delete_airflow_connection(connection_id=connection_id) |
| 228 | + |
| 229 | + delete_connection_ads_task = delete_connection_ads(connection_id=CONNECTION_ADS_ID) |
| 230 | + |
112 | 231 | ( |
113 | 232 | # TEST SETUP |
114 | | - create_bucket |
| 233 | + [ |
| 234 | + get_google_ads_client_id_task, |
| 235 | + get_google_ads_service_account_key_task, |
| 236 | + get_google_ads_developer_token_task, |
| 237 | + ] |
| 238 | + >> create_connection_gcloud_for_ads |
| 239 | + >> create_connection_ads_task |
| 240 | + >> create_bucket |
115 | 241 | # TEST BODY |
116 | 242 | >> run_operator |
117 | 243 | >> list_accounts |
118 | 244 | # TEST TEARDOWN |
119 | 245 | >> delete_bucket |
| 246 | + >> [delete_connection_gloud_task, delete_connection_ads_task] |
120 | 247 | ) |
121 | 248 |
|
122 | 249 | from tests_common.test_utils.watcher import watcher |
|
0 commit comments