diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 02666eae676a..ec38927f3a9d 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -14,6 +14,7 @@ from __future__ import absolute_import +import datetime import time import uuid @@ -103,4 +104,65 @@ def test_subscribe_to_messages(): assert callback.call_count >= 50 finally: publisher.delete_topic(topic_name) - subscriber.delete_subscription(sub_name) + + +def test_subscribe_to_messages_async_callbacks(): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + topic_name = _resource_name('topic') + sub_name = _resource_name('subscription') + + try: + # Create a topic. + publisher.create_topic(topic_name) + + # Subscribe to the topic. This must happen before the messages + # are published. + subscriber.create_subscription(sub_name, topic_name) + subscription = subscriber.subscribe(sub_name) + + # Publish some messages. + futures = [publisher.publish( + topic_name, + b'Wooooo! The claaaaaw!', + num=str(i), + ) for i in range(0, 2)] + + # Make sure the publish completes. + [f.result() for f in futures] + + # We want to make sure that the callback was called asynchronously. So + # track when each call happened and make sure below. + call_times = [] + + def process_message(message): + # list.append() is thread-safe. + call_times.append(datetime.datetime.now()) + time.sleep(2) + message.ack() + + callback = mock.Mock(wraps=process_message) + side_effect = mock.Mock() + callback.side_effect = side_effect + + # Actually open the subscription and hold it open for a few seconds. + subscription.open(callback) + for second in range(0, 5): + time.sleep(4) + + # The callback should have fired at least two times, but it may + # take some time. + if callback.call_count >= 2 and side_effect.call_count >= 2: + first = min(call_times[:2]) + last = max(call_times[:2]) + diff = last - first + # "Ensure" the first two callbacks were executed asynchronously + # (sequentially would have resulted in a difference of 2+ + # seconds). + assert diff.days == 0 + assert diff.seconds < 2 + + # Okay, we took too long; fail out. + assert callback.call_count >= 2 + finally: + publisher.delete_topic(topic_name) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index 76aec184815e..c86b54e4edd2 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -94,8 +94,15 @@ def test_on_exception_other(): def test_on_response(): callback = mock.Mock(spec=()) + # Create mock ThreadPoolExecutor, pass into create_policy(), and verify + # that both executor.submit() and future.add_done_callback are called + # twice. + future = mock.Mock() + attrs = {'submit.return_value': future} + executor = mock.Mock(**attrs) + # Set up the policy. - policy = create_policy() + policy = create_policy(executor=executor) policy._callback = callback # Set up the messages to send. @@ -112,9 +119,25 @@ def test_on_response(): ], ) - # Actually run the method and prove that the callback was - # called in the expected way. + # Actually run the method and prove that executor.submit and + # future.add_done_callback were called in the expected way. policy.on_response(response) - assert callback.call_count == 2 - for call in callback.mock_calls: - assert isinstance(call[1][0], message.Message) + + submit_calls = [m for m in executor.method_calls if m[0] == 'submit'] + assert len(submit_calls) == 2 + for call in submit_calls: + assert call[1][0] == callback + assert isinstance(call[1][1], message.Message) + + add_done_callback_calls = [ + m for m in future.method_calls if m[0] == 'add_done_callback'] + assert len(add_done_callback_calls) == 2 + for call in add_done_callback_calls: + assert call[1][0] == thread._callback_completed + + +def test__callback_completed(): + future = mock.Mock() + thread._callback_completed(future) + result_calls = [m for m in future.method_calls if m[0] == 'result'] + assert len(result_calls) == 1