diff --git a/docs/pubsub_snippets.py b/docs/pubsub_snippets.py index 039effd42088..2f3fb0705a81 100644 --- a/docs/pubsub_snippets.py +++ b/docs/pubsub_snippets.py @@ -60,14 +60,8 @@ def do_something_with(sub): # pylint: disable=unused-argument pass # [START client_list_subscriptions] - subscriptions, token = client.list_subscriptions() # API request - while True: - for subscription in subscriptions: - do_something_with(subscription) - if token is None: - break - subscriptions, token = client.list_subscriptions( - page_token=token) # API request + for subscription in client.list_subscriptions(): # API request(s) + do_something_with(subscription) # [END client_list_subscriptions] diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index 0cd2e485010d..bb700f7018c5 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -14,12 +14,15 @@ """GAX wrapper for Pubsub API requests.""" +import functools + from google.cloud.gapic.pubsub.v1.publisher_api import PublisherApi from google.cloud.gapic.pubsub.v1.subscriber_api import SubscriberApi from google.gax import CallOptions from google.gax import INITIAL_PAGE from google.gax.errors import GaxError from google.gax.grpc import exc_to_code +from google.protobuf.json_format import MessageToDict from google.pubsub.v1.pubsub_pb2 import PubsubMessage from google.pubsub.v1.pubsub_pb2 import PushConfig from grpc import insecure_channel @@ -77,12 +80,7 @@ def list_topics(self, project, page_size=0, page_token=None): path = 'projects/%s' % (project,) page_iter = self._gax_api.list_topics( path, page_size=page_size, options=options) - - iter_kwargs = {} - if page_size: # page_size can be 0 or explicit None. - iter_kwargs['max_results'] = page_size - return GAXIterator(self._client, page_iter, _item_to_topic, - **iter_kwargs) + return GAXIterator(self._client, page_iter, _item_to_topic) def topic_create(self, topic_path): """API call: create a topic @@ -214,11 +212,8 @@ def topic_list_subscriptions(self, topic, page_size=0, page_token=None): raise NotFound(topic_path) raise - iter_kwargs = {} - if page_size: # page_size can be 0 or explicit None. - iter_kwargs['max_results'] = page_size iterator = GAXIterator(self._client, page_iter, - _item_to_subscription, **iter_kwargs) + _item_to_subscription_for_topic) iterator.topic = topic return iterator @@ -228,9 +223,13 @@ class _SubscriberAPI(object): :type gax_api: :class:`google.pubsub.v1.publisher_api.SubscriberApi` :param gax_api: API object used to make GAX requests. + + :type client: :class:`~google.cloud.pubsub.client.Client` + :param client: The client that owns this API object. """ - def __init__(self, gax_api): + def __init__(self, gax_api, client): self._gax_api = gax_api + self._client = client def list_subscriptions(self, project, page_size=0, page_token=None): """List subscriptions for the project associated with this API. @@ -250,11 +249,10 @@ def list_subscriptions(self, project, page_size=0, page_token=None): If not passed, the API will return the first page of subscriptions. - :rtype: tuple, (list, str) - :returns: list of ``Subscription`` resource dicts, plus a - "next page token" string: if not None, indicates that - more topics can be retrieved with another call (pass that - value as ``page_token``). + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of + :class:`~google.cloud.pubsub.subscription.Subscription` + accessible to the current API. """ if page_token is None: page_token = INITIAL_PAGE @@ -262,10 +260,14 @@ def list_subscriptions(self, project, page_size=0, page_token=None): path = 'projects/%s' % (project,) page_iter = self._gax_api.list_subscriptions( path, page_size=page_size, options=options) - subscriptions = [_subscription_pb_to_mapping(sub_pb) - for sub_pb in page_iter.next()] - token = page_iter.page_token or None - return subscriptions, token + + # We attach a mutable topics dictionary so that as topic + # objects are created by Subscription.from_api_repr, they + # can be re-used by other subscriptions from the same topic. + topics = {} + item_to_value = functools.partial( + _item_to_sub_for_client, topics=topics) + return GAXIterator(self._client, page_iter, item_to_value) def subscription_create(self, subscription_path, topic_path, ack_deadline=None, push_endpoint=None): @@ -313,7 +315,7 @@ def subscription_create(self, subscription_path, topic_path, if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: raise Conflict(topic_path) raise - return _subscription_pb_to_mapping(sub_pb) + return MessageToDict(sub_pb) def subscription_get(self, subscription_path): """API call: retrieve a subscription @@ -335,7 +337,7 @@ def subscription_get(self, subscription_path): if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(subscription_path) raise - return _subscription_pb_to_mapping(sub_pb) + return MessageToDict(sub_pb) def subscription_delete(self, subscription_path): """API call: delete a subscription @@ -474,24 +476,6 @@ def _message_pb_from_mapping(message): attributes=message['attributes']) -def _subscription_pb_to_mapping(sub_pb): - """Helper for :meth:`list_subscriptions`, et aliae - - Performs "impedance matching" between the protobuf attrs and the keys - expected in the JSON API. - """ - mapping = { - 'name': sub_pb.name, - 'topic': sub_pb.topic, - 'ackDeadlineSeconds': sub_pb.ack_deadline_seconds, - } - if sub_pb.push_config.push_endpoint != '': - mapping['pushConfig'] = { - 'pushEndpoint': sub_pb.push_config.push_endpoint, - } - return mapping - - def _message_pb_to_mapping(message_pb): """Helper for :meth:`pull`, et aliae @@ -576,7 +560,7 @@ def _item_to_topic(iterator, resource): {'name': resource.name}, iterator.client) -def _item_to_subscription(iterator, subscription_path): +def _item_to_subscription_for_topic(iterator, subscription_path): """Convert a subscription name to the native object. :type iterator: :class:`~google.cloud.iterator.Iterator` @@ -591,3 +575,33 @@ def _item_to_subscription(iterator, subscription_path): subscription_name = subscription_name_from_path( subscription_path, iterator.client.project) return Subscription(subscription_name, iterator.topic) + + +def _item_to_sub_for_client(iterator, sub_pb, topics): + """Convert a subscription protobuf to the native object. + + .. note:: + + This method does not have the correct signature to be used as + the ``item_to_value`` argument to + :class:`~google.cloud.iterator.Iterator`. It is intended to be + patched with a mutable topics argument that can be updated + on subsequent calls. For an example, see how the method is + used above in :meth:`_SubscriberAPI.list_subscriptions`. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type sub_pb: :class:`~google.pubsub.v1.pubsub_pb2.Subscription` + :param sub_pb: A subscription returned from the API. + + :type topics: dict + :param topics: A dictionary of topics to be used (and modified) + as new subscriptions are created bound to topics. + + :rtype: :class:`~google.cloud.pubsub.subscription.Subscription` + :returns: The next subscription in the page. + """ + resource = MessageToDict(sub_pb) + return Subscription.from_api_repr( + resource, iterator.client, topics=topics) diff --git a/pubsub/google/cloud/pubsub/client.py b/pubsub/google/cloud/pubsub/client.py index 8c7272266660..ec909fda4f2e 100644 --- a/pubsub/google/cloud/pubsub/client.py +++ b/pubsub/google/cloud/pubsub/client.py @@ -22,7 +22,6 @@ from google.cloud.pubsub.connection import _PublisherAPI as JSONPublisherAPI from google.cloud.pubsub.connection import _SubscriberAPI as JSONSubscriberAPI from google.cloud.pubsub.connection import _IAMPolicyAPI -from google.cloud.pubsub.subscription import Subscription from google.cloud.pubsub.topic import Topic try: @@ -98,9 +97,9 @@ def subscriber_api(self): if self._subscriber_api is None: if self._use_gax: generated = make_gax_subscriber_api(self.connection) - self._subscriber_api = GAXSubscriberAPI(generated) + self._subscriber_api = GAXSubscriberAPI(generated, self) else: - self._subscriber_api = JSONSubscriberAPI(self.connection) + self._subscriber_api = JSONSubscriberAPI(self) return self._subscriber_api @property @@ -160,20 +159,14 @@ def list_subscriptions(self, page_size=None, page_token=None): passed, the API will return the first page of topics. - :rtype: tuple, (list, str) - :returns: list of :class:`~.pubsub.subscription.Subscription`, - plus a "next page token" string: if not None, indicates that - more topics can be retrieved with another call (pass that - value as ``page_token``). + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of + :class:`~google.cloud.pubsub.subscription.Subscription` + accessible to the current client. """ api = self.subscriber_api - resources, next_token = api.list_subscriptions( + return api.list_subscriptions( self.project, page_size, page_token) - topics = {} - subscriptions = [Subscription.from_api_repr(resource, self, - topics=topics) - for resource in resources] - return subscriptions, next_token def topic(self, name, timestamp_messages=False): """Creates a topic bound to the current client. diff --git a/pubsub/google/cloud/pubsub/connection.py b/pubsub/google/cloud/pubsub/connection.py index 44f4cc048206..4946ff37bd12 100644 --- a/pubsub/google/cloud/pubsub/connection.py +++ b/pubsub/google/cloud/pubsub/connection.py @@ -15,6 +15,7 @@ """Create / interact with Google Cloud Pub/Sub connections.""" import base64 +import functools import os from google.cloud import connection as base_connection @@ -238,7 +239,8 @@ def topic_list_subscriptions(self, topic, page_size=None, page_token=None): iterator = HTTPIterator( client=self._client, path=path, - item_to_value=_item_to_subscription, items_key='subscriptions', + item_to_value=_item_to_subscription_for_topic, + items_key='subscriptions', page_token=page_token, extra_params=extra_params) iterator.topic = topic return iterator @@ -247,12 +249,13 @@ def topic_list_subscriptions(self, topic, page_size=None, page_token=None): class _SubscriberAPI(object): """Helper mapping subscriber-related APIs. - :type connection: :class:`Connection` - :param connection: the connection used to make API requests. + :type client: :class:`~google.cloud.pubsub.client.Client` + :param client: the client used to make API requests. """ - def __init__(self, connection): - self._connection = connection + def __init__(self, client): + self._client = client + self._connection = client.connection def list_subscriptions(self, project, page_size=None, page_token=None): """API call: list subscriptions for a given project @@ -272,24 +275,26 @@ def list_subscriptions(self, project, page_size=None, page_token=None): If not passed, the API will return the first page of subscriptions. - :rtype: tuple, (list, str) - :returns: list of ``Subscription`` resource dicts, plus a - "next page token" string: if not None, indicates that - more subscriptions can be retrieved with another call (pass - that value as ``page_token``). + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of + :class:`~google.cloud.pubsub.subscription.Subscription` + accessible to the current API. """ - conn = self._connection - params = {} - + extra_params = {} if page_size is not None: - params['pageSize'] = page_size - - if page_token is not None: - params['pageToken'] = page_token - + extra_params['pageSize'] = page_size path = '/projects/%s/subscriptions' % (project,) - resp = conn.api_request(method='GET', path=path, query_params=params) - return resp.get('subscriptions', ()), resp.get('nextPageToken') + + # We attach a mutable topics dictionary so that as topic + # objects are created by Subscription.from_api_repr, they + # can be re-used by other subscriptions from the same topic. + topics = {} + item_to_value = functools.partial( + _item_to_sub_for_client, topics=topics) + return HTTPIterator( + client=self._client, path=path, item_to_value=item_to_value, + items_key='subscriptions', page_token=page_token, + extra_params=extra_params) def subscription_create(self, subscription_path, topic_path, ack_deadline=None, push_endpoint=None): @@ -590,7 +595,7 @@ def _item_to_topic(iterator, resource): return Topic.from_api_repr(resource, iterator.client) -def _item_to_subscription(iterator, subscription_path): +def _item_to_subscription_for_topic(iterator, subscription_path): """Convert a subscription name to the native object. :type iterator: :class:`~google.cloud.iterator.Iterator` @@ -605,3 +610,32 @@ def _item_to_subscription(iterator, subscription_path): subscription_name = subscription_name_from_path( subscription_path, iterator.client.project) return Subscription(subscription_name, iterator.topic) + + +def _item_to_sub_for_client(iterator, resource, topics): + """Convert a subscription to the native object. + + .. note:: + + This method does not have the correct signature to be used as + the ``item_to_value`` argument to + :class:`~google.cloud.iterator.Iterator`. It is intended to be + patched with a mutable topics argument that can be updated + on subsequent calls. For an example, see how the method is + used above in :meth:`_SubscriberAPI.list_subscriptions`. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type resource: dict + :param resource: A subscription returned from the API. + + :type topics: dict + :param topics: A dictionary of topics to be used (and modified) + as new subscriptions are created bound to topics. + + :rtype: :class:`~google.cloud.pubsub.subscription.Subscription` + :returns: The next subscription in the page. + """ + return Subscription.from_api_repr( + resource, iterator.client, topics=topics) diff --git a/pubsub/unit_tests/test__gax.py b/pubsub/unit_tests/test__gax.py index 52f6b051466b..c47b1cd24111 100644 --- a/pubsub/unit_tests/test__gax.py +++ b/pubsub/unit_tests/test__gax.py @@ -407,30 +407,48 @@ def _getTargetClass(self): def test_ctor(self): gax_api = _GAXSubscriberAPI() - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) self.assertIs(api._gax_api, gax_api) + self.assertIs(api._client, client) def test_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE + from google.pubsub.v1.pubsub_pb2 import PushConfig + from google.pubsub.v1.pubsub_pb2 import Subscription as SubscriptionPB from google.cloud._testing import _GAXPageIterator + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.subscription import Subscription + from google.cloud.pubsub.topic import Topic - sub_pb = _SubscriptionPB( - self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0) + push_cfg_pb = PushConfig(push_endpoint=self.PUSH_ENDPOINT) + local_sub_path = '%s/subscriptions/%s' % ( + self.PROJECT_PATH, self.SUB_NAME) + sub_pb = SubscriptionPB(name=local_sub_path, topic=self.TOPIC_PATH, + push_config=push_cfg_pb) response = _GAXPageIterator([sub_pb]) gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) - api = self._makeOne(gax_api) + creds = _Credentials() + client = Client(project=self.PROJECT, credentials=creds) + api = self._makeOne(gax_api, client) - subscriptions, next_token = api.list_subscriptions(self.PROJECT) + iterator = api.list_subscriptions(self.PROJECT) + subscriptions = list(iterator) + next_token = iterator.next_page_token + # Check the token returned. + self.assertIsNone(next_token) + # Check the subscription object returned. self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] - self.assertIsInstance(subscription, dict) - self.assertEqual(subscription['name'], self.SUB_PATH) - self.assertEqual(subscription['topic'], self.TOPIC_PATH) - self.assertEqual(subscription['pushConfig'], - {'pushEndpoint': self.PUSH_ENDPOINT}) - self.assertEqual(subscription['ackDeadlineSeconds'], 0) - self.assertIsNone(next_token) + self.assertIsInstance(subscription, Subscription) + self.assertEqual(subscription.name, self.SUB_NAME) + self.assertIsInstance(subscription.topic, Topic) + self.assertEqual(subscription.topic.name, self.TOPIC_NAME) + self.assertIs(subscription._client, client) + self.assertEqual(subscription._project, self.PROJECT) + self.assertIsNone(subscription.ack_deadline) + self.assertEqual(subscription.push_endpoint, self.PUSH_ENDPOINT) name, page_size, options = gax_api._list_subscriptions_called_with self.assertEqual(name, self.PROJECT_PATH) @@ -438,28 +456,46 @@ def test_list_subscriptions_no_paging(self): self.assertIs(options.page_token, INITIAL_PAGE) def test_list_subscriptions_with_paging(self): + from google.pubsub.v1.pubsub_pb2 import PushConfig + from google.pubsub.v1.pubsub_pb2 import Subscription as SubscriptionPB from google.cloud._testing import _GAXPageIterator + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.subscription import Subscription + from google.cloud.pubsub.topic import Topic + SIZE = 23 TOKEN = 'TOKEN' NEW_TOKEN = 'NEW_TOKEN' - sub_pb = _SubscriptionPB( - self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0) + push_cfg_pb = PushConfig(push_endpoint=self.PUSH_ENDPOINT) + local_sub_path = '%s/subscriptions/%s' % ( + self.PROJECT_PATH, self.SUB_NAME) + sub_pb = SubscriptionPB(name=local_sub_path, topic=self.TOPIC_PATH, + push_config=push_cfg_pb) response = _GAXPageIterator([sub_pb], page_token=NEW_TOKEN) gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + creds = _Credentials() + client = Client(project=self.PROJECT, credentials=creds) + api = self._makeOne(gax_api, client) - subscriptions, next_token = api.list_subscriptions( + iterator = api.list_subscriptions( self.PROJECT, page_size=SIZE, page_token=TOKEN) + subscriptions = list(iterator) + next_token = iterator.next_page_token + # Check the token returned. + self.assertEqual(next_token, NEW_TOKEN) + # Check the subscription object returned. self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] - self.assertIsInstance(subscription, dict) - self.assertEqual(subscription['name'], self.SUB_PATH) - self.assertEqual(subscription['topic'], self.TOPIC_PATH) - self.assertEqual(subscription['pushConfig'], - {'pushEndpoint': self.PUSH_ENDPOINT}) - self.assertEqual(subscription['ackDeadlineSeconds'], 0) - self.assertEqual(next_token, NEW_TOKEN) + self.assertIsInstance(subscription, Subscription) + self.assertEqual(subscription.name, self.SUB_NAME) + self.assertIsInstance(subscription.topic, Topic) + self.assertEqual(subscription.topic.name, self.TOPIC_NAME) + self.assertIs(subscription._client, client) + self.assertEqual(subscription._project, self.PROJECT) + self.assertIsNone(subscription.ack_deadline) + self.assertEqual(subscription.push_endpoint, self.PUSH_ENDPOINT) name, page_size, options = gax_api._list_subscriptions_called_with self.assertEqual(name, self.PROJECT_PATH) @@ -467,16 +503,18 @@ def test_list_subscriptions_with_paging(self): self.assertEqual(options.page_token, TOKEN) def test_subscription_create(self): - sub_pb = _SubscriptionPB(self.SUB_PATH, self.TOPIC_PATH, '', 0) + from google.pubsub.v1.pubsub_pb2 import Subscription + + sub_pb = Subscription(name=self.SUB_PATH, topic=self.TOPIC_PATH) gax_api = _GAXSubscriberAPI(_create_subscription_response=sub_pb) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) resource = api.subscription_create(self.SUB_PATH, self.TOPIC_PATH) expected = { 'name': self.SUB_PATH, 'topic': self.TOPIC_PATH, - 'ackDeadlineSeconds': 0, } self.assertEqual(resource, expected) name, topic, push_config, ack_deadline, options = ( @@ -491,7 +529,8 @@ def test_subscription_create_already_exists(self): from google.cloud.exceptions import Conflict DEADLINE = 600 gax_api = _GAXSubscriberAPI(_create_subscription_conflict=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(Conflict): api.subscription_create( @@ -508,7 +547,8 @@ def test_subscription_create_already_exists(self): def test_subscription_create_error(self): from google.gax.errors import GaxError gax_api = _GAXSubscriberAPI(_random_gax_error=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(GaxError): api.subscription_create(self.SUB_PATH, self.TOPIC_PATH) @@ -522,17 +562,21 @@ def test_subscription_create_error(self): self.assertIsNone(options) def test_subscription_get_hit(self): - sub_pb = _SubscriptionPB( - self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0) + from google.pubsub.v1.pubsub_pb2 import PushConfig + from google.pubsub.v1.pubsub_pb2 import Subscription + + push_cfg_pb = PushConfig(push_endpoint=self.PUSH_ENDPOINT) + sub_pb = Subscription(name=self.SUB_PATH, topic=self.TOPIC_PATH, + push_config=push_cfg_pb) gax_api = _GAXSubscriberAPI(_get_subscription_response=sub_pb) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) resource = api.subscription_get(self.SUB_PATH) expected = { 'name': self.SUB_PATH, 'topic': self.TOPIC_PATH, - 'ackDeadlineSeconds': 0, 'pushConfig': { 'pushEndpoint': self.PUSH_ENDPOINT, }, @@ -545,7 +589,8 @@ def test_subscription_get_hit(self): def test_subscription_get_miss(self): from google.cloud.exceptions import NotFound gax_api = _GAXSubscriberAPI() - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(NotFound): api.subscription_get(self.SUB_PATH) @@ -557,7 +602,8 @@ def test_subscription_get_miss(self): def test_subscription_get_error(self): from google.gax.errors import GaxError gax_api = _GAXSubscriberAPI(_random_gax_error=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(GaxError): api.subscription_get(self.SUB_PATH) @@ -568,7 +614,8 @@ def test_subscription_get_error(self): def test_subscription_delete_hit(self): gax_api = _GAXSubscriberAPI(_delete_subscription_ok=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) api.subscription_delete(self.TOPIC_PATH) @@ -579,7 +626,8 @@ def test_subscription_delete_hit(self): def test_subscription_delete_miss(self): from google.cloud.exceptions import NotFound gax_api = _GAXSubscriberAPI(_delete_subscription_ok=False) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(NotFound): api.subscription_delete(self.TOPIC_PATH) @@ -591,7 +639,8 @@ def test_subscription_delete_miss(self): def test_subscription_delete_error(self): from google.gax.errors import GaxError gax_api = _GAXSubscriberAPI(_random_gax_error=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(GaxError): api.subscription_delete(self.TOPIC_PATH) @@ -602,7 +651,8 @@ def test_subscription_delete_error(self): def test_subscription_modify_push_config_hit(self): gax_api = _GAXSubscriberAPI(_modify_push_config_ok=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) api.subscription_modify_push_config(self.SUB_PATH, self.PUSH_ENDPOINT) @@ -614,7 +664,8 @@ def test_subscription_modify_push_config_hit(self): def test_subscription_modify_push_config_miss(self): from google.cloud.exceptions import NotFound gax_api = _GAXSubscriberAPI() - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(NotFound): api.subscription_modify_push_config( @@ -628,7 +679,8 @@ def test_subscription_modify_push_config_miss(self): def test_subscription_modify_push_config_error(self): from google.gax.errors import GaxError gax_api = _GAXSubscriberAPI(_random_gax_error=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(GaxError): api.subscription_modify_push_config( @@ -645,6 +697,7 @@ def test_subscription_pull_explicit(self): from google.cloud._helpers import UTC from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud._helpers import _datetime_to_rfc3339 + NOW = datetime.datetime.utcnow().replace(tzinfo=UTC) NOW_PB = _datetime_to_pb_timestamp(NOW) NOW_RFC3339 = _datetime_to_rfc3339(NOW) @@ -662,7 +715,8 @@ def test_subscription_pull_explicit(self): message_pb = _PubsubMessagePB(MSG_ID, B64, {'a': 'b'}, NOW_PB) response_pb = _PullResponsePB([_ReceivedMessagePB(ACK_ID, message_pb)]) gax_api = _GAXSubscriberAPI(_pull_response=response_pb) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) MAX_MESSAGES = 10 received = api.subscription_pull( @@ -679,7 +733,8 @@ def test_subscription_pull_explicit(self): def test_subscription_pull_defaults_miss(self): from google.cloud.exceptions import NotFound gax_api = _GAXSubscriberAPI() - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(NotFound): api.subscription_pull(self.SUB_PATH) @@ -694,7 +749,8 @@ def test_subscription_pull_defaults_miss(self): def test_subscription_pull_defaults_error(self): from google.gax.errors import GaxError gax_api = _GAXSubscriberAPI(_random_gax_error=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(GaxError): api.subscription_pull(self.SUB_PATH) @@ -710,7 +766,8 @@ def test_subscription_acknowledge_hit(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' gax_api = _GAXSubscriberAPI(_acknowledge_ok=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) @@ -724,7 +781,8 @@ def test_subscription_acknowledge_miss(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' gax_api = _GAXSubscriberAPI() - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(NotFound): api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) @@ -739,7 +797,8 @@ def test_subscription_acknowledge_error(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' gax_api = _GAXSubscriberAPI(_random_gax_error=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(GaxError): api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) @@ -754,7 +813,8 @@ def test_subscription_modify_ack_deadline_hit(self): ACK_ID2 = 'BEADCAFE' NEW_DEADLINE = 90 gax_api = _GAXSubscriberAPI(_modify_ack_deadline_ok=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) api.subscription_modify_ack_deadline( self.SUB_PATH, [ACK_ID1, ACK_ID2], NEW_DEADLINE) @@ -772,7 +832,8 @@ def test_subscription_modify_ack_deadline_miss(self): ACK_ID2 = 'BEADCAFE' NEW_DEADLINE = 90 gax_api = _GAXSubscriberAPI() - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(NotFound): api.subscription_modify_ack_deadline( @@ -791,7 +852,8 @@ def test_subscription_modify_ack_deadline_error(self): ACK_ID2 = 'BEADCAFE' NEW_DEADLINE = 90 gax_api = _GAXSubscriberAPI(_random_gax_error=True) - api = self._makeOne(gax_api) + client = _Client(self.PROJECT) + api = self._makeOne(gax_api, client) with self.assertRaises(GaxError): api.subscription_modify_ack_deadline( @@ -1057,12 +1119,6 @@ def __init__(self, message_ids): self.message_ids = message_ids -class _PushConfigPB(object): - - def __init__(self, push_endpoint): - self.push_endpoint = push_endpoint - - class _PubsubMessagePB(object): def __init__(self, message_id, data, attributes, publish_time): @@ -1085,15 +1141,6 @@ def __init__(self, received_messages): self.received_messages = received_messages -class _SubscriptionPB(object): - - def __init__(self, name, topic, push_endpoint, ack_deadline_seconds): - self.name = name - self.topic = topic - self.push_config = _PushConfigPB(push_endpoint) - self.ack_deadline_seconds = ack_deadline_seconds - - class _Connection(object): def __init__(self, in_emulator=False, host=None): @@ -1105,3 +1152,10 @@ class _Client(object): def __init__(self, project): self.project = project + + +class _Credentials(object): + + @staticmethod + def create_scoped_required(): + return False diff --git a/pubsub/unit_tests/test_client.py b/pubsub/unit_tests/test_client.py index 4f4d597591fa..08ed1f29f2fc 100644 --- a/pubsub/unit_tests/test_client.py +++ b/pubsub/unit_tests/test_client.py @@ -126,8 +126,9 @@ def _generated_api(*args, **kw): class _GaxSubscriberAPI(object): - def __init__(self, _wrapped): + def __init__(self, _wrapped, client): self._wrapped = _wrapped + self._client = client creds = _Credentials() @@ -139,6 +140,7 @@ def __init__(self, _wrapped): self.assertIsInstance(api, _GaxSubscriberAPI) self.assertIs(api._wrapped, wrapped) + self.assertIs(api._client, client) # API instance is cached again = client.subscriber_api self.assertIs(again, api) @@ -218,29 +220,52 @@ def test_list_topics_missing_key(self): def test_list_subscriptions_no_paging(self): from google.cloud.pubsub.subscription import Subscription + from google.cloud.pubsub.topic import Topic + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} creds = _Credentials() - client = self._makeOne(project=self.PROJECT, credentials=creds) - client.connection = object() - api = client._subscriber_api = _FauxSubscriberAPI() - api._list_subscriptions_response = [SUB_INFO], None + client = self._makeOne(project=self.PROJECT, credentials=creds, + use_gax=False) + returned = {'subscriptions': [SUB_INFO]} + client.connection = _Connection(returned) - subscriptions, next_page_token = client.list_subscriptions() + iterator = client.list_subscriptions() + subscriptions = list(iterator) + next_page_token = iterator.next_page_token - self.assertEqual(len(subscriptions), 1) - self.assertIsInstance(subscriptions[0], Subscription) - self.assertEqual(subscriptions[0].name, self.SUB_NAME) - self.assertEqual(subscriptions[0].topic.name, self.TOPIC_NAME) + # Check the token returned. self.assertIsNone(next_page_token) - - self.assertEqual(api._listed_subscriptions, - (self.PROJECT, None, None)) + # Check the subscription object returned. + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, Subscription) + self.assertEqual(subscription.name, self.SUB_NAME) + self.assertIsInstance(subscription.topic, Topic) + self.assertEqual(subscription.topic.name, self.TOPIC_NAME) + self.assertIs(subscription._client, client) + self.assertEqual(subscription._project, self.PROJECT) + self.assertIsNone(subscription.ack_deadline) + self.assertIsNone(subscription.push_endpoint) + + called_with = client.connection._called_with + expected_path = '/projects/%s/subscriptions' % (self.PROJECT,) + self.assertEqual(called_with, { + 'method': 'GET', + 'path': expected_path, + 'query_params': {}, + }) def test_list_subscriptions_with_paging(self): + import six from google.cloud.pubsub.subscription import Subscription + from google.cloud.pubsub.topic import Topic + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} creds = _Credentials() - client = self._makeOne(project=self.PROJECT, credentials=creds) + client = self._makeOne(project=self.PROJECT, credentials=creds, + use_gax=False) + + # Set up the mock response. ACK_DEADLINE = 42 PUSH_ENDPOINT = 'https://push.example.com/endpoint' SUB_INFO = {'name': self.SUB_PATH, @@ -250,23 +275,42 @@ def test_list_subscriptions_with_paging(self): TOKEN1 = 'TOKEN1' TOKEN2 = 'TOKEN2' SIZE = 1 - client.connection = object() - api = client._subscriber_api = _FauxSubscriberAPI() - api._list_subscriptions_response = [SUB_INFO], TOKEN2 + returned = { + 'subscriptions': [SUB_INFO], + 'nextPageToken': TOKEN2, + } + client.connection = _Connection(returned) - subscriptions, next_page_token = client.list_subscriptions( + iterator = client.list_subscriptions( SIZE, TOKEN1) + page = six.next(iterator.pages) + subscriptions = list(page) + next_page_token = iterator.next_page_token - self.assertEqual(len(subscriptions), 1) - self.assertIsInstance(subscriptions[0], Subscription) - self.assertEqual(subscriptions[0].name, self.SUB_NAME) - self.assertEqual(subscriptions[0].topic.name, self.TOPIC_NAME) - self.assertEqual(subscriptions[0].ack_deadline, ACK_DEADLINE) - self.assertEqual(subscriptions[0].push_endpoint, PUSH_ENDPOINT) + # Check the token returned. self.assertEqual(next_page_token, TOKEN2) - - self.assertEqual(api._listed_subscriptions, - (self.PROJECT, SIZE, TOKEN1)) + # Check the subscription object returned. + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, Subscription) + self.assertEqual(subscription.name, self.SUB_NAME) + self.assertIsInstance(subscription.topic, Topic) + self.assertEqual(subscription.topic.name, self.TOPIC_NAME) + self.assertIs(subscription._client, client) + self.assertEqual(subscription._project, self.PROJECT) + self.assertEqual(subscription.ack_deadline, ACK_DEADLINE) + self.assertEqual(subscription.push_endpoint, PUSH_ENDPOINT) + + called_with = client.connection._called_with + expected_path = '/projects/%s/subscriptions' % (self.PROJECT,) + self.assertEqual(called_with, { + 'method': 'GET', + 'path': expected_path, + 'query_params': { + 'pageSize': SIZE, + 'pageToken': TOKEN1, + }, + }) def test_list_subscriptions_w_missing_key(self): PROJECT = 'PROJECT' @@ -339,3 +383,16 @@ class _FauxSubscriberAPI(object): def list_subscriptions(self, project, page_size, page_token): self._listed_subscriptions = (project, page_size, page_token) return self._list_subscriptions_response + + +class _Connection(object): + + _called_with = None + + def __init__(self, *responses): + self._responses = responses + + def api_request(self, **kw): + self._called_with = kw + response, self._responses = self._responses[0], self._responses[1:] + return response diff --git a/pubsub/unit_tests/test_connection.py b/pubsub/unit_tests/test_connection.py index 28a63025c6d7..3d6f39b27113 100644 --- a/pubsub/unit_tests/test_connection.py +++ b/pubsub/unit_tests/test_connection.py @@ -417,23 +417,41 @@ def _makeOne(self, *args, **kw): def test_ctor(self): connection = _Connection() - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) self.assertIs(api._connection, connection) + self.assertIs(api._client, client) def test_list_subscriptions_no_paging(self): + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.subscription import Subscription + from google.cloud.pubsub.topic import Topic + SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} RETURNED = {'subscriptions': [SUB_INFO]} connection = _Connection(RETURNED) - api = self._makeOne(connection) + creds = _Credentials() + client = Client(project=self.PROJECT, credentials=creds) + client.connection = connection + api = self._makeOne(client) - subscriptions, next_token = api.list_subscriptions(self.PROJECT) + iterator = api.list_subscriptions(self.PROJECT) + subscriptions = list(iterator) + next_token = iterator.next_page_token + # Check the token returned. + self.assertIsNone(next_token) + # Check the subscription object returned. self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] - self.assertIsInstance(subscription, dict) - self.assertEqual(subscription['name'], self.SUB_PATH) - self.assertEqual(subscription['topic'], self.TOPIC_PATH) - self.assertIsNone(next_token) + self.assertIsInstance(subscription, Subscription) + self.assertEqual(subscription.name, self.SUB_NAME) + self.assertIsInstance(subscription.topic, Topic) + self.assertEqual(subscription.topic.name, self.TOPIC_NAME) + self.assertIs(subscription._client, client) + self.assertEqual(subscription._project, self.PROJECT) + self.assertIsNone(subscription.ack_deadline) + self.assertIsNone(subscription.push_endpoint) self.assertEqual(connection._called_with['method'], 'GET') path = '/%s' % (self.LIST_SUBSCRIPTIONS_PATH,) @@ -441,6 +459,11 @@ def test_list_subscriptions_no_paging(self): self.assertEqual(connection._called_with['query_params'], {}) def test_list_subscriptions_with_paging(self): + import six + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.subscription import Subscription + from google.cloud.pubsub.topic import Topic + TOKEN1 = 'TOKEN1' TOKEN2 = 'TOKEN2' SIZE = 1 @@ -450,17 +473,30 @@ def test_list_subscriptions_with_paging(self): 'nextPageToken': 'TOKEN2', } connection = _Connection(RETURNED) - api = self._makeOne(connection) + creds = _Credentials() + client = Client(project=self.PROJECT, credentials=creds) + client.connection = connection + api = self._makeOne(client) - subscriptions, next_token = api.list_subscriptions( + iterator = api.list_subscriptions( self.PROJECT, page_token=TOKEN1, page_size=SIZE) + page = six.next(iterator.pages) + subscriptions = list(page) + next_token = iterator.next_page_token + # Check the token returned. + self.assertEqual(next_token, TOKEN2) + # Check the subscription object returned. self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] - self.assertIsInstance(subscription, dict) - self.assertEqual(subscription['name'], self.SUB_PATH) - self.assertEqual(subscription['topic'], self.TOPIC_PATH) - self.assertEqual(next_token, TOKEN2) + self.assertIsInstance(subscription, Subscription) + self.assertEqual(subscription.name, self.SUB_NAME) + self.assertIsInstance(subscription.topic, Topic) + self.assertEqual(subscription.topic.name, self.TOPIC_NAME) + self.assertIs(subscription._client, client) + self.assertEqual(subscription._project, self.PROJECT) + self.assertIsNone(subscription.ack_deadline) + self.assertIsNone(subscription.push_endpoint) self.assertEqual(connection._called_with['method'], 'GET') path = '/%s' % (self.LIST_SUBSCRIPTIONS_PATH,) @@ -471,9 +507,12 @@ def test_list_subscriptions_with_paging(self): def test_list_subscriptions_missing_key(self): RETURNED = {} connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) - subscriptions, next_token = api.list_subscriptions(self.PROJECT) + iterator = api.list_subscriptions(self.PROJECT) + subscriptions = list(iterator) + next_token = iterator.next_page_token self.assertEqual(len(subscriptions), 0) self.assertIsNone(next_token) @@ -488,7 +527,8 @@ def test_subscription_create_defaults(self): RETURNED = RESOURCE.copy() RETURNED['name'] = self.SUB_PATH connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) resource = api.subscription_create(self.SUB_PATH, self.TOPIC_PATH) @@ -511,7 +551,8 @@ def test_subscription_create_explicit(self): RETURNED = RESOURCE.copy() RETURNED['name'] = self.SUB_PATH connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) resource = api.subscription_create( self.SUB_PATH, self.TOPIC_PATH, @@ -533,7 +574,8 @@ def test_subscription_get(self): 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}, } connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) resource = api.subscription_get(self.SUB_PATH) @@ -545,7 +587,8 @@ def test_subscription_get(self): def test_subscription_delete(self): RETURNED = {} connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) api.subscription_delete(self.SUB_PATH) @@ -560,7 +603,8 @@ def test_subscription_modify_push_config(self): } RETURNED = {} connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) api.subscription_modify_push_config(self.SUB_PATH, PUSH_ENDPOINT) @@ -580,7 +624,8 @@ def test_subscription_pull_defaults(self): 'receivedMessages': [{'ackId': ACK_ID, 'message': MESSAGE}], } connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) BODY = { 'returnImmediately': False, 'maxMessages': 1, @@ -606,7 +651,8 @@ def test_subscription_pull_explicit(self): 'receivedMessages': [{'ackId': ACK_ID, 'message': MESSAGE}], } connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) MAX_MESSAGES = 10 BODY = { 'returnImmediately': True, @@ -630,7 +676,8 @@ def test_subscription_acknowledge(self): } RETURNED = {} connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) api.subscription_acknowledge(self.SUB_PATH, [ACK_ID1, ACK_ID2]) @@ -649,7 +696,8 @@ def test_subscription_modify_ack_deadline(self): } RETURNED = {} connection = _Connection(RETURNED) - api = self._makeOne(connection) + client = _Client(connection, self.PROJECT) + api = self._makeOne(client) api.subscription_modify_ack_deadline( self.SUB_PATH, [ACK_ID1, ACK_ID2], NEW_DEADLINE) @@ -829,3 +877,10 @@ class _Client(object): def __init__(self, connection, project): self.connection = connection self.project = project + + +class _Credentials(object): + + @staticmethod + def create_scoped_required(): + return False