|
27 | 27 |
|
28 | 28 | import datetime |
29 | 29 |
|
30 | | -import airflow.providers.apache.kylin.hooks.kylin as kylin_hooks |
31 | 30 | from airflow import DAG |
32 | | -from airflow.providers.apache.kylin.hooks.kylin import KylinHook as OriginalKylinHook |
33 | | -from airflow.providers.common.sql.hooks.sql import DbApiHook |
34 | 31 | from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator |
35 | 32 |
|
36 | | - |
37 | | -class PatchedKylinHook(DbApiHook, OriginalKylinHook): |
38 | | - """ |
39 | | - Patched version of KylinHook that inherits from DbApiHook. |
40 | | - This allows SQLExecuteQueryOperator to use it. |
41 | | - """ |
42 | | - |
43 | | - conn_name_attr = "kylin_conn_id" |
44 | | - default_conn_name = "my_kylin_conn" |
45 | | - supports_autocommit = True |
46 | | - |
47 | | - def get_conn(self): |
48 | | - """ |
49 | | - Return connection details. |
50 | | - In a production scenario, you may want to establish a proper connection |
51 | | - object to interact with Kylin's REST API. |
52 | | - """ |
53 | | - conn = self.get_connection(self.kylin_conn_id) |
54 | | - host = conn.host |
55 | | - port = conn.port or 7070 |
56 | | - return (host, port, conn.login, conn.password) |
57 | | - |
58 | | - def run(self, sql, autocommit=True, parameters=None, **kwargs): |
59 | | - """ |
60 | | - Executes the given SQL query against Apache Kylin via its REST API. |
61 | | - The `autocommit` parameter and any additional keyword arguments are accepted. |
62 | | - It also includes the project name (from connection schema) in the request payload. |
63 | | - """ |
64 | | - conn = self.get_connection(self.kylin_conn_id) |
65 | | - host = conn.host |
66 | | - port = conn.port or 7070 |
67 | | - project = conn.schema # 使用連線中的 schema 作為 project name |
68 | | - if not project: |
69 | | - raise ValueError("Project name must be provided in the connection's schema field.") |
70 | | - url = f"http://{host}:{port}/kylin/api/query" |
71 | | - import requests |
72 | | - |
73 | | - payload = {"sql": sql, "project": project} |
74 | | - response = requests.post(url, json=payload, auth=(conn.login, conn.password)) |
75 | | - response.raise_for_status() |
76 | | - return response.json() |
77 | | - |
78 | | - |
79 | | -# Apply the monkey patch by replacing the original KylinHook with our patched version. |
80 | | -kylin_hooks.KylinHook = PatchedKylinHook |
81 | | - |
82 | | -# ===================== Monkey Patch End ===================== |
83 | | - |
84 | 33 | DAG_ID = "example_kylin" |
85 | 34 |
|
86 | 35 | with DAG( |
|
0 commit comments