Skip to content

Commit ad8bef3

Browse files
committed
Add reading data with snapshot system test
1 parent 6bb61e2 commit ad8bef3

2 files changed

Lines changed: 73 additions & 0 deletions

File tree

bigquery_storage/noxfile.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ def system(session):
111111
# Use pre-release gRPC for system tests.
112112
session.install("--pre", "grpcio")
113113

114+
session.install("protobuf")
115+
114116
# Install all test dependencies, then install this package into the
115117
# virtualenv's dist-packages.
116118
session.install("mock", "pytest")

bigquery_storage/tests/system/test_reader.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,15 @@
1515
# limitations under the License.
1616
"""System tests for reading rows from tables."""
1717

18+
import datetime as dt
19+
import json
20+
import io
21+
1822
import pytest
1923

24+
from google.cloud import bigquery
2025
from google.cloud import bigquery_storage_v1beta1
26+
from google.protobuf import timestamp_pb2
2127

2228

2329
@pytest.mark.parametrize(
@@ -145,3 +151,68 @@ def test_column_selection_read(client, project_id, table_with_data_ref, data_for
145151

146152
for row in rows:
147153
assert sorted(row.keys()) == ["age", "first_name"]
154+
155+
156+
def test_snapshot(client, project_id, table_with_data_ref):
    """Verify that a snapshot read excludes rows loaded after the snapshot time."""
    snapshot_time = timestamp_pb2.Timestamp()
    snapshot_time.GetCurrentTime()

    # Load two extra rows *after* capturing the snapshot timestamp.
    extra_rows = [
        {u"first_name": u"NewGuyFoo", u"last_name": u"Smith", u"age": 46},
        {u"first_name": u"NewGuyBar", u"last_name": u"Jones", u"age": 30},
    ]
    _add_rows(table_with_data_ref, extra_rows)

    # Open a read session pinned to the timestamp taken before the load.
    session = client.create_read_session(
        table_with_data_ref,
        "projects/{}".format(project_id),
        format_=bigquery_storage_v1beta1.enums.DataFormat.AVRO,
        requested_streams=1,
        table_modifiers={"snapshot_time": snapshot_time},
    )
    position = bigquery_storage_v1beta1.types.StreamPosition(
        stream=session.streams[0]
    )

    fetched = list(client.read_rows(position).rows(session))

    # Only the data present before the snapshot should be returned:
    # all five initial records, none of the newly loaded ones.
    assert len(fetched) == 5
    assert all("NewGuy" not in item["first_name"] for item in fetched)
188+
def _add_rows(table_ref, new_data):
    """Insert additional rows into an existing table.

    Args:
        table_ref (bigquery_storage_v1beta1.types.TableReference):
            A reference to the target table.
        new_data (Iterable[Dict[str, Any]]):
            New data to insert with each row represented as a dictionary.
            The keys must match the table column names, and the values
            must be JSON serializable.
    """
    bq_client = bigquery.Client()

    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    )

    # Serialize the rows as newline-delimited JSON into an in-memory buffer.
    payload = u"\n".join(json.dumps(record) for record in new_data)
    buffer = io.BytesIO(payload.encode())

    # Translate the storage-API table reference into a bigquery-client one.
    target = bigquery.table.TableReference.from_api_repr(
        {
            "projectId": table_ref.project_id,
            "datasetId": table_ref.dataset_id,
            "tableId": table_ref.table_id,
        }
    )

    load_job = bq_client.load_table_from_file(
        buffer, destination=target, job_config=load_config
    )
    load_job.result()  # Block until the load job finishes.

0 commit comments

Comments
 (0)