Skip to content

Commit cbd5c20

Browse files
authored
Merge pull request googleapis#2602 from dhermes/pubsub-iterators
Using Iterators for list_topics() in Pub/Sub.
2 parents dc57df9 + f99a348 commit cbd5c20

11 files changed

Lines changed: 317 additions & 116 deletions

File tree

core/google/cloud/_testing.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,12 @@ def __init__(self, items, page_token):
8181
self.page_token = page_token
8282

8383
def next(self):
84+
if self._items is None:
85+
raise StopIteration
8486
items, self._items = self._items, None
8587
return items
88+
89+
__next__ = next
90+
91+
def __iter__(self):
92+
return self

core/google/cloud/iterator.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,14 @@ class Iterator(object):
202202
takes the :class:`Iterator` that started the page,
203203
the :class:`Page` that was started and the dictionary
204204
containing the page response.
205+
206+
:type page_iter: callable
207+
:param page_iter: (Optional) Callable to produce a pages iterator from the
208+
current iterator. Assumed signature takes the
209+
:class:`Iterator` that started the page. By default uses
210+
the HTTP pages iterator. Meant to provide a custom
211+
way to create pages (potentially with a custom
212+
transport such as gRPC).
205213
"""
206214

207215
_PAGE_TOKEN = 'pageToken'
@@ -211,7 +219,7 @@ class Iterator(object):
211219
def __init__(self, client, path, item_to_value,
212220
items_key=DEFAULT_ITEMS_KEY,
213221
page_token=None, max_results=None, extra_params=None,
214-
page_start=_do_nothing_page_start):
222+
page_start=_do_nothing_page_start, page_iter=None):
215223
self._started = False
216224
self.client = client
217225
self.path = path
@@ -220,8 +228,14 @@ def __init__(self, client, path, item_to_value,
220228
self.max_results = max_results
221229
self.extra_params = extra_params
222230
self._page_start = page_start
231+
self._page_iter = None
232+
# Verify inputs / provide defaults.
223233
if self.extra_params is None:
224234
self.extra_params = {}
235+
if page_iter is None:
236+
self._page_iter = self._default_page_iter()
237+
else:
238+
self._page_iter = page_iter(self)
225239
self._verify_params()
226240
# The attributes below will change over the life of the iterator.
227241
self.page_number = 0
@@ -239,7 +253,7 @@ def _verify_params(self):
239253
raise ValueError('Using a reserved parameter',
240254
reserved_in_use)
241255

242-
def _pages_iter(self):
256+
def _default_page_iter(self):
243257
"""Generator of pages of API responses.
244258
245259
Yields :class:`Page` instances.
@@ -263,11 +277,11 @@ def pages(self):
263277
if self._started:
264278
raise ValueError('Iterator has already started', self)
265279
self._started = True
266-
return self._pages_iter()
280+
return self._page_iter
267281

268282
def _items_iter(self):
269283
"""Iterator for each item returned."""
270-
for page in self._pages_iter():
284+
for page in self._page_iter:
271285
# Decrement the total results since the pages iterator adds
272286
# to it when each page is encountered.
273287
self.num_results -= page.num_items

core/unit_tests/test_iterator.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,22 @@ def test_constructor_w_extra_param_collision(self):
128128
with self.assertRaises(ValueError):
129129
self._makeOne(client, path, None, extra_params=extra_params)
130130

131+
def test_constructor_non_default_page_iter(self):
132+
connection = _Connection()
133+
client = _Client(connection)
134+
path = '/foo'
135+
result = object()
136+
called = []
137+
138+
def page_iter(iterator):
139+
called.append(iterator)
140+
return result
141+
142+
iterator = self._makeOne(client, path, None,
143+
page_iter=page_iter)
144+
self.assertEqual(called, [iterator])
145+
self.assertIs(iterator._page_iter, result)
146+
131147
def test_pages_iter_empty_then_another(self):
132148
import six
133149
from google.cloud._testing import _Monkey

docs/pubsub_snippets.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,8 @@ def do_something_with(sub): # pylint: disable=unused-argument
4646
pass
4747

4848
# [START client_list_topics]
49-
topics, token = client.list_topics() # API request
50-
while True:
51-
for topic in topics:
52-
do_something_with(topic)
53-
if token is None:
54-
break
55-
topics, token = client.list_topics(page_token=token) # API request
49+
for topic in client.list_topics(): # API request(s)
50+
do_something_with(topic)
5651
# [END client_list_topics]
5752

