From 4c86521ed804d4117e318649e929e0d43ea80acc Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Thu, 12 Oct 2017 00:24:42 -0700 Subject: [PATCH 1/8] Move debug logging call in Future.add_done_callback (#4167) --- pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index bdca8dec004b..b53706083fe0 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -30,6 +30,11 @@ logger = logging.getLogger(__name__) +def _callback_completed(future): + """Simple callback that just logs a `Future`'s result.""" + logger.debug('Result: %s', future.result()) + + class Policy(base.BasePolicy): """A consumer class based on :class:`threading.Thread`. @@ -144,4 +149,4 @@ def on_response(self, response): logger.debug(self._callback) message = Message(msg.message, msg.ack_id, self._request_queue) future = self._executor.submit(self._callback, message) - logger.debug('Result: %s' % future.result()) + future.add_done_callback(_callback_completed) From 133db15011a827bfa1171940c067ae002c3495cf Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Fri, 13 Oct 2017 18:36:50 -0700 Subject: [PATCH 2/8] Add PubSub system test for fix in #4174 --- pubsub/tests/system.py | 65 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 02666eae676a..324fced79dc2 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -14,6 +14,7 @@ from __future__ import absolute_import +from datetime import datetime import time import uuid @@ -103,4 +104,66 @@ def test_subscribe_to_messages(): assert callback.call_count >= 50 finally: publisher.delete_topic(topic_name) - subscriber.delete_subscription(sub_name) + + +def test_subscribe_to_messages_async_callbacks(): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + topic_name = _resource_name('topic') + sub_name = _resource_name('subscription') + + try: + # Create a topic. + publisher.create_topic(topic_name) + + # Subscribe to the topic. This must happen before the messages + # are published. + subscriber.create_subscription(sub_name, topic_name) + subscription = subscriber.subscribe(sub_name) + + # Publish some messages. + futures = [publisher.publish( + topic_name, + b'Wooooo! The claaaaaw!', + num=str(i), + ) for i in range(0, 2)] + + # Make sure the publish completes. + [f.result() for f in futures] + + # We want to make sure that the callback was called asynchronously. So + # track when each call happened and make sure below. + call_times = [] + + def process_message(message): + # list.append() is thread-safe. + call_times.append(datetime.now()) + time.sleep(2) + message.ack() + + callback = mock.Mock(wraps=process_message) + side_effect = mock.Mock() + callback.side_effect = side_effect + + # Actually open the subscription and hold it open for a few seconds. + subscription.open(callback) + for second in range(0, 5): + time.sleep(4) + + # The callback should have fired at least two times, but it may + # take some time. + if callback.call_count >= 2: + first = min(call_times[:2]) + last = max(call_times[:2]) + diff = last - first + # "Ensure" the first two callbacks were executed asynchronously + # (sequentially would have resulted in a difference of 2+ + # seconds). + assert diff.days == 0 + assert diff.seconds < 2 + assert side_effect.call_count >= 2 + + # Okay, we took too long; fail out. + assert callback.call_count >= 2 + finally: + publisher.delete_topic(topic_name) From 905fe6f27195e6ce43101124a8319ac2c932f62f Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Fri, 13 Oct 2017 18:58:15 -0700 Subject: [PATCH 3/8] Close race condition in PubSub system test for fix in #4174 --- pubsub/tests/system.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 324fced79dc2..7798983c9299 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -152,7 +152,7 @@ def process_message(message): # The callback should have fired at least two times, but it may # take some time. - if callback.call_count >= 2: + if callback.call_count >= 2 and side_effect.call_count >= 2: first = min(call_times[:2]) last = max(call_times[:2]) diff = last - first @@ -161,7 +161,6 @@ def process_message(message): # seconds). assert diff.days == 0 assert diff.seconds < 2 - assert side_effect.call_count >= 2 # Okay, we took too long; fail out. assert callback.call_count >= 2 From c48f3760a20b1948e78c164d1bda060889dcf28f Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Fri, 13 Oct 2017 19:16:42 -0700 Subject: [PATCH 4/8] Add sleep for 1 sec to test_policy_thread.test_on_response to allow callbacks to complete. Guessing this test was succeeding before because of the bug fixed in #4174 itself. --- pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index 76aec184815e..6efc0fc1a22c 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -17,6 +17,7 @@ from concurrent import futures import queue import threading +import time import grpc @@ -115,6 +116,7 @@ def test_on_response(): # Actually run the method and prove that the callback was # called in the expected way. policy.on_response(response) + time.sleep(1) assert callback.call_count == 2 for call in callback.mock_calls: assert isinstance(call[1][0], message.Message) From b750437dc457d7cc99859091933652ffaa692a2c Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Fri, 13 Oct 2017 20:22:12 -0700 Subject: [PATCH 5/8] Import module instead of type, per code review --- pubsub/tests/system.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 7798983c9299..ec38927f3a9d 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -14,7 +14,7 @@ from __future__ import absolute_import -from datetime import datetime +import datetime import time import uuid @@ -137,7 +137,7 @@ def test_subscribe_to_messages_async_callbacks(): def process_message(message): # list.append() is thread-safe. - call_times.append(datetime.now()) + call_times.append(datetime.datetime.now()) time.sleep(2) message.ack() From 97afe3e3733e8909e274fa4921e67b0130e50740 Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Mon, 16 Oct 2017 16:58:47 -0700 Subject: [PATCH 6/8] Replace potentially flaky time.sleep with proper use of mock-based assertions in test_on_response, per code review --- .../subscriber/test_policy_thread.py | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index 6efc0fc1a22c..0dbc55b15329 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -17,7 +17,6 @@ from concurrent import futures import queue import threading -import time import grpc @@ -95,8 +94,15 @@ def test_on_exception_other(): def test_on_response(): callback = mock.Mock(spec=()) + # TODO Create mock ThreadPoolExecutor, pass into create_policy(), and + # verify that both executor.submit() and future.add_done_callback are + # called twice. + future = mock.Mock() + attrs = {'submit.return_value': future} + executor = mock.Mock(**attrs) + # Set up the policy. - policy = create_policy() + policy = create_policy(executor=executor) policy._callback = callback # Set up the messages to send. @@ -113,10 +119,18 @@ def test_on_response(): ], ) - # Actually run the method and prove that the callback was - # called in the expected way. + # Actually run the method and prove that executor.submit and + # future.add_done_callback were called in the expected way. policy.on_response(response) - time.sleep(1) - assert callback.call_count == 2 - for call in callback.mock_calls: - assert isinstance(call[1][0], message.Message) + + submit_calls = [m for m in executor.method_calls if m[0] == 'submit'] + assert len(submit_calls) == 2 + for call in submit_calls: + assert call[1][0] == callback + assert isinstance(call[1][1], message.Message) + + add_done_callback_calls = [ + m for m in future.method_calls if m[0] == 'add_done_callback'] + assert len(add_done_callback_calls) == 2 + for call in add_done_callback_calls: + assert call[1][0] == thread._callback_completed From ec7454e38597dc40e8fa07afb500c028e7225d96 Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Mon, 16 Oct 2017 20:04:28 -0700 Subject: [PATCH 7/8] Add unit test for thread._callback_completed for 100% coverage --- .../tests/unit/pubsub_v1/subscriber/test_policy_thread.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index 0dbc55b15329..eca6ab4628e8 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -134,3 +134,10 @@ def test_on_response(): assert len(add_done_callback_calls) == 2 for call in add_done_callback_calls: assert call[1][0] == thread._callback_completed + + +def test__callback_completed(): + future = mock.Mock() + thread._callback_completed(future) + result_calls = [m for m in future.method_calls if m[0] == 'result'] + assert len(result_calls) == 1 From 3b8ef2e0ff61d62493d8cc1ad350d98c16b6a036 Mon Sep 17 00:00:00 2001 From: Jeffrey Scott Keone Payne Date: Mon, 16 Oct 2017 20:47:54 -0700 Subject: [PATCH 8/8] Comment change --- .../tests/unit/pubsub_v1/subscriber/test_policy_thread.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index eca6ab4628e8..c86b54e4edd2 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -94,9 +94,9 @@ def test_on_exception_other(): def test_on_response(): callback = mock.Mock(spec=()) - # TODO Create mock ThreadPoolExecutor, pass into create_policy(), and - # verify that both executor.submit() and future.add_done_callback are - # called twice. + # Create mock ThreadPoolExecutor, pass into create_policy(), and verify + # that both executor.submit() and future.add_done_callback are called + # twice. future = mock.Mock() attrs = {'submit.return_value': future} executor = mock.Mock(**attrs)