Skip to content

Commit 483bbf6

Browse files
committed
unit tests for powerbi operator
1 parent eea4470 commit 483bbf6

4 files changed

Lines changed: 92 additions & 247 deletions

File tree

airflow/providers/microsoft/azure/hooks/powerbi.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -146,20 +146,6 @@ def raw_to_refresh_details(cls, refresh_details: dict) -> dict[str, str]:
146146
PowerBIDatasetRefreshFields.ERROR.value: str(refresh_details.get("serviceExceptionJson")),
147147
}
148148

149-
async def get_latest_refresh_details(self, dataset_id: str, group_id: str) -> dict[str, str] | None:
150-
"""
151-
Get the refresh details of the most recent dataset refresh in the refresh history of the data source.
152-
153-
:return: Dictionary containing refresh status and end time if refresh history exists, otherwise None.
154-
"""
155-
history = await self.get_refresh_history(dataset_id=dataset_id, group_id=group_id)
156-
157-
if len(history) == 0:
158-
return None
159-
160-
refresh_details = history[0]
161-
return refresh_details
162-
163149
async def get_refresh_details_by_refresh_id(
164150
self, dataset_id: str, group_id: str, refresh_id: str
165151
) -> dict[str, str]:

tests/providers/microsoft/azure/hooks/test_powerbi.py

Lines changed: 47 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,17 @@
1717
from __future__ import annotations
1818

1919
from unittest import mock
20-
from unittest.mock import MagicMock
2120

2221
import pytest
2322

24-
from airflow.models.connection import Connection
23+
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
2524
from airflow.providers.microsoft.azure.hooks.powerbi import (
2625
PowerBIDatasetRefreshException,
2726
PowerBIDatasetRefreshFields,
2827
PowerBIDatasetRefreshStatus,
2928
PowerBIHook,
3029
)
3130

32-
DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
33-
MODULE = "airflow.providers.microsoft.azure.hooks.powerbi"
34-
CLIENT_ID = "client_id"
35-
CLIENT_SECRET = "client_secret"
36-
TENANT_ID = "tenant_id"
37-
BASE_URL = "https://api.powerbi.com"
38-
API_VERSION = "v1.0"
39-
GROUP_ID = "group_id"
40-
DATASET_ID = "dataset_id"
41-
42-
API_RAW_RESPONSE = {
43-
"value": [
44-
# Completed refresh
45-
{
46-
"requestId": "5e2d9921-e91b-491f-b7e1-e7d8db49194c",
47-
"status": "Completed",
48-
"endTime": "2024-04-15T20:14:08.1458221Z",
49-
# serviceExceptionJson is not present when status is not "Failed"
50-
},
51-
# In-progress refresh
52-
{
53-
"requestId": "6b6536c1-cfcb-4148-9c21-402c3f5241e4",
54-
"status": "Unknown", # endtime is not available.
55-
},
56-
# Failed refresh
57-
{
58-
"requestId": "11bf290a-346b-48b7-8973-c5df149337ff",
59-
"status": "Failed",
60-
"endTime": "2024-04-15T20:14:08.1458221Z",
61-
"serviceExceptionJson": '{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}',
62-
},
63-
]
64-
}
65-
6631
FORMATTED_RESPONSE = [
6732
# Completed refresh
6833
{
@@ -84,181 +49,82 @@
8449
},
8550
]
8651

52+
DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
53+
GROUP_ID = "group_id"
54+
DATASET_ID = "dataset_id"
8755

88-
@pytest.fixture
89-
def powerbi_hook():
90-
client = PowerBIHook(powerbi_conn_id=DEFAULT_CONNECTION_CLIENT_SECRET)
91-
return client
56+
CONFIG = {"conn_id": DEFAULT_CONNECTION_CLIENT_SECRET, "timeout": 3, "api_version": "v1.0"}
9257

9358

