Skip to content

Commit d6969a4

Browse files
authored
Merge pull request #5338 from khushboobhatia01/consumer_shutdown
Shutdown kombu consumer on worker shutdown
2 parents 4cfbc02 + 0d36dbb commit d6969a4

File tree

3 files changed

+74
-0
lines changed

3 files changed

+74
-0
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ Changed
3333

3434
Contributed by @lukepatrick
3535

36+
* Actionrunner worker shutdown should stop Kombu consumer thread. #5338
37+
38+
Contributed by @khushboobhatia01
39+
3640
3.5.0 - June 23, 2021
3741
---------------------
3842

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright 2021 The StackStorm Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import absolute_import
16+
17+
import random
18+
import eventlet
19+
20+
from kombu import Exchange
21+
from kombu import Queue
22+
from unittest2 import TestCase
23+
24+
from st2common.transport.consumers import ActionsQueueConsumer
25+
from st2common.transport.publishers import PoolPublisher
26+
from st2common.transport import utils as transport_utils
27+
from st2common.models.db.liveaction import LiveActionDB
28+
29+
__all__ = ["ActionsQueueConsumerTestCase"]
30+
31+
32+
class ActionsQueueConsumerTestCase(TestCase):
33+
message_count = 0
34+
message_type = LiveActionDB
35+
36+
def test_stop_consumption_on_shutdown(self):
37+
exchange = Exchange("st2.execution.test", type="topic")
38+
queue_name = "test-" + str(random.randint(1, 10000))
39+
queue = Queue(
40+
name=queue_name, exchange=exchange, routing_key="#", auto_delete=True
41+
)
42+
publisher = PoolPublisher()
43+
with transport_utils.get_connection() as connection:
44+
connection.connect()
45+
watcher = ActionsQueueConsumer(
46+
connection=connection, queues=queue, handler=self
47+
)
48+
watcher_thread = eventlet.greenthread.spawn(watcher.run)
49+
50+
# Give it some time to start up since we are publishing on a new queue
51+
eventlet.sleep(0.5)
52+
body = LiveActionDB(
53+
status="scheduled", action="core.local", action_is_workflow=False
54+
)
55+
publisher.publish(payload=body, exchange=exchange)
56+
eventlet.sleep(0.2)
57+
self.assertEqual(self.message_count, 1)
58+
body = LiveActionDB(
59+
status="scheduled", action="core.local", action_is_workflow=True
60+
)
61+
watcher.shutdown()
62+
eventlet.sleep(1)
63+
publisher.publish(payload=body, exchange=exchange)
64+
# Second published message won't be consumed.
65+
self.assertEqual(self.message_count, 1)
66+
watcher_thread.kill()
67+
68+
def process(self, liveaction):
69+
self.message_count = self.message_count + 1

st2common/st2common/transport/consumers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def process(self, body, message):
156156
def shutdown(self):
157157
self._workflows_dispatcher.shutdown()
158158
self._actions_dispatcher.shutdown()
159+
self.should_stop = True
159160

160161

161162
class VariableMessageQueueConsumer(QueueConsumer):

0 commit comments

Comments
 (0)