Skip to content

Commit e3a018f

Browse files
Taragolisephraimbuddy
authored andcommitted
Optionally use client.files_upload_v2 in Slack Provider (#36757)
* Optionally use `client.files_upload_v2` in Slack Provider * Revert default value for method * Remove redundand assigment (cherry picked from commit 9758acf)
1 parent b9f54a5 commit e3a018f

12 files changed

Lines changed: 547 additions & 144 deletions

File tree

airflow/providers/slack/hooks/slack.py

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
import warnings
2222
from functools import cached_property
2323
from pathlib import Path
24-
from typing import TYPE_CHECKING, Any, Sequence
24+
from typing import TYPE_CHECKING, Any, Sequence, TypedDict
2525

2626
from slack_sdk import WebClient
2727
from slack_sdk.errors import SlackApiError
28+
from typing_extensions import NotRequired
2829

2930
from airflow.exceptions import AirflowNotFoundException
3031
from airflow.hooks.base import BaseHook
@@ -36,6 +37,26 @@
3637
from slack_sdk.web.slack_response import SlackResponse
3738

3839

40+
class FileUploadTypeDef(TypedDict):
41+
"""
42+
Represents the structure of the file upload data.
43+
44+
:ivar file: Optional. Path to file which need to be sent.
45+
:ivar content: Optional. File contents. If omitting this parameter, you must provide a file.
46+
:ivar filename: Optional. Displayed filename.
47+
:ivar title: Optional. The title of the uploaded file.
48+
:ivar alt_txt: Optional. Description of image for screen-reader.
49+
:ivar snippet_type: Optional. Syntax type of the snippet being uploaded.
50+
"""
51+
52+
file: NotRequired[str | None]
53+
content: NotRequired[str | None]
54+
filename: NotRequired[str | None]
55+
title: NotRequired[str | None]
56+
alt_txt: NotRequired[str | None]
57+
snippet_type: NotRequired[str | None]
58+
59+
3960
class SlackHook(BaseHook):
4061
"""
4162
Creates a Slack API Connection to be used for calls.
@@ -111,6 +132,9 @@ def __init__(
111132
extra_client_args["logger"] = self.log
112133
self.extra_client_args = extra_client_args
113134

135+
# Use for caching channels result
136+
self._channels_mapping: dict[str, str] = {}
137+
114138
@cached_property
115139
def client(self) -> WebClient:
116140
"""Get the underlying slack_sdk.WebClient (cached)."""
@@ -212,6 +236,128 @@ def send_file(
212236
channels=channels,
213237
)
214238

239+
def send_file_v2(
240+
self,
241+
*,
242+
channel_id: str | None = None,
243+
file_uploads: FileUploadTypeDef | list[FileUploadTypeDef],
244+
initial_comment: str | None = None,
245+
) -> SlackResponse:
246+
"""
247+
Sends one or more files to a Slack channel using the Slack SDK Client method `files_upload_v2`.
248+
249+
:param channel_id: The ID of the channel to send the file to.
250+
If omitting this parameter, then file will send to workspace.
251+
:param file_uploads: The file(s) specification to upload.
252+
:param initial_comment: The message text introducing the file in specified ``channel``.
253+
"""
254+
if channel_id and channel_id.startswith("#"):
255+
retried_channel_id = self.get_channel_id(channel_id[1:])
256+
warnings.warn(
257+
"The method `files_upload_v2` in the Slack SDK Client expects a Slack Channel ID, "
258+
f"but received a Slack Channel Name. To resolve this, consider replacing {channel_id!r} "
259+
f"with the corresponding Channel ID {retried_channel_id!r}.",
260+
UserWarning,
261+
stacklevel=2,
262+
)
263+
channel_id = retried_channel_id
264+
265+
if not isinstance(file_uploads, list):
266+
file_uploads = [file_uploads]
267+
for file_upload in file_uploads:
268+
if not file_upload.get("filename"):
269+
# Some of early version of Slack SDK (such as 3.19.0) raise an error if ``filename`` not set.
270+
file_upload["filename"] = "Uploaded file"
271+
272+
return self.client.files_upload_v2(
273+
channel=channel_id,
274+
# mypy doesn't happy even if TypedDict used instead of dict[str, Any]
275+
# see: https://github.com/python/mypy/issues/4976
276+
file_uploads=file_uploads, # type: ignore[arg-type]
277+
initial_comment=initial_comment,
278+
)
279+
280+
def send_file_v1_to_v2(
281+
self,
282+
*,
283+
channels: str | Sequence[str] | None = None,
284+
file: str | Path | None = None,
285+
content: str | None = None,
286+
filename: str | None = None,
287+
initial_comment: str | None = None,
288+
title: str | None = None,
289+
filetype: str | None = None,
290+
) -> list[SlackResponse]:
291+
"""
292+
Smooth transition between ``send_file`` and ``send_file_v2`` methods.
293+
294+
:param channels: Comma-separated list of channel names or IDs where the file will be shared.
295+
If omitting this parameter, then file will send to workspace.
296+
File would be uploaded for each channel individually.
297+
:param file: Path to file which need to be sent.
298+
:param content: File contents. If omitting this parameter, you must provide a file.
299+
:param filename: Displayed filename.
300+
:param initial_comment: The message text introducing the file in specified ``channels``.
301+
:param title: Title of the file.
302+
:param filetype: A file type identifier.
303+
"""
304+
if not exactly_one(file, content):
305+
raise ValueError("Either `file` or `content` must be provided, not both.")
306+
if file:
307+
file = Path(file)
308+
file_uploads: FileUploadTypeDef = {"file": file.__fspath__(), "filename": filename or file.name}
309+
else:
310+
file_uploads = {"content": content, "filename": filename}
311+
312+
file_uploads.update({"title": title, "snippet_type": filetype})
313+
314+
if channels:
315+
if isinstance(channels, str):
316+
channels = channels.split(",")
317+
channels_to_share: list[str | None] = list(map(str.strip, channels))
318+
else:
319+
channels_to_share = [None]
320+
321+
responses = []
322+
for channel in channels_to_share:
323+
responses.append(
324+
self.send_file_v2(
325+
channel_id=channel, file_uploads=file_uploads, initial_comment=initial_comment
326+
)
327+
)
328+
return responses
329+
330+
def get_channel_id(self, channel_name: str) -> str:
331+
"""
332+
Retrieves a Slack channel id by a channel name.
333+
334+
It continuously iterates over all Slack channels (public and private)
335+
until it finds the desired channel name in addition cache results for further usage.
336+
337+
.. seealso::
338+
https://api.slack.com/methods/conversations.list
339+
340+
:param channel_name: The name of the Slack channel for which ID has to be found.
341+
"""
342+
next_cursor = None
343+
while not (channel_id := self._channels_mapping.get(channel_name)):
344+
res = self.client.conversations_list(cursor=next_cursor, types="public_channel,private_channel")
345+
if TYPE_CHECKING:
346+
# Slack SDK response type too broad, this should make mypy happy
347+
assert isinstance(res.data, dict)
348+
349+
for channel_data in res.data.get("channels", []):
350+
self._channels_mapping[channel_data["name"]] = channel_data["id"]
351+
352+
if not (next_cursor := res.data.get("response_metadata", {}).get("next_cursor")):
353+
channel_id = self._channels_mapping.get(channel_name)
354+
break
355+
356+
if not channel_id:
357+
msg = f"Unable to find slack channel with name: {channel_name!r}"
358+
raise LookupError(msg)
359+
return channel_id
360+
215361
def test_connection(self):
216362
"""Tests the Slack API connection.
217363

airflow/providers/slack/operators/slack.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,24 @@
2222
from functools import cached_property
2323
from typing import TYPE_CHECKING, Any, Sequence
2424

25+
from typing_extensions import Literal
26+
2527
from airflow.exceptions import AirflowProviderDeprecationWarning
2628
from airflow.models import BaseOperator
2729
from airflow.providers.slack.hooks.slack import SlackHook
2830

2931
if TYPE_CHECKING:
3032
from slack_sdk.http_retry import RetryHandler
3133

34+
from airflow.utils.context import Context
35+
3236

3337
class SlackAPIOperator(BaseOperator):
3438
"""Base Slack Operator class.
3539
3640
:param slack_conn_id: :ref:`Slack API Connection <howto/connection:slack>`
3741
which its password is Slack API token.
38-
:param method: The Slack API Method to Call (https://api.slack.com/methods). Optional
42+
:param method: The Slack API Method to Call (https://api.slack.com/methods).
3943
:param api_params: API Method call parameters (https://api.slack.com/methods). Optional
4044
:param timeout: The maximum number of seconds the client will wait to connect
4145
and receive a response from Slack. Optional
@@ -56,6 +60,13 @@ def __init__(
5660
retry_handlers: list[RetryHandler] | None = None,
5761
**kwargs,
5862
) -> None:
63+
if not method:
64+
warnings.warn(
65+
"Define `method` parameter as empty string or None is deprecated. "
66+
"In the future it will raise an error on task initialisation.",
67+
AirflowProviderDeprecationWarning,
68+
stacklevel=2,
69+
)
5970
super().__init__(**kwargs)
6071
self.slack_conn_id = slack_conn_id
6172
self.method = method
@@ -90,7 +101,10 @@ def construct_api_call_params(self) -> Any:
90101
"SlackAPIOperator should not be used directly. Chose one of the subclasses instead"
91102
)
92103

93-
def execute(self, **kwargs):
104+
def execute(self, context: Context):
105+
if not self.method:
106+
msg = f"Expected non empty `method` attribute in {type(self).__name__!r}, but got {self.method!r}"
107+
raise ValueError(msg)
94108
if not self.api_params:
95109
self.construct_api_call_params()
96110
self.hook.call(self.method, json=self.api_params)
@@ -139,14 +153,13 @@ def __init__(
139153
attachments: list | None = None,
140154
**kwargs,
141155
) -> None:
142-
self.method = "chat.postMessage"
156+
super().__init__(method="chat.postMessage", **kwargs)
143157
self.channel = channel
144158
self.username = username
145159
self.text = text
146160
self.icon_url = icon_url
147161
self.attachments = attachments or []
148162
self.blocks = blocks or []
149-
super().__init__(method=self.method, **kwargs)
150163

151164
def construct_api_call_params(self) -> Any:
152165
self.api_params = {
@@ -192,7 +205,7 @@ class SlackAPIFileOperator(SlackAPIOperator):
192205
:param filetype: slack filetype. (templated) See: https://api.slack.com/types/file#file_types
193206
:param content: file content. (templated)
194207
:param title: title of file. (templated)
195-
:param channel: (deprecated) channel in which to sent file on slack name
208+
:param method_version: The version of the method of Slack SDK Client to be used, either "v1" or "v2".
196209
"""
197210

198211
template_fields: Sequence[str] = (
@@ -213,10 +226,10 @@ def __init__(
213226
filetype: str | None = None,
214227
content: str | None = None,
215228
title: str | None = None,
216-
channel: str | None = None,
229+
method_version: Literal["v1", "v2"] = "v1",
217230
**kwargs,
218231
) -> None:
219-
if channel:
232+
if channel := kwargs.pop("channel", None):
220233
warnings.warn(
221234
"Argument `channel` is deprecated and will removed in a future releases. "
222235
"Please use `channels` instead.",
@@ -227,16 +240,23 @@ def __init__(
227240
raise ValueError(f"Cannot set both arguments: channel={channel!r} and channels={channels!r}.")
228241
channels = channel
229242

243+
super().__init__(method="files.upload", **kwargs)
230244
self.channels = channels
231245
self.initial_comment = initial_comment
232246
self.filename = filename
233247
self.filetype = filetype
234248
self.content = content
235249
self.title = title
236-
super().__init__(method="files.upload", **kwargs)
250+
self.method_version = method_version
251+
252+
@property
253+
def _method_resolver(self):
254+
if self.method_version == "v1":
255+
return self.hook.send_file
256+
return self.hook.send_file_v1_to_v2
237257

238-
def execute(self, **kwargs):
239-
self.hook.send_file(
258+
def execute(self, context: Context):
259+
self._method_resolver(
240260
channels=self.channels,
241261
# For historical reason SlackAPIFileOperator use filename as reference to file
242262
file=self.filename,

airflow/providers/slack/provider.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ versions:
5858
dependencies:
5959
- apache-airflow>=2.6.0
6060
- apache-airflow-providers-common-sql>=1.3.1
61-
- slack_sdk>=3.0.0
61+
- slack_sdk>=3.19.0
6262

6363
integrations:
6464
- integration-name: Slack

airflow/providers/slack/transfers/sql_to_slack.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
from __future__ import annotations
1818

1919
import warnings
20+
from functools import cached_property
2021
from tempfile import NamedTemporaryFile
2122
from typing import TYPE_CHECKING, Any, Mapping, Sequence
2223

24+
from typing_extensions import Literal
25+
2326
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
2427
from airflow.providers.slack.hooks.slack import SlackHook
2528
from airflow.providers.slack.transfers.base_sql_to_slack import BaseSqlToSlackOperator
@@ -53,6 +56,7 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
5356
:param slack_initial_comment: The message text introducing the file in specified ``slack_channels``.
5457
:param slack_title: Title of file.
5558
:param slack_base_url: A string representing the Slack API base URL. Optional
59+
:param slack_method_version: The version of the Slack SDK Client method to be used, either "v1" or "v2".
5660
:param df_kwargs: Keyword arguments forwarded to ``pandas.DataFrame.to_{format}()`` method.
5761
"""
5862

@@ -81,6 +85,7 @@ def __init__(
8185
slack_initial_comment: str | None = None,
8286
slack_title: str | None = None,
8387
slack_base_url: str | None = None,
88+
slack_method_version: Literal["v1", "v2"] = "v1",
8489
df_kwargs: dict | None = None,
8590
**kwargs,
8691
):
@@ -93,22 +98,33 @@ def __init__(
9398
self.slack_initial_comment = slack_initial_comment
9499
self.slack_title = slack_title
95100
self.slack_base_url = slack_base_url
101+
self.slack_method_version = slack_method_version
96102
self.df_kwargs = df_kwargs or {}
97103

104+
@cached_property
105+
def slack_hook(self):
106+
"""Slack API Hook."""
107+
return SlackHook(
108+
slack_conn_id=self.slack_conn_id,
109+
base_url=self.slack_base_url,
110+
timeout=self.slack_timeout,
111+
proxy=self.slack_proxy,
112+
retry_handlers=self.slack_retry_handlers,
113+
)
114+
115+
@property
116+
def _method_resolver(self):
117+
if self.slack_method_version == "v1":
118+
return self.slack_hook.send_file
119+
return self.slack_hook.send_file_v1_to_v2
120+
98121
def execute(self, context: Context) -> None:
99122
# Parse file format from filename
100123
output_file_format, _ = parse_filename(
101124
filename=self.slack_filename,
102125
supported_file_formats=self.SUPPORTED_FILE_FORMATS,
103126
)
104127

105-
slack_hook = SlackHook(
106-
slack_conn_id=self.slack_conn_id,
107-
base_url=self.slack_base_url,
108-
timeout=self.slack_timeout,
109-
proxy=self.slack_proxy,
110-
retry_handlers=self.slack_retry_handlers,
111-
)
112128
with NamedTemporaryFile(mode="w+", suffix=f"_{self.slack_filename}") as fp:
113129
# tempfile.NamedTemporaryFile used only for create and remove temporary file,
114130
# pandas will open file in correct mode itself depend on file type.
@@ -129,7 +145,7 @@ def execute(self, context: Context) -> None:
129145
# if SUPPORTED_FILE_FORMATS extended and no actual implementation for specific format.
130146
raise AirflowException(f"Unexpected output file format: {output_file_format}")
131147

132-
slack_hook.send_file(
148+
self._method_resolver(
133149
channels=self.slack_channels,
134150
file=output_file_name,
135151
filename=self.slack_filename,

0 commit comments

Comments
 (0)