9459
@pytest.fixture
95-
def get_token(powerbi_hook):
96-
powerbi_hook._get_token = MagicMock(return_value="access_token")
97-
return powerbi_hook._get_token()
98-
99-
100-
def test_get_token_with_missing_credentials(powerbi_hook):
101-
# Mock the get_connection method to return a connection with missing credentials
102-
powerbi_hook.get_connection = MagicMock(
103-
return_value=Connection(
104-
conn_id=DEFAULT_CONNECTION_CLIENT_SECRET,
105-
conn_type="powerbi",
106-
login=None,
107-
password=None,
108-
extra={
109-
"tenant_id": TENANT_ID,
110-
},
111-
)
112-
)
113-
114-
with pytest.raises(ValueError):
115-
powerbi_hook._get_token()
116-
117-
118-
def test_get_token_with_missing_tenant_id(powerbi_hook):
119-
# Mock the get_connection method to return a connection with missing tenant ID
120-
powerbi_hook.get_connection = MagicMock(
121-
return_value=Connection(
122-
conn_id=DEFAULT_CONNECTION_CLIENT_SECRET,
123-
conn_type="powerbi",
124-
login=CLIENT_ID,
125-
password=CLIENT_SECRET,
126-
extra={},
127-
)
128-
)
129-
130-
with pytest.raises(ValueError):
131-
powerbi_hook._get_token()
132-
133-
134-
@mock.patch(f"{MODULE}.ClientSecretCredential")
135-
def test_get_token_with_valid_credentials(mock_credential, powerbi_hook):
136-
# Mock the get_connection method to return a connection with valid credentials
137-
powerbi_hook.get_connection = MagicMock(
138-
return_value=Connection(
139-
conn_id=DEFAULT_CONNECTION_CLIENT_SECRET,
140-
conn_type="powerbi",
141-
login=CLIENT_ID,
142-
password=CLIENT_SECRET,
143-
extra={
144-
"tenant_id": TENANT_ID,
145-
},
146-
)
147-
)
148-
149-
token = powerbi_hook._get_token()
150-
mock_credential.assert_called()
151-
152-
assert token is not None
153-
154-
155-
def test_refresh_dataset(powerbi_hook, requests_mock, get_token):
156-
request_id = "request_id"
157-
158-
# Mock the request in _send_request method to return a successful response
159-
requests_mock.post(
160-
f"{BASE_URL}/{API_VERSION}/myorg/groups/{GROUP_ID}/datasets/{DATASET_ID}/refreshes",
161-
status_code=202,
162-
headers={"Authorization": f"Bearer {get_token}", "RequestId": request_id},
163-
)
164-
165-
result = powerbi_hook.refresh_dataset(dataset_id=DATASET_ID, group_id=GROUP_ID)
166-
167-
assert result == request_id
168-
169-
170-
def test_get_refresh_history_success(powerbi_hook, requests_mock, get_token):
171-
url = f"{BASE_URL}/{API_VERSION}/myorg/groups/{GROUP_ID}/datasets/{DATASET_ID}/refreshes"
172-
173-
requests_mock.get(
174-
url, json=API_RAW_RESPONSE, headers={"Authorization": f"Bearer {get_token}"}, status_code=200
175-
)
176-
177-
result = powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID)
178-
179-
assert len(result) == 3
180-
assert result == FORMATTED_RESPONSE
181-
182-
183-
def test_get_latest_refresh_details_with_no_history(powerbi_hook):
184-
# Mock the get_refresh_history method to return an empty list
185-
powerbi_hook.get_refresh_history = MagicMock(return_value=[])
186-
187-
result = powerbi_hook.get_latest_refresh_details(dataset_id=DATASET_ID, group_id=GROUP_ID)
188-
189-
assert result is None
60+
def powerbi_hook():
61+
return PowerBIHook(**CONFIG)
19062

19163

192-
def test_get_latest_refresh_details_with_history(powerbi_hook):
193-
# Mock the get_refresh_history method to return a list with refresh details
194-
refresh_history = FORMATTED_RESPONSE
195-
powerbi_hook.get_refresh_history = MagicMock(return_value=refresh_history)
64+
@pytest.mark.asyncio
65+
async def test_get_refresh_history(powerbi_hook):
66+
response_data = {"value": [{"requestId": "1234", "status": "Completed", "serviceExceptionJson": ""}]}
19667

197-
result = powerbi_hook.get_latest_refresh_details(dataset_id=DATASET_ID, group_id=GROUP_ID)
68+
with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run:
69+
mock_run.return_value = response_data
70+
result = await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID)
19871

199-
assert result == FORMATTED_RESPONSE[0]
72+
expected = [{"request_id": "1234", "status": "Completed", "error": ""}]
73+
assert result == expected
20074

20175

202-
def test_get_refresh_details_by_request_id(powerbi_hook):
76+
@pytest.mark.asyncio
77+
async def test_get_refresh_details_by_refresh_id(powerbi_hook):
20378
# Mock the get_refresh_history method to return a list of refresh histories
20479
refresh_histories = FORMATTED_RESPONSE
205-
powerbi_hook.get_refresh_history = MagicMock(return_value=refresh_histories)
80+
powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=refresh_histories)
20681

