Skip to content

Commit 5a0c7a6

Browse files
authored
feat: async apprise notifier (#57541)
1 parent c676b61 commit 5a0c7a6

4 files changed

Lines changed: 141 additions & 32 deletions

File tree

providers/apprise/src/airflow/providers/apprise/hooks/apprise.py

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@
2424
import apprise
2525
from apprise import AppriseConfig, NotifyFormat, NotifyType
2626

27+
from airflow.providers.common.compat.connection import get_async_connection
2728
from airflow.providers.common.compat.sdk import BaseHook
2829

2930
if TYPE_CHECKING:
3031
from apprise import AppriseAttachment
3132

33+
from airflow.providers.common.compat.sdk import Connection
34+
3235

3336
class AppriseHook(BaseHook):
3437
"""
@@ -50,14 +53,13 @@ def __init__(self, apprise_conn_id: str = default_conn_name) -> None:
5053
super().__init__()
5154
self.apprise_conn_id = apprise_conn_id
5255

53-
def get_config_from_conn(self):
54-
conn = self.get_connection(self.apprise_conn_id)
56+
def get_config_from_conn(self, conn: Connection):
5557
config = conn.extra_dejson["config"]
5658
return json.loads(config) if isinstance(config, str) else config
5759

58-
def set_config_from_conn(self, apprise_obj: apprise.Apprise):
60+
def set_config_from_conn(self, conn: Connection, apprise_obj: apprise.Apprise):
5961
"""Set config from connection to apprise object."""
60-
config_object = self.get_config_from_conn()
62+
config_object = self.get_config_from_conn(conn=conn)
6163
if isinstance(config_object, list):
6264
for config in config_object:
6365
apprise_obj.add(config["path"], tag=config.get("tag", None))
@@ -101,7 +103,8 @@ def notify(
101103
if config:
102104
apprise_obj.add(config)
103105
else:
104-
self.set_config_from_conn(apprise_obj)
106+
conn = self.get_connection(self.apprise_conn_id)
107+
self.set_config_from_conn(conn=conn, apprise_obj=apprise_obj)
105108
apprise_obj.notify(
106109
body=body,
107110
title=title,
@@ -112,6 +115,50 @@ def notify(
112115
interpret_escapes=interpret_escapes,
113116
)
114117

118+
async def async_notify(
119+
self,
120+
body: str,
121+
title: str | None = None,
122+
notify_type: NotifyType = NotifyType.INFO,
123+
body_format: NotifyFormat = NotifyFormat.TEXT,
124+
tag: str | Iterable[str] = "all",
125+
attach: AppriseAttachment | None = None,
126+
interpret_escapes: bool | None = None,
127+
config: AppriseConfig | None = None,
128+
):
129+
r"""
130+
Send message to plugged-in services asynchronously.
131+
132+
:param body: Specify the message body
133+
:param title: Specify the message title. (optional)
134+
:param notify_type: Specify the message type (default=info). Possible values are "info",
135+
"success", "failure", and "warning"
136+
:param body_format: Specify the input message format (default=text). Possible values are "text",
137+
"html", and "markdown".
138+
:param tag: Specify one or more tags to filter which services to notify
139+
:param attach: Specify one or more file attachment locations
140+
:param interpret_escapes: Enable interpretation of backslash escapes. For example, this would convert
141+
sequences such as \n and \r to their respective ascii new-line and carriage return characters
142+
:param config: Specify one or more configuration
143+
"""
144+
title = title or ""
145+
146+
apprise_obj = apprise.Apprise()
147+
if config:
148+
apprise_obj.add(config)
149+
else:
150+
conn = await get_async_connection(self.apprise_conn_id)
151+
self.set_config_from_conn(conn=conn, apprise_obj=apprise_obj)
152+
await apprise_obj.async_notify(
153+
body=body,
154+
title=title,
155+
notify_type=notify_type,
156+
body_format=body_format,
157+
tag=tag,
158+
attach=attach,
159+
interpret_escapes=interpret_escapes,
160+
)
161+
115162
def get_conn(self) -> None:
116163
raise NotImplementedError()
117164

providers/apprise/src/airflow/providers/apprise/notifications/apprise.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from apprise import AppriseConfig, NotifyFormat, NotifyType
2424

2525
from airflow.providers.apprise.hooks.apprise import AppriseHook
26+
from airflow.providers.apprise.version_compat import AIRFLOW_V_3_1_PLUS
2627
from airflow.providers.common.compat.notifier import BaseNotifier
2728

2829

@@ -58,8 +59,13 @@ def __init__(
5859
interpret_escapes: bool | None = None,
5960
config: AppriseConfig | None = None,
6061
apprise_conn_id: str = AppriseHook.default_conn_name,
62+
**kwargs,
6163
):
62-
super().__init__()
64+
if AIRFLOW_V_3_1_PLUS:
65+
# Support for passing context was added in 3.1.0
66+
super().__init__(**kwargs)
67+
else:
68+
super().__init__()
6369
self.apprise_conn_id = apprise_conn_id
6470
self.body = body
6571
self.title = title
@@ -88,5 +94,18 @@ def notify(self, context):
8894
config=self.config,
8995
)
9096

97+
async def async_notify(self, context):
98+
"""Send a alert to a apprise configured service."""
99+
await self.hook.async_notify(
100+
body=self.body,
101+
title=self.title,
102+
notify_type=self.notify_type,
103+
body_format=self.body_format,
104+
tag=self.tag,
105+
attach=self.attach,
106+
interpret_escapes=self.interpret_escapes,
107+
config=self.config,
108+
)
109+
91110

92111
send_apprise_notification = AppriseNotifier

providers/apprise/tests/unit/apprise/hooks/test_apprise.py

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import json
2020
from unittest import mock
21-
from unittest.mock import MagicMock, call, patch
21+
from unittest.mock import AsyncMock, MagicMock, call, patch
2222

2323
import apprise
2424
import pytest
@@ -42,13 +42,9 @@ class TestAppriseHook:
4242
)
4343
def test_get_config_from_conn(self, config):
4444
extra = {"config": config}
45-
with patch.object(
46-
AppriseHook,
47-
"get_connection",
48-
return_value=Connection(conn_type="apprise", extra=extra),
49-
):
50-
hook = AppriseHook()
51-
assert hook.get_config_from_conn() == (json.loads(config) if isinstance(config, str) else config)
45+
conn = Connection(conn_type="apprise", extra=extra)
46+
hook = AppriseHook()
47+
assert hook.get_config_from_conn(conn) == (json.loads(config) if isinstance(config, str) else config)
5248

5349
def test_set_config_from_conn_with_dict(self):
5450
"""
@@ -57,13 +53,9 @@ def test_set_config_from_conn_with_dict(self):
5753
extra = {"config": {"path": "http://some_path_that_dont_exist/", "tag": "alert"}}
5854
apprise_obj = apprise.Apprise()
5955
apprise_obj.add = MagicMock()
60-
with patch.object(
61-
AppriseHook,
62-
"get_connection",
63-
return_value=Connection(conn_type="apprise", extra=extra),
64-
):
65-
hook = AppriseHook()
66-
hook.set_config_from_conn(apprise_obj)
56+
conn = Connection(conn_type="apprise", extra=extra)
57+
hook = AppriseHook()
58+
hook.set_config_from_conn(conn=conn, apprise_obj=apprise_obj)
6759

6860
apprise_obj.add.assert_called_once_with("http://some_path_that_dont_exist/", tag="alert")
6961

@@ -80,13 +72,9 @@ def test_set_config_from_conn_with_list(self):
8072

8173
apprise_obj = apprise.Apprise()
8274
apprise_obj.add = MagicMock()
83-
with patch.object(
84-
AppriseHook,
85-
"get_connection",
86-
return_value=Connection(conn_type="apprise", extra=extra),
87-
):
88-
hook = AppriseHook()
89-
hook.set_config_from_conn(apprise_obj)
75+
conn = Connection(conn_type="apprise", extra=extra)
76+
hook = AppriseHook()
77+
hook.set_config_from_conn(conn=conn, apprise_obj=apprise_obj)
9078

9179
apprise_obj.add.assert_has_calls(
9280
[
@@ -97,17 +85,17 @@ def test_set_config_from_conn_with_list(self):
9785

9886
@mock.patch(
9987
"airflow.providers.apprise.hooks.apprise.AppriseHook.get_connection",
100-
return_value=Connection(
88+
)
89+
def test_notify(self, mock_conn):
90+
mock_conn.return_value = Connection(
10191
conn_id="apprise",
10292
extra={
10393
"config": [
10494
{"path": "http://some_path_that_dont_exist/", "tag": "p0"},
10595
{"path": "http://some_other_path_that_dont_exist/", "tag": "p1"},
10696
]
10797
},
108-
),
109-
)
110-
def test_notify(self, connection):
98+
)
11199
apprise_obj = apprise.Apprise()
112100
apprise_obj.notify = MagicMock()
113101
apprise_obj.add = MagicMock()
@@ -124,3 +112,35 @@ def test_notify(self, connection):
124112
attach=None,
125113
interpret_escapes=None,
126114
)
115+
116+
@pytest.mark.asyncio
117+
@mock.patch(
118+
"airflow.providers.apprise.hooks.apprise.get_async_connection",
119+
)
120+
async def test_async_notify(self, mock_conn):
121+
mock_conn.return_value = Connection(
122+
conn_id="apprise",
123+
extra={
124+
"config": [
125+
{"path": "http://some_path_that_dont_exist/", "tag": "p0"},
126+
{"path": "http://some_other_path_that_dont_exist/", "tag": "p1"},
127+
]
128+
},
129+
)
130+
apprise_obj = apprise.Apprise()
131+
apprise_obj.async_notify = AsyncMock()
132+
apprise_obj.add = MagicMock()
133+
with patch.object(apprise, "Apprise", return_value=apprise_obj):
134+
hook = AppriseHook()
135+
await hook.async_notify(body="test")
136+
137+
mock_conn.assert_called()
138+
apprise_obj.async_notify.assert_called_once_with(
139+
body="test",
140+
title="",
141+
notify_type=NotifyType.INFO,
142+
body_format=NotifyFormat.TEXT,
143+
tag="all",
144+
attach=None,
145+
interpret_escapes=None,
146+
)

providers/apprise/tests/unit/apprise/notifications/test_apprise.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,26 @@ def test_notifier_templated(self, mock_apprise_hook, create_dag_without_db):
9898
"config": None,
9999
}
100100
mock_apprise_hook.return_value.notify.assert_called_once()
101+
102+
@pytest.mark.asyncio
103+
@mock.patch("airflow.providers.apprise.notifications.apprise.AppriseHook")
104+
async def test_async_apprise_notifier(self, mock_apprise_hook, create_dag_without_db):
105+
mock_apprise_hook.return_value.async_notify = mock.AsyncMock()
106+
107+
notifier = send_apprise_notification(body="DISK at 99%", notify_type=NotifyType.FAILURE)
108+
109+
await notifier.async_notify({"dag": create_dag_without_db("test_notifier")})
110+
111+
call_args = mock_apprise_hook.return_value.async_notify.call_args.kwargs
112+
113+
assert call_args == {
114+
"body": "DISK at 99%",
115+
"notify_type": NotifyType.FAILURE,
116+
"title": None,
117+
"body_format": NotifyFormat.TEXT,
118+
"tag": "all",
119+
"attach": None,
120+
"interpret_escapes": None,
121+
"config": None,
122+
}
123+
mock_apprise_hook.return_value.async_notify.assert_called_once()

0 commit comments

Comments
 (0)