From 4a52cc1a4b49b7f99a45fd0125d8534f912871c1 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 26 Mar 2020 18:46:28 +0530 Subject: [PATCH 1/5] feat(bigquery): add support of model for extract job --- google/cloud/bigquery/client.py | 28 +++++-- google/cloud/bigquery/job.py | 33 ++++++-- google/cloud/bigquery/model.py | 12 +++ tests/unit/test_client.py | 134 ++++++++++++++++++++++++++++++++ tests/unit/test_job.py | 36 ++++++++- 5 files changed, 226 insertions(+), 17 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 6fe474218..79140cb0b 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -63,6 +63,7 @@ from google.cloud.bigquery import job from google.cloud.bigquery.model import Model from google.cloud.bigquery.model import ModelReference +from google.cloud.bigquery.model import _model_arg_to_model_ref from google.cloud.bigquery.query import _QueryResults from google.cloud.bigquery.retry import DEFAULT_RETRY from google.cloud.bigquery.routine import Routine @@ -2216,6 +2217,7 @@ def extract_table( job_config=None, retry=DEFAULT_RETRY, timeout=None, + source_type="Table", ): """Start a job to extract a table into Cloud Storage files. @@ -2226,9 +2228,11 @@ def extract_table( source (Union[ \ google.cloud.bigquery.table.Table, \ google.cloud.bigquery.table.TableReference, \ + google.cloud.bigquery.model.Model, \ + google.cloud.bigquery.model.ModelReference, \ src, \ ]): - Table to be extracted. + Table or Model to be extracted. destination_uris (Union[str, Sequence[str]]): URIs of Cloud Storage file(s) into which table data is to be extracted; in format @@ -2253,9 +2257,9 @@ def extract_table( timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. - Args: - source (google.cloud.bigquery.table.TableReference): table to be extracted. - + source_type (str): + (Optional) Type of source to be extracted.``Table`` or ``Model``. + Defaults to ``Table``. Returns: google.cloud.bigquery.job.ExtractJob: A new extract job instance. @@ -2263,7 +2267,9 @@ def extract_table( TypeError: If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.ExtractJobConfig` class. - """ + ValueError: + If ``source_type`` is not among ``Table``,``Model``. + """ job_id = _make_job_id(job_id, job_id_prefix) if project is None: @@ -2273,7 +2279,17 @@ def extract_table( location = self.location job_ref = job._JobReference(job_id, project=project, location=location) - source = _table_arg_to_table_ref(source, default_project=self.project) + + if source_type.lower() == "table": + source = _table_arg_to_table_ref(source, default_project=self.project) + elif source_type.lower() == "model": + source = _model_arg_to_model_ref(source, default_project=self.project) + else: + raise ValueError( + "Cannot pass `{}` as a ``source_type``, Pass Table or Model".format( + source_type + ) + ) if isinstance(destination_uris, six.string_types): destination_uris = [destination_uris] diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index ab2eaede5..67b2259e0 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -2067,14 +2067,23 @@ def destination_uri_file_counts(self): def to_api_repr(self): """Generate a resource for :meth:`_begin`.""" + configuration = self._configuration.to_api_repr() source_ref = { "projectId": self.source.project, "datasetId": self.source.dataset_id, - "tableId": self.source.table_id, } - configuration = self._configuration.to_api_repr() - _helpers._set_sub_prop(configuration, ["extract", "sourceTable"], source_ref) + if isinstance(self.source, TableReference): + source_ref["tableId"] = self.source.table_id + _helpers._set_sub_prop( + configuration, ["extract", "sourceTable"], source_ref + ) + else: + source_ref["modelId"] = self.source.model_id + _helpers._set_sub_prop( + configuration, ["extract", "sourceModel"], source_ref + ) + _helpers._set_sub_prop( configuration, ["extract", "destinationUris"], self.destination_uris ) @@ -2112,10 +2121,20 @@ def from_api_repr(cls, resource, client): source_config = _helpers._get_sub_prop( config_resource, ["extract", "sourceTable"] ) - dataset = DatasetReference( - source_config["projectId"], source_config["datasetId"] - ) - source = dataset.table(source_config["tableId"]) + if source_config: + dataset = DatasetReference( + source_config["projectId"], source_config["datasetId"] + ) + source = dataset.table(source_config["tableId"]) + else: + source_config = _helpers._get_sub_prop( + config_resource, ["extract", "sourceModel"] + ) + dataset = DatasetReference( + source_config["projectId"], source_config["datasetId"] + ) + source = dataset.model(source_config["modelId"]) + destination_uris = _helpers._get_sub_prop( config_resource, ["extract", "destinationUris"] ) diff --git a/google/cloud/bigquery/model.py b/google/cloud/bigquery/model.py index d39ec5f2f..c331b55e4 100644 --- a/google/cloud/bigquery/model.py +++ b/google/cloud/bigquery/model.py @@ -433,3 +433,15 @@ def __repr__(self): return "ModelReference(project='{}', dataset_id='{}', project_id='{}')".format( self.project, self.dataset_id, self.model_id ) + + +def _model_arg_to_model_ref(value, default_project=None): + """Helper to convert a string or Model to ModelReference. + + This function keeps ModelReference and other kinds of objects unchanged. + """ + if isinstance(value, six.string_types): + value = ModelReference.from_string(value, default_project=default_project) + if isinstance(value, Model): + value = value.reference + return value diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index e4bc6af75..9a23cd5f1 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -4047,6 +4047,140 @@ def test_extract_table_w_destination_uris(self): self.assertEqual(job.source, source) self.assertEqual(list(job.destination_uris), [DESTINATION1, DESTINATION2]) + def test_extract_table_for_source_type_model(self): + from google.cloud.bigquery.job import ExtractJob + + JOB = "job_id" + SOURCE = "source_model" + DESTINATION = "gs://bucket_name/object_name" + RESOURCE = { + "jobReference": {"projectId": self.PROJECT, "jobId": JOB}, + "configuration": { + "extract": { + "sourceModel": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "modelId": SOURCE, + }, + "destinationUris": [DESTINATION], + } + }, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(RESOURCE) + dataset = DatasetReference(self.PROJECT, self.DS_ID) + source = dataset.model(SOURCE) + + job = client.extract_table( + source, DESTINATION, job_id=JOB, timeout=7.5, source_type="Model" + ) + + # Check that extract_table actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", path="/projects/PROJECT/jobs", data=RESOURCE, timeout=7.5, + ) + + # Check the job resource. + self.assertIsInstance(job, ExtractJob) + self.assertIs(job._client, client) + self.assertEqual(job.job_id, JOB) + self.assertEqual(job.source, source) + self.assertEqual(list(job.destination_uris), [DESTINATION]) + + def test_extract_table_for_source_type_model_w_string_model_id(self): + JOB = "job_id" + source_id = "source_model" + DESTINATION = "gs://bucket_name/object_name" + RESOURCE = { + "jobReference": {"projectId": self.PROJECT, "jobId": JOB}, + "configuration": { + "extract": { + "sourceModel": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "modelId": source_id, + }, + "destinationUris": [DESTINATION], + } + }, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(RESOURCE) + + client.extract_table( + # Test with string for model ID. + "{}.{}".format(self.DS_ID, source_id), + DESTINATION, + job_id=JOB, + timeout=7.5, + source_type="Model", + ) + + # Check that extract_table actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", path="/projects/PROJECT/jobs", data=RESOURCE, timeout=7.5, + ) + + def test_extract_table_for_source_type_model_w_model_object(self): + from google.cloud.bigquery.model import Model + + JOB = "job_id" + DESTINATION = "gs://bucket_name/object_name" + model_id = "{}.{}.{}".format(self.PROJECT, self.DS_ID, self.MODEL_ID) + model = Model(model_id) + RESOURCE = { + "jobReference": {"projectId": self.PROJECT, "jobId": JOB}, + "configuration": { + "extract": { + "sourceModel": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "modelId": self.MODEL_ID, + }, + "destinationUris": [DESTINATION], + } + }, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(RESOURCE) + + client.extract_table( + # Test with Model class object. + model, + DESTINATION, + job_id=JOB, + timeout=7.5, + source_type="Model", + ) + + # Check that extract_table actually starts the job. + conn.api_request.assert_called_once_with( + method="POST", path="/projects/PROJECT/jobs", data=RESOURCE, timeout=7.5, + ) + + def test_extract_table_for_invalid_source_type_model(self): + JOB = "job_id" + SOURCE = "source_model" + DESTINATION = "gs://bucket_name/object_name" + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + dataset = DatasetReference(self.PROJECT, self.DS_ID) + source = dataset.model(SOURCE) + + with self.assertRaises(ValueError) as exc: + client.extract_table( + source, DESTINATION, job_id=JOB, timeout=7.5, source_type="foo" + ) + + self.assertIn("Cannot pass", exc.exception.args[0]) + def test_query_defaults(self): from google.cloud.bigquery.job import QueryJob diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index 3e642142d..d97efd946 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -3176,10 +3176,16 @@ def _verifyResourceProperties(self, job, resource): self.assertEqual(job.destination_uris, config["destinationUris"]) - table_ref = config["sourceTable"] - self.assertEqual(job.source.project, table_ref["projectId"]) - self.assertEqual(job.source.dataset_id, table_ref["datasetId"]) - self.assertEqual(job.source.table_id, table_ref["tableId"]) + if "sourceTable" in config: + table_ref = config["sourceTable"] + self.assertEqual(job.source.project, table_ref["projectId"]) + self.assertEqual(job.source.dataset_id, table_ref["datasetId"]) + self.assertEqual(job.source.table_id, table_ref["tableId"]) + else: + model_ref = config["sourceModel"] + self.assertEqual(job.source.project, model_ref["projectId"]) + self.assertEqual(job.source.dataset_id, model_ref["datasetId"]) + self.assertEqual(job.source.model_id, model_ref["modelId"]) if "compression" in config: self.assertEqual(job.compression, config["compression"]) @@ -3281,6 +3287,28 @@ def test_from_api_repr_bare(self): self.assertIs(job._client, client) self._verifyResourceProperties(job, RESOURCE) + def test_from_api_repr_for_model(self): + self._setUpConstants() + client = _make_client(project=self.PROJECT) + RESOURCE = { + "id": self.JOB_ID, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "configuration": { + "extract": { + "sourceModel": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "modelId": "model_id", + }, + "destinationUris": [self.DESTINATION_URI], + } + }, + } + klass = self._get_target_class() + job = klass.from_api_repr(RESOURCE, client=client) + self.assertIs(job._client, client) + self._verifyResourceProperties(job, RESOURCE) + def test_from_api_repr_w_properties(self): from google.cloud.bigquery.job import Compression From 7d8cb166950548521d61327c3e1c19eb70323f95 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 27 Mar 2020 19:10:04 +0530 Subject: [PATCH 2/5] feat(bigquery): nit --- google/cloud/bigquery/client.py | 2 +- google/cloud/bigquery/job.py | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 3fc77ffb7..6c775abec 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2352,7 +2352,7 @@ def extract_table( source = _model_arg_to_model_ref(source, default_project=self.project) else: raise ValueError( - "Cannot pass `{}` as a ``source_type``, Pass Table or Model".format( + "Cannot pass `{}` as a ``source_type``, pass Table or Model".format( source_type ) ) diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index dc2a932aa..e08b006f9 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -2075,15 +2075,12 @@ def to_api_repr(self): if isinstance(self.source, TableReference): source_ref["tableId"] = self.source.table_id - _helpers._set_sub_prop( - configuration, ["extract", "sourceTable"], source_ref - ) + source = "sourceTable" else: source_ref["modelId"] = self.source.model_id - _helpers._set_sub_prop( - configuration, ["extract", "sourceModel"], source_ref - ) + source = "sourceModel" + _helpers._set_sub_prop(configuration, ["extract", source], source_ref) _helpers._set_sub_prop( configuration, ["extract", "destinationUris"], self.destination_uris ) From 137b6c324e35dd580dfc15e61afc91ebeff73ec5 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 1 Apr 2020 15:50:35 +0530 Subject: [PATCH 3/5] feat(bigquery): add source model for create job method --- google/cloud/bigquery/client.py | 8 +++++--- google/cloud/bigquery/job.py | 9 ++++++--- google/cloud/bigquery/model.py | 4 ++-- tests/unit/test_client.py | 15 +++++++++++++++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 6c775abec..2a0b1f6be 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1365,6 +1365,8 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): job_config ) source = _get_sub_prop(job_config, ["extract", "sourceTable"]) + if not source: + source = _get_sub_prop(job_config, ["extract", "sourceModel"]) destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"]) return self.extract_table( source, destination_uris, job_config=extract_job_config, retry=retry @@ -2345,10 +2347,10 @@ def extract_table( location = self.location job_ref = job._JobReference(job_id, project=project, location=location) - - if source_type.lower() == "table": + src = source_type.lower() + if src == "table": source = _table_arg_to_table_ref(source, default_project=self.project) - elif source_type.lower() == "model": + elif src == "model": source = _model_arg_to_model_ref(source, default_project=self.project) else: raise ValueError( diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index e08b006f9..5ae74b975 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -1990,8 +1990,11 @@ class ExtractJob(_AsyncJob): Args: job_id (str): the job's ID. - source (google.cloud.bigquery.table.TableReference): - Table into which data is to be loaded. + source (Union[ \ + google.cloud.bigquery.table.TableReference, \ + google.cloud.bigquery.model.ModelReference \ + ]): + Table or Model into which data is to be loaded. destination_uris (List[str]): URIs describing where the extracted data will be written in Cloud @@ -2073,9 +2076,9 @@ def to_api_repr(self): "datasetId": self.source.dataset_id, } + source = "sourceTable" if isinstance(self.source, TableReference): source_ref["tableId"] = self.source.table_id - source = "sourceTable" else: source_ref["modelId"] = self.source.model_id source = "sourceModel" diff --git a/google/cloud/bigquery/model.py b/google/cloud/bigquery/model.py index b0b9985c3..eb459f57a 100644 --- a/google/cloud/bigquery/model.py +++ b/google/cloud/bigquery/model.py @@ -441,7 +441,7 @@ def _model_arg_to_model_ref(value, default_project=None): This function keeps ModelReference and other kinds of objects unchanged. """ if isinstance(value, six.string_types): - value = ModelReference.from_string(value, default_project=default_project) + return ModelReference.from_string(value, default_project=default_project) if isinstance(value, Model): - value = value.reference + return value.reference return value diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index c960f069c..6edb2e168 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2884,6 +2884,21 @@ def test_create_job_extract_config(self): configuration, "google.cloud.bigquery.client.Client.extract_table", ) + def test_create_job_extract_config_for_model(self): + configuration = { + "extract": { + "sourceModel": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "modelId": "source_model", + }, + "destinationUris": ["gs://test_bucket/dst_object*"], + } + } + self._create_job_helper( + configuration, "google.cloud.bigquery.client.Client.extract_table", + ) + def test_create_job_query_config(self): configuration = { "query": {"query": "query", "destinationTable": {"tableId": "table_id"}} From c97b6d1dfe865b9734b015b48ae928671b63edff Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 3 Apr 2020 16:45:38 +0530 Subject: [PATCH 4/5] feat(bigquery): nits --- google/cloud/bigquery/client.py | 8 +++++++- google/cloud/bigquery/job.py | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 2a0b1f6be..da5b30a35 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1365,11 +1365,17 @@ def create_job(self, job_config, retry=DEFAULT_RETRY): job_config ) source = _get_sub_prop(job_config, ["extract", "sourceTable"]) + source_type = "Table" if not source: source = _get_sub_prop(job_config, ["extract", "sourceModel"]) + source_type = "Model" destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"]) return self.extract_table( - source, destination_uris, job_config=extract_job_config, retry=retry + source, + destination_uris, + job_config=extract_job_config, + retry=retry, + source_type=source_type, ) elif "query" in job_config: copy_config = copy.deepcopy(job_config) diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 5ae74b975..b1fd87ffb 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -1994,7 +1994,7 @@ class ExtractJob(_AsyncJob): google.cloud.bigquery.table.TableReference, \ google.cloud.bigquery.model.ModelReference \ ]): - Table or Model into which data is to be loaded. + Table or Model from which data is to be loaded. destination_uris (List[str]): URIs describing where the extracted data will be written in Cloud From 09d05241d0e2f78b7712e0c209316eea7de36af7 Mon Sep 17 00:00:00 2001 From: HemangChothani Date: Fri, 8 May 2020 16:10:12 +0530 Subject: [PATCH 5/5] feat(bigquery): nit --- google/cloud/bigquery/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index b1fd87ffb..25dd446e8 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -1994,7 +1994,7 @@ class ExtractJob(_AsyncJob): google.cloud.bigquery.table.TableReference, \ google.cloud.bigquery.model.ModelReference \ ]): - Table or Model from which data is to be loaded. + Table or Model from which data is to be loaded or extracted. destination_uris (List[str]): URIs describing where the extracted data will be written in Cloud