5853

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""GAX wrapper for Pubsub API requests."""
1616

17+
import functools
18+
1719
from google.cloud.gapic.pubsub.v1.publisher_api import PublisherApi
1820
from google.cloud.gapic.pubsub.v1.subscriber_api import SubscriberApi
1921
from google.gax import CallOptions
@@ -29,16 +31,27 @@
2931
from google.cloud._helpers import _pb_timestamp_to_rfc3339
3032
from google.cloud.exceptions import Conflict
3133
from google.cloud.exceptions import NotFound
34+
from google.cloud.iterator import Iterator
35+
from google.cloud.iterator import Page
36+
from google.cloud.pubsub.topic import Topic
37+
38+
39+
_FAKE_ITEMS_KEY = 'not-a-key'
3240

3341

3442
class _PublisherAPI(object):
3543
"""Helper mapping publisher-related APIs.
3644
3745
:type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi`
3846
:param gax_api: API object used to make GAX requests.
47+
48+
:type client: :class:`~google.cloud.pubsub.client.Client`
49+
:param client: The client that owns this API object.
3950
"""
40-
def __init__(self, gax_api):
51+
52+
def __init__(self, gax_api, client):
4153
self._gax_api = gax_api
54+
self._client = client
4255

4356
def list_topics(self, project, page_size=0, page_token=None):
4457
"""List topics for the project associated with this API.
@@ -58,21 +71,21 @@ def list_topics(self, project, page_size=0, page_token=None):
5871
passed, the API will return the first page of
5972
topics.
6073
61-
:rtype: tuple, (list, str)
62-
:returns: list of ``Topic`` resource dicts, plus a
63-
"next page token" string: if not None, indicates that
64-
more topics can be retrieved with another call (pass that
65-
value as ``page_token``).
74+
:rtype: :class:`~google.cloud.iterator.Iterator`
75+
:returns: Iterator of :class:`~google.cloud.pubsub.topic.Topic`
76+
accessible to the current API.
6677
"""
6778
if page_token is None:
6879
page_token = INITIAL_PAGE
6980
options = CallOptions(page_token=page_token)
7081
path = 'projects/%s' % (project,)
7182
page_iter = self._gax_api.list_topics(
7283
path, page_size=page_size, options=options)
73-
topics = [{'name': topic_pb.name} for topic_pb in page_iter.next()]
74-
token = page_iter.page_token or None
75-
return topics, token
84+
page_iter = functools.partial(_recast_page_iterator, page_iter)
85+
86+
return Iterator(client=self._client, path=path,
87+
item_to_value=_item_to_topic,
88+
page_iter=page_iter)
7689

7790
def topic_create(self, topic_path):
7891
"""API call: create a topic
@@ -543,3 +556,42 @@ def make_gax_subscriber_api(connection):
543556
if connection.in_emulator:
544557
channel = insecure_channel(connection.host)
545558
return SubscriberApi(channel=channel)
559+
560+
561+
def _item_to_topic(iterator, resource):
562+
"""Convert a protobuf topic to the native object.
563+
564+
:type iterator: :class:`~google.cloud.iterator.Iterator`
565+
:param iterator: The iterator that is currently in use.
566+
567+
:type resource: :class:`google.pubsub.v1.pubsub_pb2.Topic`
568+
:param resource: A topic returned from the API.
569+
570+
:rtype: :class:`~google.cloud.pubsub.topic.Topic`
571+
:returns: The next topic in the page.
572+
"""
573+
return Topic.from_api_repr(
574+
{'name': resource.name}, iterator.client)
575+
576+
577+
def _recast_page_iterator(page_iter, iterator):
578+
"""Wrap GAX pages generator.
579+
580+
In particular, wrap each page and capture some state from the
581+
GAX iterator.
582+
583+
Yields :class:`~google.cloud.iterator.Page` instances
584+
585+
:type page_iter: :class:`~google.gax.PageIterator`
586+
:param page_iter: The iterator to wrap.
587+
588+
:type iterator: :class:`~google.cloud.iterator.Iterator`
589+
:param iterator: The iterator that owns each page.
590+
"""
591+
for items in page_iter:
592+
fake_response = {_FAKE_ITEMS_KEY: items}
593+
page = Page(
594+
iterator, fake_response, _FAKE_ITEMS_KEY, _item_to_topic)
595+
iterator.next_page_token = page_iter.page_token or None
596+
iterator.num_results += page.num_items
597+
yield page

pubsub/google/cloud/pubsub/client.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ def publisher_api(self):
8787
if self._publisher_api is None:
8888
if self._use_gax:
8989
generated = make_gax_publisher_api(self.connection)
90-
self._publisher_api = GAXPublisherAPI(generated)
90+
self._publisher_api = GAXPublisherAPI(generated, self)
9191
else:
92-
self._publisher_api = JSONPublisherAPI(self.connection)
92+
self._publisher_api = JSONPublisherAPI(self)
9393
return self._publisher_api
9494

9595
@property
@@ -131,18 +131,13 @@ def list_topics(self, page_size=None, page_token=None):
131131
passed, the API will return the first page of
132132
topics.
133133
134-
:rtype: tuple, (list, str)
135-
:returns: list of :class:`google.cloud.pubsub.topic.Topic`, plus a
136-
"next page token" string: if not None, indicates that
137-
more topics can be retrieved with another call (pass that
138-
value as ``page_token``).
134+
:rtype: :class:`~google.cloud.iterator.Iterator`
135+
:returns: Iterator of :class:`~google.cloud.pubsub.topic.Topic`
136+
accessible to the current API.
139137
"""
140138
api = self.publisher_api
141-
resources, next_token = api.list_topics(
139+
return api.list_topics(
142140
self.project, page_size, page_token)
143-
topics = [Topic.from_api_repr(resource, self)
144-
for resource in resources]
145-
return topics, next_token
146141

147142
def list_subscriptions(self, page_size=None, page_token=None):
148143
"""List subscriptions for the project associated with this client.

pubsub/google/cloud/pubsub/connection.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
from google.cloud import connection as base_connection
2121
from google.cloud.environment_vars import PUBSUB_EMULATOR
22+
from google.cloud.iterator import Iterator
23+
from google.cloud.pubsub.topic import Topic
2224

2325

2426
PUBSUB_API_HOST = 'pubsub.googleapis.com'
@@ -97,12 +99,13 @@ def build_api_url(self, path, query_params=None,
9799
class _PublisherAPI(object):
98100
"""Helper mapping publisher-related APIs.
99101
100-
:type connection: :class:`Connection`
101-
:param connection: the connection used to make API requests.
102+
:type client: :class:`~google.cloud.pubsub.client.Client`
103+
:param client: the client used to make API requests.
102104
"""
103105

104-
def __init__(self, connection):
105-
self._connection = connection
106+
def __init__(self, client):
107+
self._client = client
108+
self._connection = client.connection
106109

107110
def list_topics(self, project, page_size=None, page_token=None):
108111
"""API call: list topics for a given project
@@ -122,24 +125,18 @@ def list_topics(self, project, page_size=None, page_token=None):
122125
passed, the API will return the first page of
123126
topics.
124127
125-
:rtype: tuple, (list, str)
126-
:returns: list of ``Topic`` resource dicts, plus a
127-
"next page token" string: if not None, indicates that
128-
more topics can be retrieved with another call (pass that
129-
value as ``page_token``).
128+
:rtype: :class:`~google.cloud.iterator.Iterator`
129+
:returns: Iterator of :class:`~google.cloud.pubsub.topic.Topic`
130+
accessible to the current connection.
130131
"""
131-
conn = self._connection
132-
params = {}
133-
132+
extra_params = {}
134133
if page_size is not None:
135-
params['pageSize'] = page_size
136-
137-
if page_token is not None:
138-
params['pageToken'] = page_token
139-
134+
extra_params['pageSize'] = page_size
140135
path = '/projects/%s/topics' % (project,)
141-
resp = conn.api_request(method='GET', path=path, query_params=params)
142-
return resp.get('topics', ()), resp.get('nextPageToken')
136+
137+
return Iterator(client=self._client, path=path,
138+
items_key='topics', item_to_value=_item_to_topic,
139+
page_token=page_token, extra_params=extra_params)
143140

144141
def topic_create(self, topic_path):
145142
"""API call: create a topic
@@ -576,3 +573,18 @@ def _transform_messages_base64(messages, transform, key=None):
576573
message = message[key]
577574
if 'data' in message:
578575
message['data'] = transform(message['data'])
576+
577+
578+
def _item_to_topic(iterator, resource):
579+
"""Convert a JSON topic to the native object.
580+
581+
:type iterator: :class:`~google.cloud.iterator.Iterator`
582+
:param iterator: The iterator that is currently in use.
583+
584+
:type resource: dict
585+
:param resource: A topic returned from the API.
586+
587+
:rtype: :class:`~google.cloud.pubsub.topic.Topic`
588+
:returns: The next topic in the page.
589+
"""
590+
return Topic.from_api_repr(resource, iterator.client)

0 commit comments

Comments
 (0)