diff --git a/gcloud/datastore/batch.py b/gcloud/datastore/batch.py index ae749c43957f..f70f26a0d985 100644 --- a/gcloud/datastore/batch.py +++ b/gcloud/datastore/batch.py @@ -60,11 +60,12 @@ class Batch(object): :type client: :class:`gcloud.datastore.client.Client` :param client: The client used to connect to datastore. """ + _id = None # "protected" attribute, always None for non-transactions def __init__(self, client): self._client = client - self._mutation = _datastore_pb2.Mutation() + self._commit_request = _datastore_pb2.CommitRequest() self._partial_key_entities = [] def current(self): @@ -114,6 +115,9 @@ def _add_complete_key_entity_pb(self): :returns: The newly created entity protobuf that will be updated and sent with a commit. """ + # We use ``upsert`` for entities with completed keys, rather than + # ``insert`` or ``update``, in order not to create race conditions + # based on prior existence / removal of the entity. return self.mutations.upsert.add() def _add_delete_key_pb(self): @@ -129,17 +133,16 @@ def _add_delete_key_pb(self): def mutations(self): """Getter for the changes accumulated by this batch. - Every batch is committed with a single Mutation - representing the 'work' to be done as part of the batch. - Inside a batch, calling :meth:`put` with an entity, or - :meth:`delete` with a key, builds up the mutation. - This getter returns the Mutation protobuf that - has been built-up so far. + Every batch is committed with a single commit request containing all + the work to be done as mutations. Inside a batch, calling :meth:`put` + with an entity, or :meth:`delete` with a key, builds up the request by + adding a new mutation. This getter returns the protobuf that has been + built-up so far. :rtype: :class:`gcloud.datastore._generated.datastore_pb2.Mutation` :returns: The Mutation protobuf to be sent in the commit request. """ - return self._mutation + return self._commit_request.mutation def put(self, entity): """Remember an entity's state to be saved during :meth:`commit`. @@ -156,8 +159,8 @@ def put(self, entity): "bytes" ('str' in Python2, 'bytes' in Python3) map to 'blob_value'. When an entity has a partial key, calling :meth:`commit` sends it as - an ``insert_auto_id`` mutation and the key is completed. On return, the - key for the ``entity`` passed in as updated to match the key ID + an ``insert_auto_id`` mutation and the key is completed. On return, + the key for the ``entity`` passed in is updated to match the key ID assigned by the server. :type entity: :class:`gcloud.datastore.entity.Entity` @@ -212,11 +215,10 @@ def commit(self): context manager. """ _, updated_keys = self.connection.commit( - self.dataset_id, self.mutations, self._id) + self.dataset_id, self._commit_request, self._id) # If the back-end returns without error, we are guaranteed that - # the response's 'insert_auto_id_key' will match (length and order) - # the request's 'insert_auto_id` entities, which are derived from - # our '_partial_key_entities' (no partial success). + # :meth:`Connection.commit` will return keys that match (length and + # order) directly ``_partial_key_entities``. for new_key_pb, entity in zip(updated_keys, self._partial_key_entities): new_id = new_key_pb.path_element[-1].id diff --git a/gcloud/datastore/connection.py b/gcloud/datastore/connection.py index e3f2164fdc84..6aedb533d44e 100644 --- a/gcloud/datastore/connection.py +++ b/gcloud/datastore/connection.py @@ -293,16 +293,16 @@ def begin_transaction(self, dataset_id): _datastore_pb2.BeginTransactionResponse) return response.transaction - def commit(self, dataset_id, mutation_pb, transaction_id): - """Commit dataset mutations in context of current transation (if any). + def commit(self, dataset_id, commit_request, transaction_id): + """Commit mutations in context of current transation (if any). Maps the ``DatastoreService.Commit`` protobuf RPC. :type dataset_id: string :param dataset_id: The ID dataset to which the transaction applies. - :type mutation_pb: :class:`._generated.datastore_pb2.Mutation` - :param mutation_pb: The protobuf for the mutations being saved. + :type commit_request: :class:`._generated.datastore_pb2.CommitRequest` + :param commit_request: The protobuf with the mutations being committed. :type transaction_id: string or None :param transaction_id: The transaction ID returned from @@ -315,6 +315,7 @@ def commit(self, dataset_id, mutation_pb, transaction_id): that was completed in the commit. """ request = _datastore_pb2.CommitRequest() + request.CopyFrom(commit_request) if transaction_id: request.mode = _datastore_pb2.CommitRequest.TRANSACTIONAL @@ -322,7 +323,6 @@ def commit(self, dataset_id, mutation_pb, transaction_id): else: request.mode = _datastore_pb2.CommitRequest.NON_TRANSACTIONAL - request.mutation.CopyFrom(mutation_pb) response = self._rpc(dataset_id, 'commit', request, _datastore_pb2.CommitResponse) return _parse_commit_response(response) diff --git a/gcloud/datastore/test_batch.py b/gcloud/datastore/test_batch.py index 12474be53aaf..065b0b7a5d62 100644 --- a/gcloud/datastore/test_batch.py +++ b/gcloud/datastore/test_batch.py @@ -209,7 +209,7 @@ def test_commit(self): batch.commit() self.assertEqual(connection._committed, - [(_DATASET, batch.mutations, None)]) + [(_DATASET, batch._commit_request, None)]) def test_commit_w_partial_key_entities(self): _DATASET = 'DATASET' @@ -225,7 +225,7 @@ def test_commit_w_partial_key_entities(self): batch.commit() self.assertEqual(connection._committed, - [(_DATASET, batch.mutations, None)]) + [(_DATASET, batch._commit_request, None)]) self.assertFalse(entity.key.is_partial) self.assertEqual(entity.key._id, _NEW_ID) @@ -248,7 +248,7 @@ def test_as_context_mgr_wo_error(self): mutated_entity = _mutated_pb(self, batch.mutations, 'upsert') self.assertEqual(mutated_entity.key, key._key) self.assertEqual(connection._committed, - [(_DATASET, batch.mutations, None)]) + [(_DATASET, batch._commit_request, None)]) def test_as_context_mgr_nested(self): _DATASET = 'DATASET' @@ -280,8 +280,8 @@ def test_as_context_mgr_nested(self): self.assertEqual(mutated_entity2.key, key2._key) self.assertEqual(connection._committed, - [(_DATASET, batch2.mutations, None), - (_DATASET, batch1.mutations, None)]) + [(_DATASET, batch2._commit_request, None), + (_DATASET, batch1._commit_request, None)]) def test_as_context_mgr_w_error(self): _DATASET = 'DATASET' @@ -329,8 +329,8 @@ def __init__(self, *new_keys): self._committed = [] self._index_updates = 0 - def commit(self, dataset_id, mutation, transaction_id): - self._committed.append((dataset_id, mutation, transaction_id)) + def commit(self, dataset_id, commit_request, transaction_id): + self._committed.append((dataset_id, commit_request, transaction_id)) return self._index_updates, self._completed_keys diff --git a/gcloud/datastore/test_client.py b/gcloud/datastore/test_client.py index 6713d8a964e0..85c346ea0c5c 100644 --- a/gcloud/datastore/test_client.py +++ b/gcloud/datastore/test_client.py @@ -625,9 +625,10 @@ def test_put_multi_no_batch_w_partial_key(self): self.assertTrue(result is None) self.assertEqual(len(client.connection._commit_cw), 1) - dataset_id, mutation, transaction_id = client.connection._commit_cw[0] + (dataset_id, + commit_req, transaction_id) = client.connection._commit_cw[0] self.assertEqual(dataset_id, self.DATASET_ID) - inserts = list(mutation.insert_auto_id) + inserts = list(commit_req.mutation.insert_auto_id) self.assertEqual(len(inserts), 1) self.assertEqual(inserts[0].key, key.to_protobuf()) @@ -697,9 +698,10 @@ def test_delete_multi_no_batch(self): result = client.delete_multi([key]) self.assertEqual(result, None) self.assertEqual(len(client.connection._commit_cw), 1) - dataset_id, mutation, transaction_id = client.connection._commit_cw[0] + (dataset_id, + commit_req, transaction_id) = client.connection._commit_cw[0] self.assertEqual(dataset_id, self.DATASET_ID) - self.assertEqual(list(mutation.delete), [key.to_protobuf()]) + self.assertEqual(list(commit_req.mutation.delete), [key.to_protobuf()]) self.assertTrue(transaction_id is None) def test_delete_multi_w_existing_batch(self): @@ -1012,8 +1014,8 @@ def lookup(self, dataset_id, key_pbs, eventual=False, transaction_id=None): results, missing, deferred = triple return results, missing, deferred - def commit(self, dataset_id, mutation, transaction_id): - self._commit_cw.append((dataset_id, mutation, transaction_id)) + def commit(self, dataset_id, commit_request, transaction_id): + self._commit_cw.append((dataset_id, commit_request, transaction_id)) response, self._commit = self._commit[0], self._commit[1:] return self._index_updates, response diff --git a/gcloud/datastore/test_connection.py b/gcloud/datastore/test_connection.py index 11cbb030d4ba..28924cf2a5f3 100644 --- a/gcloud/datastore/test_connection.py +++ b/gcloud/datastore/test_connection.py @@ -675,7 +675,8 @@ def test_commit_wo_transaction(self): DATASET_ID = 'DATASET' key_pb = self._make_key_pb(DATASET_ID) rsp_pb = datastore_pb2.CommitResponse() - mutation = datastore_pb2.Mutation() + req_pb = datastore_pb2.CommitRequest() + mutation = req_pb.mutation insert = mutation.upsert.add() insert.key.CopyFrom(key_pb) value_pb = _new_value_pb(insert, 'foo') @@ -700,7 +701,7 @@ def mock_parse(response): return expected_result with _Monkey(MUT, _parse_commit_response=mock_parse): - result = conn.commit(DATASET_ID, mutation, None) + result = conn.commit(DATASET_ID, req_pb, None) self.assertTrue(result is expected_result) cw = http._called_with @@ -722,7 +723,8 @@ def test_commit_w_transaction(self): DATASET_ID = 'DATASET' key_pb = self._make_key_pb(DATASET_ID) rsp_pb = datastore_pb2.CommitResponse() - mutation = datastore_pb2.Mutation() + req_pb = datastore_pb2.CommitRequest() + mutation = req_pb.mutation insert = mutation.upsert.add() insert.key.CopyFrom(key_pb) value_pb = _new_value_pb(insert, 'foo') @@ -747,7 +749,7 @@ def mock_parse(response): return expected_result with _Monkey(MUT, _parse_commit_response=mock_parse): - result = conn.commit(DATASET_ID, mutation, b'xact') + result = conn.commit(DATASET_ID, req_pb, b'xact') self.assertTrue(result is expected_result) cw = http._called_with diff --git a/gcloud/datastore/test_transaction.py b/gcloud/datastore/test_transaction.py index beb1ce0f76d9..43e03fec771f 100644 --- a/gcloud/datastore/test_transaction.py +++ b/gcloud/datastore/test_transaction.py @@ -102,10 +102,11 @@ def test_commit_no_partial_keys(self): connection = _Connection(234) client = _Client(_DATASET, connection) xact = self._makeOne(client) - xact._mutation = mutation = object() + xact._commit_request = commit_request = object() xact.begin() xact.commit() - self.assertEqual(connection._committed, (_DATASET, mutation, 234)) + self.assertEqual(connection._committed, + (_DATASET, commit_request, 234)) self.assertEqual(xact.id, None) def test_commit_w_partial_keys(self): @@ -118,10 +119,11 @@ def test_commit_w_partial_keys(self): xact = self._makeOne(client) entity = _Entity() xact.put(entity) - xact._mutation = mutation = object() + xact._commit_request = commit_request = object() xact.begin() xact.commit() - self.assertEqual(connection._committed, (_DATASET, mutation, 234)) + self.assertEqual(connection._committed, + (_DATASET, commit_request, 234)) self.assertEqual(xact.id, None) self.assertEqual(entity.key.path, [{'kind': _KIND, 'id': _ID}]) @@ -130,11 +132,12 @@ def test_context_manager_no_raise(self): connection = _Connection(234) client = _Client(_DATASET, connection) xact = self._makeOne(client) - xact._mutation = mutation = object() + xact._commit_request = commit_request = object() with xact: self.assertEqual(xact.id, 234) self.assertEqual(connection._begun, _DATASET) - self.assertEqual(connection._committed, (_DATASET, mutation, 234)) + self.assertEqual(connection._committed, + (_DATASET, commit_request, 234)) self.assertEqual(xact.id, None) def test_context_manager_w_raise(self): @@ -186,8 +189,8 @@ def begin_transaction(self, dataset_id): def rollback(self, dataset_id, transaction_id): self._rolled_back = dataset_id, transaction_id - def commit(self, dataset_id, mutation, transaction_id): - self._committed = (dataset_id, mutation, transaction_id) + def commit(self, dataset_id, commit_request, transaction_id): + self._committed = (dataset_id, commit_request, transaction_id) return self._index_updates, self._completed_keys