20782
# Call the function with a valid request ID
208-
request_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
209-
result = powerbi_hook.get_refresh_details_by_request_id(
210-
dataset_id=DATASET_ID, group_id=GROUP_ID, request_id=request_id
83+
refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
84+
result = await powerbi_hook.get_refresh_details_by_refresh_id(
85+
dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id
21186
)
21287

21388
# Assert that the correct refresh details are returned
21489
assert result == {
21590
PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c",
21691
PowerBIDatasetRefreshFields.STATUS.value: "Completed",
217-
PowerBIDatasetRefreshFields.END_TIME.value: "2024-04-15T20:14:08.1458221Z",
21892
PowerBIDatasetRefreshFields.ERROR.value: "None",
21993
}
22094

22195
# Call the function with an invalid request ID
22296
invalid_request_id = "invalid_request_id"
22397
with pytest.raises(PowerBIDatasetRefreshException):
224-
powerbi_hook.get_refresh_details_by_request_id(
225-
dataset_id=DATASET_ID, group_id=GROUP_ID, request_id=invalid_request_id
98+
await powerbi_hook.get_refresh_details_by_refresh_id(
99+
dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=invalid_request_id
226100
)
227101

228102

229-
_wait_for_dataset_refresh_status_test_args = [
230-
(PowerBIDatasetRefreshStatus.COMPLETED, PowerBIDatasetRefreshStatus.COMPLETED, True),
231-
(PowerBIDatasetRefreshStatus.FAILED, PowerBIDatasetRefreshStatus.COMPLETED, False),
232-
(PowerBIDatasetRefreshStatus.IN_PROGRESS, PowerBIDatasetRefreshStatus.COMPLETED, "timeout"),
233-
]
103+
@pytest.mark.asyncio
104+
async def test_trigger_dataset_refresh(powerbi_hook):
105+
response_data = {"requestid": "5e2d9921-e91b-491f-b7e1-e7d8db49194c"}
234106

107+
with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run:
108+
mock_run.return_value = response_data
109+
result = await powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID)
110+
111+
assert result == "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
235112

236-
@pytest.mark.parametrize(
237-
argnames=("dataset_refresh_status", "expected_status", "expected_result"),
238-
argvalues=_wait_for_dataset_refresh_status_test_args,
239-
ids=[
240-
f"refresh_status_{argval[0]}_expected_{argval[1]}"
241-
for argval in _wait_for_dataset_refresh_status_test_args
242-
],
243-
)
244-
def test_wait_for_dataset_refresh_status(
245-
powerbi_hook, dataset_refresh_status, expected_status, expected_result
246-
):
247-
config = {
248-
"dataset_id": DATASET_ID,
249-
"group_id": GROUP_ID,
250-
"request_id": "5e2d9921-e91b-491f-b7e1-e7d8db49194c",
251-
"timeout": 3,
252-
"check_interval": 1,
253-
"expected_status": expected_status,
254-
}
255113

256-
# Mock the get_refresh_details_by_request_id method to return a dataset refresh details
257-
dataset_refresh_details = {PowerBIDatasetRefreshFields.STATUS.value: dataset_refresh_status}
258-
powerbi_hook.get_refresh_details_by_request_id = MagicMock(return_value=dataset_refresh_details)
114+
@pytest.mark.asyncio
115+
async def test_cancel_dataset_refresh(powerbi_hook):
116+
dataset_refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
259117

260-
if expected_result != "timeout":
261-
assert powerbi_hook.wait_for_dataset_refresh_status(**config) == expected_result
262-
else:
263-
with pytest.raises(PowerBIDatasetRefreshException):
264-
powerbi_hook.wait_for_dataset_refresh_status(**config)
118+
with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run:
119+
await powerbi_hook.cancel_dataset_refresh(DATASET_ID, GROUP_ID, dataset_refresh_id)
120+
121+
mock_run.assert_called_once_with(
122+
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}",
123+
response_type=None,
124+
path_parameters={
125+
"group_id": GROUP_ID,
126+
"dataset_id": DATASET_ID,
127+
"dataset_refresh_id": dataset_refresh_id,
128+
},
129+
method="DELETE",
130+
)

0 commit comments

Comments
 (0)