diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 59f8bef83bb3..97d3b76e6996 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -500,6 +500,10 @@ def _get_resource_config(cls, resource): '["configuration"]["%s"]' % cls._JOB_TYPE) return job_id, resource['configuration'] + def _build_resource(self): + """Helper: Generate a resource for :meth:`_begin`.""" + raise NotImplementedError("Abstract") + def _begin(self, client=None, retry=DEFAULT_RETRY): """API call: begin the job via a POST request @@ -1237,7 +1241,7 @@ def output_rows(self): return int(statistics['load']['outputRows']) def _build_resource(self): - """Generate a resource for :meth:`begin`.""" + """Generate a resource for :meth:`_begin`.""" configuration = self._configuration.to_api_repr() if self.source_uris is not None: _helpers._set_sub_prop( @@ -1413,7 +1417,7 @@ def destination_encryption_configuration(self): return self._configuration.destination_encryption_configuration def _build_resource(self): - """Generate a resource for :meth:`begin`.""" + """Generate a resource for :meth:`_begin`.""" source_refs = [{ 'projectId': table.project, @@ -1631,7 +1635,7 @@ def destination_uri_file_counts(self): return None def _build_resource(self): - """Generate a resource for :meth:`begin`.""" + """Generate a resource for :meth:`_begin`.""" source_ref = { 'projectId': self.source.project, @@ -2202,7 +2206,7 @@ def schema_update_options(self): return self._configuration.schema_update_options def _build_resource(self): - """Generate a resource for :meth:`begin`.""" + """Generate a resource for :meth:`_begin`.""" configuration = self._configuration.to_api_repr() resource = { @@ -2437,6 +2441,22 @@ def undeclared_query_parameters(self): return parameters + @property + def estimated_bytes_processed(self): + """Return the estimated number of bytes processed by the query. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.estimatedBytesProcessed + + :rtype: int or None + :returns: number of DML rows affected by the job, or None if job is not + yet complete. + """ + result = self._job_statistics().get('estimatedBytesProcessed') + if result is not None: + result = int(result) + return result + def done(self, retry=DEFAULT_RETRY): """Refresh the job and checks if it is complete. diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 23cdbaa30857..40067551d133 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -13,21 +13,14 @@ # limitations under the License. import copy +import unittest +import mock from six.moves import http_client -import unittest try: import pandas except (ImportError, AttributeError): # pragma: NO COVER pandas = None -from google.cloud.bigquery.job import CopyJobConfig -from google.cloud.bigquery.job import ExtractJobConfig -from google.cloud.bigquery.job import LoadJobConfig -from google.cloud.bigquery.dataset import DatasetReference -from google.cloud.bigquery.table import EncryptionConfiguration -from google.cloud._helpers import _RFC3339_MICROS - -import mock def _make_credentials(): @@ -80,6 +73,850 @@ def test_missing_reason(self): self.assertEqual(exception.code, http_client.INTERNAL_SERVER_ERROR) +class Test_JobReference(unittest.TestCase): + JOB_ID = 'job-id' + PROJECT = 'test-project-123' + LOCATION = 'us-central' + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery import job + + return job._JobReference + + def _make_one(self, job_id, project, location): + return self._get_target_class()(job_id, project, location) + + def test_ctor(self): + job_ref = self._make_one(self.JOB_ID, self.PROJECT, self.LOCATION) + + self.assertEqual(job_ref.job_id, self.JOB_ID) + self.assertEqual(job_ref.project, self.PROJECT) + self.assertEqual(job_ref.location, self.LOCATION) + + def test__to_api_repr(self): + job_ref = self._make_one(self.JOB_ID, self.PROJECT, self.LOCATION) + + self.assertEqual(job_ref._to_api_repr(), { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': self.LOCATION, + }) + + def test_from_api_repr(self): + api_repr = { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': self.LOCATION, + } + + job_ref = self._get_target_class()._from_api_repr(api_repr) + + self.assertEqual(job_ref.job_id, self.JOB_ID) + self.assertEqual(job_ref.project, self.PROJECT) + self.assertEqual(job_ref.location, self.LOCATION) + + +class Test_AsyncJob(unittest.TestCase): + JOB_ID = 'job-id' + PROJECT = 'test-project-123' + LOCATION = 'us-central' + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery import job + + return job._AsyncJob + + def _make_one(self, job_id, client): + return self._get_target_class()(job_id, client) + + def _make_derived_class(self): + class Derived(self._get_target_class()): + _JOB_TYPE = 'derived' + + return Derived + + def _make_derived(self, job_id, client): + return self._make_derived_class()(job_id, client) + + @staticmethod + def _job_reference(job_id, project, location): + from google.cloud.bigquery import job + + return job._JobReference(job_id, project, location) + + def test_ctor_w_bare_job_id(self): + import threading + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + + self.assertEqual(job.job_id, self.JOB_ID) + self.assertEqual(job.project, self.PROJECT) + self.assertIsNone(job.location) + self.assertIs(job._client, client) + self.assertEqual(job._properties, {}) + self.assertIsInstance(job._completion_lock, type(threading.Lock())) + self.assertEqual( + job.path, + '/projects/{}/jobs/{}'.format(self.PROJECT, self.JOB_ID)) + + def test_ctor_w_job_ref(self): + import threading + + other_project = 'other-project-234' + client = _make_client(project=other_project) + job_ref = self._job_reference(self.JOB_ID, self.PROJECT, self.LOCATION) + job = self._make_one(job_ref, client) + + self.assertEqual(job.job_id, self.JOB_ID) + self.assertEqual(job.project, self.PROJECT) + self.assertEqual(job.location, self.LOCATION) + self.assertIs(job._client, client) + self.assertEqual(job._properties, {}) + self.assertFalse(job._result_set) + self.assertIsInstance(job._completion_lock, type(threading.Lock())) + self.assertEqual( + job.path, + '/projects/{}/jobs/{}'.format(self.PROJECT, self.JOB_ID)) + + def test__require_client_w_none(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + + self.assertIs(job._require_client(None), client) + + def test__require_client_w_other(self): + client = _make_client(project=self.PROJECT) + other = object() + job = self._make_one(self.JOB_ID, client) + + self.assertIs(job._require_client(other), other) + + def test_job_type(self): + client = _make_client(project=self.PROJECT) + derived = self._make_derived(self.JOB_ID, client) + + self.assertEqual(derived.job_type, 'derived') + + def test_etag(self): + etag = 'ETAG-123' + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.etag) + job._properties['etag'] = etag + self.assertEqual(job.etag, etag) + + def test_self_link(self): + self_link = 'https://api.example.com/123' + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.self_link) + job._properties['selfLink'] = self_link + self.assertEqual(job.self_link, self_link) + + def test_user_email(self): + user_email = 'user@example.com' + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.user_email) + job._properties['user_email'] = user_email + self.assertEqual(job.user_email, user_email) + + @staticmethod + def _datetime_and_millis(): + import datetime + import pytz + from google.cloud._helpers import _millis + now = datetime.datetime.utcnow().replace( + microsecond=123000, # stats timestamps have ms precision + tzinfo=pytz.UTC) + return now, _millis(now) + + def test_created(self): + now, millis = self._datetime_and_millis() + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.created) + stats = job._properties['statistics'] = {} + self.assertIsNone(job.created) + stats['creationTime'] = millis + self.assertEqual(job.created, now) + + def test_started(self): + now, millis = self._datetime_and_millis() + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.started) + stats = job._properties['statistics'] = {} + self.assertIsNone(job.started) + stats['startTime'] = millis + self.assertEqual(job.started, now) + + def test_ended(self): + now, millis = self._datetime_and_millis() + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.ended) + stats = job._properties['statistics'] = {} + self.assertIsNone(job.ended) + stats['endTime'] = millis + self.assertEqual(job.ended, now) + + def test__job_statistics(self): + statistics = {'foo': 'bar'} + client = _make_client(project=self.PROJECT) + derived = self._make_derived(self.JOB_ID, client) + self.assertEqual(derived._job_statistics(), {}) + stats = derived._properties['statistics'] = {} + self.assertEqual(derived._job_statistics(), {}) + stats['derived'] = statistics + self.assertEqual(derived._job_statistics(), statistics) + + def test_error_result(self): + error_result = { + 'debugInfo': 'DEBUG INFO', + 'location': 'LOCATION', + 'message': 'MESSAGE', + 'reason': 'REASON' + } + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.error_result) + status = job._properties['status'] = {} + self.assertIsNone(job.error_result) + status['errorResult'] = error_result + self.assertEqual(job.error_result, error_result) + + def test_errors(self): + errors = [{ + 'debugInfo': 'DEBUG INFO', + 'location': 'LOCATION', + 'message': 'MESSAGE', + 'reason': 'REASON' + }] + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.errors) + status = job._properties['status'] = {} + self.assertIsNone(job.errors) + status['errors'] = errors + self.assertEqual(job.errors, errors) + + def test_state(self): + state = 'STATE' + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + self.assertIsNone(job.state) + status = job._properties['status'] = {} + self.assertIsNone(job.state) + status['state'] = state + self.assertEqual(job.state, state) + + def test__scrub_local_properties(self): + before = {'foo': 'bar'} + resource = before.copy() + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._scrub_local_properties(resource) # no raise + self.assertEqual(resource, before) + + def test__copy_configuration_properties(self): + before = {'foo': 'bar'} + resource = before.copy() + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + with self.assertRaises(NotImplementedError): + job._copy_configuration_properties(resource) + self.assertEqual(resource, before) + + def _set_properties_job(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._scrub_local_properties = mock.Mock() + job._copy_configuration_properties = mock.Mock() + job._set_future_result = mock.Mock() + job._properties = { + 'foo': 'bar', + } + return job + + def test__set_properties_no_stats(self): + config = { + 'test': True, + } + resource = { + 'configuration': config, + } + job = self._set_properties_job() + + job._set_properties(resource) + + self.assertEqual(job._properties, resource) + + job._scrub_local_properties.assert_called_once_with(resource) + job._copy_configuration_properties.assert_called_once_with(config) + + def test__set_properties_w_creation_time(self): + now, millis = self._datetime_and_millis() + config = { + 'test': True, + } + stats = { + 'creationTime': str(millis), + } + resource = { + 'configuration': config, + 'statistics': stats, + } + job = self._set_properties_job() + + job._set_properties(resource) + + cleaned = copy.deepcopy(resource) + cleaned['statistics']['creationTime'] = float(millis) + self.assertEqual(job._properties, cleaned) + + job._scrub_local_properties.assert_called_once_with(resource) + job._copy_configuration_properties.assert_called_once_with(config) + + def test__set_properties_w_start_time(self): + now, millis = self._datetime_and_millis() + config = { + 'test': True, + } + stats = { + 'startTime': str(millis), + } + resource = { + 'configuration': config, + 'statistics': stats, + } + job = self._set_properties_job() + + job._set_properties(resource) + + cleaned = copy.deepcopy(resource) + cleaned['statistics']['startTime'] = float(millis) + self.assertEqual(job._properties, cleaned) + + job._scrub_local_properties.assert_called_once_with(resource) + job._copy_configuration_properties.assert_called_once_with(config) + + def test__set_properties_w_end_time(self): + now, millis = self._datetime_and_millis() + config = { + 'test': True, + } + stats = { + 'endTime': str(millis), + } + resource = { + 'configuration': config, + 'statistics': stats, + } + job = self._set_properties_job() + + job._set_properties(resource) + + cleaned = copy.deepcopy(resource) + cleaned['statistics']['endTime'] = float(millis) + self.assertEqual(job._properties, cleaned) + + job._scrub_local_properties.assert_called_once_with(resource) + job._copy_configuration_properties.assert_called_once_with(config) + + def test__get_resource_config_missing_job_ref(self): + resource = {} + klass = self._make_derived_class() + + with self.assertRaises(KeyError): + klass._get_resource_config(resource) + + def test__get_resource_config_missing_job_id(self): + resource = { + 'jobReference': {}, + } + klass = self._make_derived_class() + + with self.assertRaises(KeyError): + klass._get_resource_config(resource) + + def test__get_resource_config_missing_configuration(self): + resource = { + 'jobReference': {'jobId': self.JOB_ID}, + } + klass = self._make_derived_class() + + with self.assertRaises(KeyError): + klass._get_resource_config(resource) + + def test__get_resource_config_missing_config_type(self): + resource = { + 'jobReference': {'jobId': self.JOB_ID}, + 'configuration': {}, + } + klass = self._make_derived_class() + + with self.assertRaises(KeyError): + klass._get_resource_config(resource) + + def test__get_resource_config_ok(self): + derived_config = {'foo': 'bar'} + resource = { + 'jobReference': {'jobId': self.JOB_ID}, + 'configuration': { + 'derived': derived_config, + }, + } + klass = self._make_derived_class() + + job_id, config = klass._get_resource_config(resource) + + self.assertEqual(job_id, self.JOB_ID) + self.assertEqual(config, {'derived': derived_config}) + + def test__build_resource(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + with self.assertRaises(NotImplementedError): + job._build_resource() + + def test__begin_already(self): + job = self._set_properties_job() + job._properties['status'] = {'state': 'WHATEVER'} + + with self.assertRaises(ValueError): + job._begin() + + def test__begin_defaults(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + resource = { + 'jobReference': { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': None, + }, + 'configuration': { + 'test': True, + } + } + job = self._set_properties_job() + builder = job._build_resource = mock.Mock() + builder.return_value = resource + call_api = job._client._call_api = mock.Mock() + call_api.return_value = resource + + job._begin() + + call_api.assert_called_once_with( + DEFAULT_RETRY, + method='POST', + path='/projects/{}/jobs'.format(self.PROJECT), + data=resource, + ) + self.assertEqual(job._properties, resource) + + def test__begin_explicit(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + other_project = 'other-project-234' + resource = { + 'jobReference': { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': None, + }, + 'configuration': { + 'test': True, + } + } + job = self._set_properties_job() + builder = job._build_resource = mock.Mock() + builder.return_value = resource + client = _make_client(project=other_project) + call_api = client._call_api = mock.Mock() + call_api.return_value = resource + retry = DEFAULT_RETRY.with_deadline(1) + + job._begin(client=client, retry=retry) + + call_api.assert_called_once_with( + retry, + method='POST', + path='/projects/{}/jobs'.format(self.PROJECT), + data=resource, + ) + self.assertEqual(job._properties, resource) + + def test_exists_defaults_miss(self): + from google.cloud.exceptions import NotFound + from google.cloud.bigquery.retry import DEFAULT_RETRY + + job = self._set_properties_job() + job._job_ref._properties['location'] = self.LOCATION + call_api = job._client._call_api = mock.Mock() + call_api.side_effect = NotFound('testing') + + self.assertFalse(job.exists()) + + call_api.assert_called_once_with( + DEFAULT_RETRY, + method='GET', + path='/projects/{}/jobs/{}'.format(self.PROJECT, self.JOB_ID), + query_params={ + 'fields': 'id', + 'location': self.LOCATION, + } + ) + + def test_exists_explicit_hit(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + other_project = 'other-project-234' + resource = { + 'jobReference': { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': None, + }, + 'configuration': { + 'test': True, + } + } + job = self._set_properties_job() + client = _make_client(project=other_project) + call_api = client._call_api = mock.Mock() + call_api.return_value = resource + retry = DEFAULT_RETRY.with_deadline(1) + + self.assertTrue(job.exists(client=client, retry=retry)) + + call_api.assert_called_once_with( + retry, + method='GET', + path='/projects/{}/jobs/{}'.format(self.PROJECT, self.JOB_ID), + query_params={'fields': 'id'} + ) + + def test_reload_defaults(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + resource = { + 'jobReference': { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': None, + }, + 'configuration': { + 'test': True, + } + } + job = self._set_properties_job() + job._job_ref._properties['location'] = self.LOCATION + call_api = job._client._call_api = mock.Mock() + call_api.return_value = resource + + job.reload() + + call_api.assert_called_once_with( + DEFAULT_RETRY, + method='GET', + path='/projects/{}/jobs/{}'.format(self.PROJECT, self.JOB_ID), + query_params={'location': self.LOCATION}, + ) + self.assertEqual(job._properties, resource) + + def test_reload_explicit(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + other_project = 'other-project-234' + resource = { + 'jobReference': { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': None, + }, + 'configuration': { + 'test': True, + } + } + job = self._set_properties_job() + client = _make_client(project=other_project) + call_api = client._call_api = mock.Mock() + call_api.return_value = resource + retry = DEFAULT_RETRY.with_deadline(1) + + job.reload(client=client, retry=retry) + + call_api.assert_called_once_with( + retry, + method='GET', + path='/projects/{}/jobs/{}'.format(self.PROJECT, self.JOB_ID), + query_params={}, + ) + self.assertEqual(job._properties, resource) + + def test_cancel_defaults(self): + resource = { + 'jobReference': { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': None, + }, + 'configuration': { + 'test': True, + } + } + response = {'job': resource} + job = self._set_properties_job() + job._job_ref._properties['location'] = self.LOCATION + connection = job._client._connection = _make_connection(response) + + self.assertTrue(job.cancel()) + + connection.api_request.assert_called_once_with( + method='POST', + path='/projects/{}/jobs/{}/cancel'.format( + self.PROJECT, self.JOB_ID), + query_params={'location': self.LOCATION}, + ) + self.assertEqual(job._properties, resource) + + def test_cancel_explicit(self): + other_project = 'other-project-234' + resource = { + 'jobReference': { + 'jobId': self.JOB_ID, + 'projectId': self.PROJECT, + 'location': None, + }, + 'configuration': { + 'test': True, + } + } + response = {'job': resource} + job = self._set_properties_job() + client = _make_client(project=other_project) + connection = client._connection = _make_connection(response) + + self.assertTrue(job.cancel(client=client)) + + connection.api_request.assert_called_once_with( + method='POST', + path='/projects/{}/jobs/{}/cancel'.format( + self.PROJECT, self.JOB_ID), + query_params={}, + ) + self.assertEqual(job._properties, resource) + + def test__set_future_result_wo_done(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + set_exception = job.set_exception = mock.Mock() + set_result = job.set_result = mock.Mock() + + job._set_future_result() + + set_exception.assert_not_called() + set_result.assert_not_called() + + def test__set_future_result_w_result_set(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._properties['status'] = {'state': 'DONE'} + job._result_set = True + set_exception = job.set_exception = mock.Mock() + set_result = job.set_result = mock.Mock() + + job._set_future_result() + + set_exception.assert_not_called() + set_result.assert_not_called() + + def test__set_future_result_w_done_wo_result_set_w_error(self): + from google.cloud.exceptions import NotFound + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._properties['status'] = { + 'state': 'DONE', + 'errorResult': { + 'reason': 'notFound', + 'message': 'testing' + } + } + set_exception = job.set_exception = mock.Mock() + set_result = job.set_result = mock.Mock() + + job._set_future_result() + + set_exception.assert_called_once() + args, kw = set_exception.call_args + exception, = args + self.assertIsInstance(exception, NotFound) + self.assertEqual(exception.message, 'testing') + self.assertEqual(kw, {}) + set_result.assert_not_called() + + def test__set_future_result_w_done_wo_result_set_wo_error(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._properties['status'] = {'state': 'DONE'} + set_exception = job.set_exception = mock.Mock() + set_result = job.set_result = mock.Mock() + + job._set_future_result() + + set_exception.assert_not_called() + set_result.assert_called_once_with(job) + + def test_done_defaults_wo_state(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + reload_ = job.reload = mock.Mock() + + self.assertFalse(job.done()) + + reload_.assert_called_once_with(retry=DEFAULT_RETRY) + + def test_done_explicit_wo_state(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + reload_ = job.reload = mock.Mock() + retry = DEFAULT_RETRY.with_deadline(1) + + self.assertFalse(job.done(retry=retry)) + + reload_.assert_called_once_with(retry=retry) + + def test_done_already(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._properties['status'] = {'state': 'DONE'} + + self.assertTrue(job.done()) + + @mock.patch('google.api_core.future.polling.PollingFuture.result') + def test_result_default_wo_state(self, result): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + begin = job._begin = mock.Mock() + + self.assertIs(job.result(), result.return_value) + + begin.assert_called_once() + result.assert_called_once_with(timeout=None) + + @mock.patch('google.api_core.future.polling.PollingFuture.result') + def test_result_explicit_w_state(self, result): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._properties['status'] = {'state': 'DONE'} + begin = job._begin = mock.Mock() + timeout = 1 + + self.assertIs(job.result(timeout=timeout), result.return_value) + + begin.assert_not_called() + result.assert_called_once_with(timeout=timeout) + + def test_cancelled_wo_error_result(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + + self.assertFalse(job.cancelled()) + + def test_cancelled_w_error_result_not_stopped(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._properties['status'] = { + 'errorResult': { + 'reason': 'other', + } + } + + self.assertFalse(job.cancelled()) + + def test_cancelled_w_error_result_w_stopped(self): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + job._properties['status'] = { + 'errorResult': { + 'reason': 'stopped', + } + } + + self.assertTrue(job.cancelled()) + + +class Test_JobConfig(unittest.TestCase): + JOB_TYPE = 'testing' + + @staticmethod + def _get_target_class(): + from google.cloud.bigquery import job + + return job._JobConfig + + def _make_one(self, job_type=JOB_TYPE): + return self._get_target_class()(job_type) + + def test_ctor(self): + job_config = self._make_one() + self.assertEqual(job_config._job_type, self.JOB_TYPE) + self.assertEqual(job_config._properties, {self.JOB_TYPE: {}}) + + @mock.patch('google.cloud.bigquery._helpers._get_sub_prop') + def test__get_sub_prop_wo_default(self, _get_sub_prop): + job_config = self._make_one() + key = 'key' + self.assertIs( + job_config._get_sub_prop(key), _get_sub_prop.return_value) + _get_sub_prop.assert_called_once_with( + job_config._properties, [self.JOB_TYPE, key], default=None) + + @mock.patch('google.cloud.bigquery._helpers._get_sub_prop') + def test__get_sub_prop_w_default(self, _get_sub_prop): + job_config = self._make_one() + key = 'key' + default = 'default' + self.assertIs( + job_config._get_sub_prop(key, default=default), + _get_sub_prop.return_value) + _get_sub_prop.assert_called_once_with( + job_config._properties, [self.JOB_TYPE, key], default=default) + + @mock.patch('google.cloud.bigquery._helpers._set_sub_prop') + def test__set_sub_prop(self, _set_sub_prop): + job_config = self._make_one() + key = 'key' + value = 'value' + job_config._set_sub_prop(key, value) + _set_sub_prop.assert_called_once_with( + job_config._properties, [self.JOB_TYPE, key], value) + + def test_to_api_repr(self): + job_config = self._make_one() + expected = job_config._properties = { + self.JOB_TYPE: { + 'foo': 'bar', + } + } + found = job_config.to_api_repr() + self.assertEqual(found, expected) + self.assertIsNot(found, expected) # copied + + # 'from_api_repr' cannot be tested on '_JobConfig', because it presumes + # the ctor can be called w/o arguments + + class _Base(object): from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.table import TableReference @@ -257,6 +1094,8 @@ def test_api_repr(self): self.assertEqual(config.to_api_repr(), resource) def test_to_api_repr_with_encryption(self): + from google.cloud.bigquery.table import EncryptionConfiguration + config = self._make_one() config.destination_encryption_configuration = EncryptionConfiguration( kms_key_name=self.KMS_KEY_NAME) @@ -459,6 +1298,7 @@ def test_ctor(self): def test_ctor_w_config(self): from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.job import LoadJobConfig client = _make_client(project=self.PROJECT) full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') @@ -512,11 +1352,14 @@ def test_result_invokes_begin(self): self.assertEqual(reload_request[1]['method'], 'GET') def test_schema_setter_non_list(self): + from google.cloud.bigquery.job import LoadJobConfig + config = LoadJobConfig() with self.assertRaises(TypeError): config.schema = object() def test_schema_setter_invalid_field(self): + from google.cloud.bigquery.job import LoadJobConfig from google.cloud.bigquery.schema import SchemaField config = LoadJobConfig() @@ -525,6 +1368,7 @@ def test_schema_setter_invalid_field(self): config.schema = [full_name, object()] def test_schema_setter(self): + from google.cloud.bigquery.job import LoadJobConfig from google.cloud.bigquery.schema import SchemaField config = LoadJobConfig() @@ -728,6 +1572,8 @@ def test_begin_w_bound_client(self): self._verifyResourceProperties(job, RESOURCE) def test_begin_w_autodetect(self): + from google.cloud.bigquery.job import LoadJobConfig + path = '/projects/{}/jobs'.format(self.PROJECT) resource = self._make_resource() resource['configuration']['load']['autodetect'] = True @@ -769,6 +1615,7 @@ def test_begin_w_autodetect(self): def test_begin_w_alternate_client(self): from google.cloud.bigquery.job import CreateDisposition + from google.cloud.bigquery.job import LoadJobConfig from google.cloud.bigquery.job import SchemaUpdateOption from google.cloud.bigquery.job import WriteDisposition from google.cloud.bigquery.schema import SchemaField @@ -1053,6 +1900,8 @@ def _get_target_class(): return CopyJobConfig def test_to_api_repr_with_encryption(self): + from google.cloud.bigquery.table import EncryptionConfiguration + config = self._make_one() config.destination_encryption_configuration = EncryptionConfiguration( kms_key_name=self.KMS_KEY_NAME) @@ -1355,6 +2204,8 @@ def test_begin_w_bound_client(self): self._verifyResourceProperties(job, RESOURCE) def test_begin_w_alternate_client(self): + from google.cloud.bigquery.job import CopyJobConfig + from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import WriteDisposition PATH = '/projects/%s/jobs' % (self.PROJECT,) @@ -1682,6 +2533,8 @@ def test_from_api_repr_w_properties(self): self._verifyResourceProperties(job, RESOURCE) def test_begin_w_bound_client(self): + from google.cloud.bigquery.dataset import DatasetReference + PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._make_resource() # Ensure None for missing server-set props @@ -1720,8 +2573,10 @@ def test_begin_w_bound_client(self): self._verifyResourceProperties(job, RESOURCE) def test_begin_w_alternate_client(self): + from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.job import Compression from google.cloud.bigquery.job import DestinationFormat + from google.cloud.bigquery.job import ExtractJobConfig PATH = '/projects/%s/jobs' % (self.PROJECT,) RESOURCE = self._make_resource(ended=True) @@ -1801,6 +2656,8 @@ def test_exists_hit_w_alternate_client(self): query_params={'fields': 'id'}) def test_reload_w_bound_client(self): + from google.cloud.bigquery.dataset import DatasetReference + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() conn = _make_connection(RESOURCE) @@ -1817,6 +2674,8 @@ def test_reload_w_bound_client(self): self._verifyResourceProperties(job, RESOURCE) def test_reload_w_alternate_client(self): + from google.cloud.bigquery.dataset import DatasetReference + PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_ID) RESOURCE = self._make_resource() conn1 = _make_connection() @@ -1885,6 +2744,8 @@ def test_from_api_repr_empty(self): self.assertIsNone(config.destination_encryption_configuration) def test_from_api_repr_normal(self): + from google.cloud.bigquery.dataset import DatasetReference + resource = { 'query': { 'useLegacySql': True, @@ -1914,6 +2775,8 @@ def test_from_api_repr_normal(self): 'I should be saved, too.') def test_to_api_repr_normal(self): + from google.cloud.bigquery.dataset import DatasetReference + config = self._make_one() config.use_legacy_sql = True config.default_dataset = DatasetReference( @@ -1934,6 +2797,8 @@ def test_to_api_repr_normal(self): resource['someNewProperty'], 'Woohoo, alpha stuff.') def test_to_api_repr_with_encryption(self): + from google.cloud.bigquery.table import EncryptionConfiguration + config = self._make_one() config.destination_encryption_configuration = EncryptionConfiguration( kms_key_name=self.KMS_KEY_NAME) @@ -2290,6 +3155,7 @@ def test_done(self): self.assertTrue(job.done()) def test_query_plan(self): + from google.cloud._helpers import _RFC3339_MICROS from google.cloud.bigquery.job import QueryPlanEntry from google.cloud.bigquery.job import QueryPlanEntryStep @@ -2694,6 +3560,22 @@ def test_undeclared_query_parameters(self): self.assertEqual(struct.struct_types, {'count': 'INT64'}) self.assertEqual(struct.struct_values, {'count': 123}) + def test_estimated_bytes_processed(self): + est_bytes = 123456 + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, self.QUERY, client) + self.assertIsNone(job.estimated_bytes_processed) + + statistics = job._properties['statistics'] = {} + self.assertIsNone(job.estimated_bytes_processed) + + query_stats = statistics['query'] = {} + self.assertIsNone(job.estimated_bytes_processed) + + query_stats['estimatedBytesProcessed'] = str(est_bytes) + self.assertEqual(job.estimated_bytes_processed, est_bytes) + def test_result(self): query_resource = { 'jobComplete': True, @@ -3551,6 +4433,8 @@ def test_from_api_repr_normal(self): self.assertEqual(entry.steps, steps) def test_start(self): + from google.cloud._helpers import _RFC3339_MICROS + klass = self._get_target_class() entry = klass.from_api_repr({}) @@ -3564,6 +4448,8 @@ def test_start(self): self.START_RFC3339_MICROS) def test_end(self): + from google.cloud._helpers import _RFC3339_MICROS + klass = self._get_target_class() entry = klass.from_api_repr({})