diff --git a/google/cloud/spanner_admin_database_v1/gapic_metadata.json b/google/cloud/spanner_admin_database_v1/gapic_metadata.json index b0fb4f1384..977d7a32f5 100644 --- a/google/cloud/spanner_admin_database_v1/gapic_metadata.json +++ b/google/cloud/spanner_admin_database_v1/gapic_metadata.json @@ -321,6 +321,106 @@ ] } } + }, + "rest": { + "libraryClient": "DatabaseAdminClient", + "rpcs": { + "CopyBackup": { + "methods": [ + "copy_backup" + ] + }, + "CreateBackup": { + "methods": [ + "create_backup" + ] + }, + "CreateDatabase": { + "methods": [ + "create_database" + ] + }, + "DeleteBackup": { + "methods": [ + "delete_backup" + ] + }, + "DropDatabase": { + "methods": [ + "drop_database" + ] + }, + "GetBackup": { + "methods": [ + "get_backup" + ] + }, + "GetDatabase": { + "methods": [ + "get_database" + ] + }, + "GetDatabaseDdl": { + "methods": [ + "get_database_ddl" + ] + }, + "GetIamPolicy": { + "methods": [ + "get_iam_policy" + ] + }, + "ListBackupOperations": { + "methods": [ + "list_backup_operations" + ] + }, + "ListBackups": { + "methods": [ + "list_backups" + ] + }, + "ListDatabaseOperations": { + "methods": [ + "list_database_operations" + ] + }, + "ListDatabaseRoles": { + "methods": [ + "list_database_roles" + ] + }, + "ListDatabases": { + "methods": [ + "list_databases" + ] + }, + "RestoreDatabase": { + "methods": [ + "restore_database" + ] + }, + "SetIamPolicy": { + "methods": [ + "set_iam_policy" + ] + }, + "TestIamPermissions": { + "methods": [ + "test_iam_permissions" + ] + }, + "UpdateBackup": { + "methods": [ + "update_backup" + ] + }, + "UpdateDatabaseDdl": { + "methods": [ + "update_database_ddl" + ] + } + } } } } diff --git a/google/cloud/spanner_admin_database_v1/services/database_admin/transports/rest.py b/google/cloud/spanner_admin_database_v1/services/database_admin/transports/rest.py index bd35307fcc..5aaedde91c 100644 --- a/google/cloud/spanner_admin_database_v1/services/database_admin/transports/rest.py +++ b/google/cloud/spanner_admin_database_v1/services/database_admin/transports/rest.py @@ -28,7 +28,6 @@ from google.protobuf import json_format from google.api_core import operations_v1 -from google.longrunning import operations_pb2 from requests import __version__ as requests_version import dataclasses import re @@ -46,8 +45,8 @@ from google.cloud.spanner_admin_database_v1.types import spanner_database_admin from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore -from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore +from google.longrunning import operations_pb2 # type: ignore from .base import ( DatabaseAdminTransport, diff --git a/google/cloud/spanner_admin_instance_v1/services/instance_admin/transports/rest.py b/google/cloud/spanner_admin_instance_v1/services/instance_admin/transports/rest.py index c743fa011d..2ba6d65087 100644 --- a/google/cloud/spanner_admin_instance_v1/services/instance_admin/transports/rest.py +++ b/google/cloud/spanner_admin_instance_v1/services/instance_admin/transports/rest.py @@ -43,8 +43,8 @@ from google.cloud.spanner_admin_instance_v1.types import spanner_instance_admin from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore -from google.longrunning import operations_pb2 # type: ignore from google.protobuf import empty_pb2 # type: ignore +from google.longrunning import operations_pb2 # type: ignore from .base import ( InstanceAdminTransport, @@ -505,10 +505,12 @@ class InstanceAdminRestTransport(InstanceAdminTransport): """REST backend transport for InstanceAdmin. Cloud Spanner Instance Admin API + The Cloud Spanner Instance Admin API can be used to create, delete, modify and list instances. Instances are dedicated Cloud Spanner serving and storage resources to be used by Cloud Spanner databases. + Each instance has a "configuration", which dictates where the serving resources for the Cloud Spanner instance are located (e.g., US-central, Europe). Configurations are created by Google diff --git a/google/cloud/spanner_v1/__init__.py b/google/cloud/spanner_v1/__init__.py index 039919563f..a1b3bb143a 100644 --- a/google/cloud/spanner_v1/__init__.py +++ b/google/cloud/spanner_v1/__init__.py @@ -38,6 +38,7 @@ from .types.spanner import CommitRequest from .types.spanner import CreateSessionRequest from .types.spanner import DeleteSessionRequest +from .types.spanner import DirectedReadOptions from .types.spanner import ExecuteBatchDmlRequest from .types.spanner import ExecuteBatchDmlResponse from .types.spanner import ExecuteSqlRequest @@ -77,6 +78,7 @@ This value can only be used for timestamp columns that have set the option ``(allow_commit_timestamp=true)`` in the schema. """ +TransactionTypes = DirectedReadOptions.ReplicaSelection.Type __all__ = ( @@ -96,6 +98,7 @@ "TransactionPingingPool", # local "COMMIT_TIMESTAMP", + "TransactionTypes", # google.cloud.spanner_v1.types "BatchCreateSessionsRequest", "BatchCreateSessionsResponse", @@ -104,6 +107,7 @@ "CommitResponse", "CreateSessionRequest", "DeleteSessionRequest", + "DirectedReadOptions", "ExecuteBatchDmlRequest", "ExecuteBatchDmlResponse", "ExecuteSqlRequest", diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index e0e2bfdbd0..b7b6f0bb9e 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -28,6 +28,8 @@ from google.cloud.spanner_v1 import TypeCode from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1 import JsonObject +from google.cloud.spanner_v1 import DirectedReadOptions +from google.api_core.exceptions import InvalidArgument # Validation error messages NUMERIC_MAX_SCALE_ERR_MSG = ( @@ -358,3 +360,51 @@ def _metadata_with_leader_aware_routing(value, **kw): List[Tuple[str, str]]: RPC metadata with leader aware routing header """ return ("x-goog-spanner-route-to-leader", str(value).lower()) + + +def verify_directed_read_options(directed_read_options): + """Verifies if value of directed_read_options is correct. + :type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: directed_read_options for ReadRequests and ExecuteSqlRequests. + + :raises InvalidArguement: if directed_read_options is incorrect. + """ + if type(directed_read_options) == dict: + if ( + "include_replicas" in directed_read_options.keys() + and "exclude_replicas" in directed_read_options.keys() + ): + raise InvalidArgument( + "Only one of include_replicas or exclude_replicas can be set" + ) + if ( + "include_replicas" in directed_read_options.keys() + and "replica_selections" in directed_read_options["include_replicas"].keys() + and len(directed_read_options["include_replicas"]["replica_selections"]) + > 10 + ) or ( + "exclude_replicas" in directed_read_options.keys() + and "replica_selections" in directed_read_options["exclude_replicas"].keys() + and len(directed_read_options["exclude_replicas"]["replica_selections"]) + > 10 + ): + raise InvalidArgument("Maximum length of replica selection allowed is 10") + elif isinstance(directed_read_options, DirectedReadOptions): + if ( + directed_read_options.include_replicas is not None + and directed_read_options.exclude_replicas is not None + ): + raise InvalidArgument( + "Only one of include_replicas or exclude_replicas can be set" + ) + if ( + directed_read_options.include_replicas is not None + and directed_read_options.include_replicas.replica_selections is not None + and len(directed_read_options.include_replicas.replica_selections) > 10 + ) or ( + directed_read_options.include_replicas is not None + and directed_read_options.exclude_replicas.replica_selections is not None + and len(directed_read_options.exclude_replicas.replica_selections) > 10 + ): + raise InvalidArgument("Maximum length of replica selection allowed is 10") diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index a0e848228b..8a36e8ef4d 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -47,6 +47,7 @@ from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._helpers import verify_directed_read_options from google.cloud.spanner_v1.instance import Instance _CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__) @@ -120,6 +121,12 @@ class Client(ClientWithProject): disable leader aware routing. Disabling leader aware routing would route all requests in RW/PDML transactions to the closest region. + :type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Client options used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas + or regions should be used for non-transactional reads or queries. + :raises: :class:`ValueError ` if both ``read_only`` and ``admin`` are :data:`True` """ @@ -139,6 +146,7 @@ def __init__( client_options=None, query_options=None, route_to_leader_enabled=True, + directed_read_options=None, ): self._emulator_host = _get_spanner_emulator_host() @@ -180,6 +188,9 @@ def __init__( self._route_to_leader_enabled = route_to_leader_enabled + verify_directed_read_options(directed_read_options) + self._directed_read_options = directed_read_options + @property def credentials(self): """Getter for client's credentials. @@ -260,6 +271,17 @@ def route_to_leader_enabled(self): """ return self._route_to_leader_enabled + @property + def directed_read_options(self): + """Getter for directed_read_options. + + :rtype: + :class:`~googlecloud.spanner_v1.types.DirectedReadOptions` + or :class:`dict` + :returns: The directed_read_options for the client. + """ + return self._directed_read_options + def copy(self): """Make a copy of this client. @@ -383,3 +405,13 @@ def list_instances(self, filter_="", page_size=None): request=request, metadata=metadata ) return page_iter + + def set_directed_read_options(self, directed_read_options): + """Sets directed_read_options for the client + :type directed_read_options: :class:`~google.cloud.spanner_v1.types.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: Client options used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas + or regions should be used for non-transactional reads or queries. + """ + self._directed_read_options = directed_read_options diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 1d211f7d6d..81c144833e 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -166,6 +166,7 @@ def __init__( self._route_to_leader_enabled = self._instance._client.route_to_leader_enabled self._enable_drop_protection = enable_drop_protection self._reconciling = False + self._directed_read_options = self._instance._client.directed_read_options if pool is None: pool = BurstyPool(database_role=database_role) diff --git a/google/cloud/spanner_v1/services/spanner/transports/rest.py b/google/cloud/spanner_v1/services/spanner/transports/rest.py index 83abd878df..d7157886a5 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/rest.py +++ b/google/cloud/spanner_v1/services/spanner/transports/rest.py @@ -493,6 +493,7 @@ class SpannerRestTransport(SpannerTransport): """REST backend transport for Spanner. Cloud Spanner API + The Cloud Spanner API can be used to manage sessions and execute transactions on data stored in Cloud Spanner databases. diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 573042aa11..e8bca4f99d 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -28,6 +28,7 @@ from google.api_core.exceptions import InternalServerError from google.api_core.exceptions import ServiceUnavailable from google.api_core.exceptions import InvalidArgument +from google.api_core.exceptions import BadRequest from google.api_core import gapic_v1 from google.cloud.spanner_v1._helpers import ( _make_value_pb, @@ -41,6 +42,7 @@ from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1._helpers import verify_directed_read_options _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = ( "RST_STREAM", @@ -176,6 +178,7 @@ def read( *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + directed_read_options=None, ): """Perform a ``StreamingRead`` API request for rows in a table. @@ -224,12 +227,22 @@ def read( ``partition_token``, the API will return an ``INVALID_ARGUMENT`` error. + :type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Client options used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas + or regions should be used for non-transactional reads or queries. + :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet` :returns: a result set instance which can be used to consume rows. :raises ValueError: for reuse of single-use snapshots, or if a transaction ID is already pending for multiple-use snapshots. + + :raises InvalidArguement: if directed_read_options is incorrect. + + :raises BadRequest: if directed_read_options are set for READ-WRITE Transaction or PDML. """ if self._read_request_count > 0: if not self._multi_use: @@ -253,8 +266,19 @@ def read( if self._read_only: # Transaction tags are not supported for read only transactions. request_options.transaction_tag = None - elif self.transaction_tag is not None: - request_options.transaction_tag = self.transaction_tag + if ( + directed_read_options is None + and database._directed_read_options is not None + ): + directed_read_options = database._directed_read_options + else: + if self.transaction_tag is not None: + request_options.transaction_tag = self.transaction_tag + if directed_read_options is not None: + raise BadRequest( + "directed_read_options can't be set for readWrite transactions or partitioned dml requests" + ) + verify_directed_read_options(directed_read_options) request = ReadRequest( session=self._session.name, @@ -266,6 +290,7 @@ def read( partition_token=partition, request_options=request_options, data_boost_enabled=data_boost_enabled, + directed_read_options=directed_read_options, ) restart = functools.partial( api.streaming_read, @@ -322,6 +347,7 @@ def execute_sql( retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, data_boost_enabled=False, + directed_read_options=None, ): """Perform an ``ExecuteStreamingSql`` API request. @@ -379,9 +405,19 @@ def execute_sql( ``partition_token``, the API will return an ``INVALID_ARGUMENT`` error. + :type directed_read_options: :class:`~googlecloud.spanner_v1.types.DirectedReadOptions` + or :class:`dict` + :param directed_read_options: (Optional) Client options used to set the directed_read_options + for all ReadRequests and ExecuteSqlRequests for the Client which indicate which replicas + or regions should be used for non-transactional reads or queries. + :raises ValueError: for reuse of single-use snapshots, or if a transaction ID is already pending for multiple-use snapshots. + + :raises InvalidArguement: if directed_read_options is incorrect. + + :raises BadRequest: if directed_read_options are set for READ-WRITE Transaction or PDML. """ if self._read_request_count > 0: if not self._multi_use: @@ -419,8 +455,19 @@ def execute_sql( if self._read_only: # Transaction tags are not supported for read only transactions. request_options.transaction_tag = None - elif self.transaction_tag is not None: - request_options.transaction_tag = self.transaction_tag + if ( + directed_read_options is None + and database._directed_read_options is not None + ): + directed_read_options = database._directed_read_options + else: + if self.transaction_tag is not None: + request_options.transaction_tag = self.transaction_tag + if directed_read_options is not None: + raise BadRequest( + "directed_read_options can't be set for readWrite transactions or partitioned dml requests" + ) + verify_directed_read_options(directed_read_options) request = ExecuteSqlRequest( session=self._session.name, @@ -433,6 +480,7 @@ def execute_sql( query_options=query_options, request_options=request_options, data_boost_enabled=data_boost_enabled, + directed_read_options=directed_read_options, ) restart = functools.partial( api.execute_streaming_sql, diff --git a/google/cloud/spanner_v1/types/__init__.py b/google/cloud/spanner_v1/types/__init__.py index df0960d9d9..edff53badd 100644 --- a/google/cloud/spanner_v1/types/__init__.py +++ b/google/cloud/spanner_v1/types/__init__.py @@ -40,6 +40,7 @@ CommitRequest, CreateSessionRequest, DeleteSessionRequest, + DirectedReadOptions, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, ExecuteSqlRequest, @@ -85,6 +86,7 @@ "CommitRequest", "CreateSessionRequest", "DeleteSessionRequest", + "DirectedReadOptions", "ExecuteBatchDmlRequest", "ExecuteBatchDmlResponse", "ExecuteSqlRequest", diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index b69e61012e..eab5945ca3 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -41,6 +41,7 @@ "ListSessionsResponse", "DeleteSessionRequest", "RequestOptions", + "DirectedReadOptions", "ExecuteSqlRequest", "ExecuteBatchDmlRequest", "ExecuteBatchDmlResponse", @@ -379,6 +380,153 @@ class Priority(proto.Enum): ) +class DirectedReadOptions(proto.Message): + r"""The DirectedReadOptions can be used to indicate which + replicas or regions should be used for non-transactional reads + or queries. + + Not all requests can be sent to non-leader replicas. In + particular, some requests such as reads within read-write + transactions must be sent to a designated leader replica. These + requests ignores DirectedReadOptions. + + This message has `oneof`_ fields (mutually exclusive fields). + For each oneof, at most one member field can be set at the same time. + Setting any member of the oneof automatically clears all other + members. + + .. _oneof: https://proto-plus-python.readthedocs.io/en/stable/fields.html#oneofs-mutually-exclusive-fields + + Attributes: + include_replicas (google.cloud.spanner_v1.types.DirectedReadOptions.IncludeReplicas): + Include_replicas indicates the order of replicas (as they + appear in this list) to process the request. If all replicas + are exhausted without finding a healthy replica, Spanner + will wait for a replica in the list to become available, + requests may fail due to ``DEADLINE_EXCEEDED`` errors. In + particular, the request will never be sent to a region or + replica which does not appear in the list. + + This field is a member of `oneof`_ ``replicas``. + exclude_replicas (google.cloud.spanner_v1.types.DirectedReadOptions.ExcludeReplicas): + Exclude_replicas indicates that should be excluded from + serving requests. Spanner will not route requests to the + replicas in this list. + + This field is a member of `oneof`_ ``replicas``. + """ + + class ReplicaSelection(proto.Message): + r"""The directed read replica selector. Callers must provide one or more + of the following fields for replica selection: + + - ``location`` - The location must be one of the regions within the + multi-region configuration of your database. + - ``type`` - The type of the replica. + + Some examples of using replica_selectors are: + + - ``location:us-east1`` --> The "us-east1" replica(s) of any + available type will be used to process the request. + - ``type:READ_ONLY`` --> The "READ_ONLY" type replica(s) in nearest + . available location will be used to process the request. + - ``location:us-east1 type:READ_ONLY`` --> The "READ_ONLY" type + replica(s) in location "us-east1" will be used to process the + request. + + Attributes: + location (str): + The location or region of the serving + requests, e.g. "us-east1". + type_ (google.cloud.spanner_v1.types.DirectedReadOptions.ReplicaSelection.Type): + The type of replica. + """ + + class Type(proto.Enum): + r"""Indicates the type of replica. + + Values: + TYPE_UNSPECIFIED (0): + Not specified. + READ_WRITE (1): + Read-write replicas support both reads and + writes. + READ_ONLY (2): + Read-only replicas only support reads (not + writes). + """ + TYPE_UNSPECIFIED = 0 + READ_WRITE = 1 + READ_ONLY = 2 + + location: str = proto.Field( + proto.STRING, + number=1, + ) + type_: "DirectedReadOptions.ReplicaSelection.Type" = proto.Field( + proto.ENUM, + number=2, + enum="DirectedReadOptions.ReplicaSelection.Type", + ) + + class IncludeReplicas(proto.Message): + r"""An IncludeReplicas contains a repeated set of + ReplicaSelection which indicates the order in which replicas + should be considered. + + Attributes: + replica_selections (MutableSequence[google.cloud.spanner_v1.types.DirectedReadOptions.ReplicaSelection]): + The directed read replica selector. + auto_failover (bool): + If true, Spanner will route requests to healthy replica + outside the list when all the replicas in the + include_replicas list are unavailable or unhealthy. Default + value is ``false``. + """ + + replica_selections: MutableSequence[ + "DirectedReadOptions.ReplicaSelection" + ] = proto.RepeatedField( + proto.MESSAGE, + number=1, + message="DirectedReadOptions.ReplicaSelection", + ) + auto_failover: bool = proto.Field( + proto.BOOL, + number=2, + ) + + class ExcludeReplicas(proto.Message): + r"""An ExcludeReplicas contains a repeated set of + ReplicaSelection that should be excluded from serving requests. + + Attributes: + replica_selections (MutableSequence[google.cloud.spanner_v1.types.DirectedReadOptions.ReplicaSelection]): + The directed read replica selector. + """ + + replica_selections: MutableSequence[ + "DirectedReadOptions.ReplicaSelection" + ] = proto.RepeatedField( + proto.MESSAGE, + number=1, + message="DirectedReadOptions.ReplicaSelection", + ) + + include_replicas: IncludeReplicas = proto.Field( + proto.MESSAGE, + number=1, + oneof="replicas", + message=IncludeReplicas, + ) + exclude_replicas: ExcludeReplicas = proto.Field( + proto.MESSAGE, + number=2, + oneof="replicas", + message=ExcludeReplicas, + ) + + class ExecuteSqlRequest(proto.Message): r"""The request for [ExecuteSql][google.spanner.v1.Spanner.ExecuteSql] and @@ -390,6 +538,7 @@ class ExecuteSqlRequest(proto.Message): should be performed. transaction (google.cloud.spanner_v1.types.TransactionSelector): The transaction to use. + For queries, if none is provided, the default is a temporary read-only transaction with strong concurrency. @@ -399,6 +548,7 @@ class ExecuteSqlRequest(proto.Message): single-use transactions are not supported. The caller must either supply an existing transaction ID or begin a new transaction. + Partitioned DML requires an existing Partitioned DML transaction ID. sql (str): @@ -469,6 +619,7 @@ class ExecuteSqlRequest(proto.Message): sequence number, the transaction may be aborted. Replays of previously handled requests will yield the same response as the first execution. + Required for DML statements. Ignored for queries. query_options (google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryOptions): @@ -476,6 +627,8 @@ class ExecuteSqlRequest(proto.Message): given query. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. + directed_read_options (google.cloud.spanner_v1.types.DirectedReadOptions): + Directed read options for this request. data_boost_enabled (bool): If this is for a partitioned query and this field is set to ``true``, the request will be executed via Spanner @@ -623,6 +776,11 @@ class QueryOptions(proto.Message): number=11, message="RequestOptions", ) + directed_read_options: "DirectedReadOptions" = proto.Field( + proto.MESSAGE, + number=15, + message="DirectedReadOptions", + ) data_boost_enabled: bool = proto.Field( proto.BOOL, number=16, @@ -1137,6 +1295,8 @@ class ReadRequest(proto.Message): create this partition_token. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. + directed_read_options (google.cloud.spanner_v1.types.DirectedReadOptions): + Directed read options for this request. data_boost_enabled (bool): If this is for a partitioned read and this field is set to ``true``, the request will be executed via Spanner @@ -1190,6 +1350,11 @@ class ReadRequest(proto.Message): number=11, message="RequestOptions", ) + directed_read_options: "DirectedReadOptions" = proto.Field( + proto.MESSAGE, + number=14, + message="DirectedReadOptions", + ) data_boost_enabled: bool = proto.Field( proto.BOOL, number=15, diff --git a/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json b/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json index 0ede9fccff..11932ae5e8 100644 --- a/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json +++ b/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-spanner-admin-database", - "version": "3.40.1" + "version": "0.1.0" }, "snippets": [ { diff --git a/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json b/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json index 76f704e8fb..9572d4d727 100644 --- a/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json +++ b/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-spanner-admin-instance", - "version": "3.40.1" + "version": "0.1.0" }, "snippets": [ { diff --git a/samples/generated_samples/snippet_metadata_google.spanner.v1.json b/samples/generated_samples/snippet_metadata_google.spanner.v1.json index a645b19356..a8e8be3ae3 100644 --- a/samples/generated_samples/snippet_metadata_google.spanner.v1.json +++ b/samples/generated_samples/snippet_metadata_google.spanner.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-spanner", - "version": "3.40.1" + "version": "0.1.0" }, "snippets": [ { diff --git a/samples/samples/snippets.py b/samples/samples/snippets.py index 82fb95a0dd..75dd236042 100644 --- a/samples/samples/snippets.py +++ b/samples/samples/snippets.py @@ -31,6 +31,7 @@ from google.cloud import spanner from google.cloud.spanner_admin_instance_v1.types import spanner_instance_admin from google.cloud.spanner_v1 import param_types +from google.cloud.spanner_v1 import TransactionTypes from google.type import expr_pb2 from google.iam.v1 import policy_pb2 from google.cloud.spanner_v1.data_types import JsonObject @@ -2664,6 +2665,77 @@ def drop_sequence(instance_id, database_id): # [END spanner_drop_sequence] +def directed_read_options( + instance_id, + database_id, +): + """ + Shows how to run an execute sql request with directed read options. + Only one of exclude_replicas or include_replicas can be set + Each accepts a list of replicaSelections which contains location and type + * `location` - The location must be one of the regions within the + multi-region configuration of your database. + * `type_` - The type of the replica + Some examples of using replica_selectors are: + * `location:us-east1` --> The "us-east1" replica(s) of any available type + will be used to process the request. + * `type:READ_ONLY` --> The "READ_ONLY" type replica(s) in nearest + available location will be used to process the + request. + * `location:us-east1 type:READ_ONLY` --> The "READ_ONLY" type replica(s) + in location "us-east1" will be used to process + the request. + include_replicas also contains an option for auto_failover which when set + Spanner will not route requests to a replica outside the + include_replicas list when all the specified replicas are unavailable + or unhealthy. The default value is `false` + """ + # [START spanner_directed_read] + # instance_id = "your-spanner-instance" + # database_id = "your-spanner-db-id" + + directed_read_options_for_client = { + "exclude_replicas": { + "replica_selections": [ + { + "location": "us-east4", + }, + ], + }, + } + + # directed_read_options can be set at client level and will be used in all + # read-only transaction requests + spanner_client = spanner.Client( + directed_read_options=directed_read_options_for_client + ) + instance = spanner_client.instance(instance_id) + database = instance.database(database_id) + + directed_read_options_for_request = { + "include_replicas": { + "replica_selections": [ + { + "type_": TransactionTypes.READ_ONLY, + }, + ], + "auto_failover": True, + }, + } + + with database.snapshot() as snapshot: + # Read rows while passing directed_read_options directly to the query. + # These will override the options passed at Client level. + results = snapshot.execute_sql( + "SELECT SingerId, AlbumId, AlbumTitle FROM Albums", + directed_read_options=directed_read_options_for_request, + ) + + for row in results: + print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row)) + # [END spanner_directed_read] + + if __name__ == "__main__": # noqa: C901 parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter @@ -2802,6 +2874,9 @@ def drop_sequence(instance_id, database_id): "--database_role", default="new_parent" ) enable_fine_grained_access_parser.add_argument("--title", default="condition title") + subparsers.add_parser( + "directed_read_options", help=directed_read_options.__doc__ + ) args = parser.parse_args() @@ -2931,3 +3006,5 @@ def drop_sequence(instance_id, database_id): args.database_role, args.title, ) + elif args.command == "directed_read_options": + directed_read_options(args.instance_id, args.database_id) diff --git a/samples/samples/snippets_test.py b/samples/samples/snippets_test.py index 22b5b6f944..99eed700bf 100644 --- a/samples/samples/snippets_test.py +++ b/samples/samples/snippets_test.py @@ -845,3 +845,10 @@ def test_drop_sequence(capsys, instance_id, bit_reverse_sequence_database): "Altered Customers table to drop DEFAULT from CustomerId column and dropped the Seq sequence on database" in out ) + + +@pytest.mark.dependency(depends=["insert_data"]) +def test_directed_read_options(capsys, instance_id, sample_database): + snippets.directed_read_options(instance_id, sample_database.database_id) + out, _ = capsys.readouterr() + assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out diff --git a/scripts/fixup_spanner_v1_keywords.py b/scripts/fixup_spanner_v1_keywords.py index df4d3501f2..b43fdc3d6d 100644 --- a/scripts/fixup_spanner_v1_keywords.py +++ b/scripts/fixup_spanner_v1_keywords.py @@ -45,15 +45,15 @@ class spannerCallTransformer(cst.CSTTransformer): 'create_session': ('database', 'session', ), 'delete_session': ('name', ), 'execute_batch_dml': ('session', 'transaction', 'statements', 'seqno', 'request_options', ), - 'execute_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', 'data_boost_enabled', ), - 'execute_streaming_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', 'data_boost_enabled', ), + 'execute_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', 'directed_read_options', 'data_boost_enabled', ), + 'execute_streaming_sql': ('session', 'sql', 'transaction', 'params', 'param_types', 'resume_token', 'query_mode', 'partition_token', 'seqno', 'query_options', 'request_options', 'directed_read_options', 'data_boost_enabled', ), 'get_session': ('name', ), 'list_sessions': ('database', 'page_size', 'page_token', 'filter', ), 'partition_query': ('session', 'sql', 'transaction', 'params', 'param_types', 'partition_options', ), 'partition_read': ('session', 'table', 'key_set', 'transaction', 'index', 'columns', 'partition_options', ), - 'read': ('session', 'table', 'columns', 'key_set', 'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', 'data_boost_enabled', ), + 'read': ('session', 'table', 'columns', 'key_set', 'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', 'directed_read_options', 'data_boost_enabled', ), 'rollback': ('session', 'transaction_id', ), - 'streaming_read': ('session', 'table', 'columns', 'key_set', 'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', 'data_boost_enabled', ), + 'streaming_read': ('session', 'table', 'columns', 'key_set', 'transaction', 'index', 'limit', 'resume_token', 'partition_token', 'request_options', 'directed_read_options', 'data_boost_enabled', ), } def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode: diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 7d58324b04..b6c474af78 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -28,6 +28,7 @@ from google.cloud import spanner_v1 from google.cloud.spanner_admin_database_v1 import DatabaseDialect from google.cloud._helpers import UTC +from google.cloud.spanner_v1 import TransactionTypes from google.cloud.spanner_v1.data_types import JsonObject from tests import _helpers as ot_helpers from . import _helpers @@ -57,6 +58,17 @@ JSON_2 = JsonObject( {"sample_object": {"name": "Anamika", "id": 2635}}, ) +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": TransactionTypes.READ_ONLY, + }, + ], + "auto_failover": True, + }, +} COUNTERS_TABLE = "counters" COUNTERS_COLUMNS = ("name", "value") @@ -502,6 +514,29 @@ def test_batch_insert_w_commit_timestamp(sessions_database, not_postgres): assert not deleted +def test_snapshot_read_w_directed_read_options(sessions_database, not_postgres): + table = "users_history" + columns = ["id", "commit_ts", "name", "email", "deleted"] + user_id = 1234 + name = "phred" + email = "phred@example.com" + row_data = [[user_id, spanner_v1.COMMIT_TIMESTAMP, name, email, False]] + sd = _sample_data + + with sessions_database.batch() as batch: + batch.delete(table, sd.ALL) + batch.insert(table, columns, row_data) + + with sessions_database.snapshot() as snapshot: + rows = list( + snapshot.read( + table, columns, sd.ALL, directed_read_options=DIRECTED_READ_OPTIONS + ) + ) + + assert len(rows) == 1 + + @_helpers.retry_mabye_aborted_txn def test_transaction_read_and_insert_then_rollback( sessions_database, @@ -1918,6 +1953,19 @@ def test_execute_sql_w_manual_consume(sessions_database): assert streamed._pending_chunk is None +def test_execute_sql_w_directed_read_options(sessions_database, not_postgres): + sd = _sample_data + row_count = 3000 + committed = _set_up_table(sessions_database, row_count) + + with sessions_database.snapshot() as snapshot: + streamed = snapshot.execute_sql( + sd.SQL, directed_read_options=DIRECTED_READ_OPTIONS + ) + + assert len(list(streamed)) == 3000 + + def _check_sql_results( database, sql, diff --git a/tests/unit/gapic/spanner_admin_database_v1/test_database_admin.py b/tests/unit/gapic/spanner_admin_database_v1/test_database_admin.py index 6f5ec35284..48d5447d37 100644 --- a/tests/unit/gapic/spanner_admin_database_v1/test_database_admin.py +++ b/tests/unit/gapic/spanner_admin_database_v1/test_database_admin.py @@ -63,7 +63,7 @@ from google.iam.v1 import iam_policy_pb2 # type: ignore from google.iam.v1 import options_pb2 # type: ignore from google.iam.v1 import policy_pb2 # type: ignore -from google.longrunning import operations_pb2 +from google.longrunning import operations_pb2 # type: ignore from google.longrunning import operations_pb2 # type: ignore from google.oauth2 import service_account from google.protobuf import any_pb2 # type: ignore diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 1628f84062..a6fbd323fc 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -47,7 +47,8 @@ def _make_connection(self, **kwargs): from google.cloud.spanner_v1.client import Client # We don't need a real Client object to test the constructor - instance = Instance(INSTANCE, client=Client) + client = Client() + instance = Instance(INSTANCE, client=client) database = instance.database(DATABASE) return Connection(instance, database, **kwargs) diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 0e0ec903a2..46c54bd8a3 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -16,6 +16,65 @@ import unittest import mock +DIRECTED_READ_INCORRECT_OPTIONS1 = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + }, + ], + }, + "exclude_replicas": { + "replica_selections": [ + { + "location": "us-east1", + }, + ], + }, +} +DIRECTED_READ_INCORRECT_OPTIONS2 = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + { + "location": "us-west1", + }, + ], + }, +} + class Test_merge_query_options(unittest.TestCase): def _callFUT(self, *args, **kw): @@ -761,3 +820,40 @@ def test(self): self.assertEqual( metadata, ("x-goog-spanner-route-to-leader", str(value).lower()) ) + + +class Test_verify_directed_read_options(unittest.TestCase): + def _call_fut(self, directed_read_options): + from google.cloud.spanner_v1._helpers import verify_directed_read_options + + verify_directed_read_options(directed_read_options) + + def test_dict_include_exclude_replica_set_error(self): + from google.api_core.exceptions import InvalidArgument + + directed_read_options = DIRECTED_READ_INCORRECT_OPTIONS1 + with self.assertRaises(InvalidArgument): + self._call_fut(directed_read_options) + + def test_dict_greater_than_ten_replica_selections_error(self): + from google.api_core.exceptions import InvalidArgument + + directed_read_options = DIRECTED_READ_INCORRECT_OPTIONS2 + with self.assertRaises(InvalidArgument): + self._call_fut(directed_read_options) + + def test_object_include_exclude_replica_set_error(self): + from google.api_core.exceptions import InvalidArgument + from google.cloud.spanner_v1 import DirectedReadOptions + + directed_read_options = DirectedReadOptions(DIRECTED_READ_INCORRECT_OPTIONS1) + with self.assertRaises(InvalidArgument): + self._call_fut(directed_read_options) + + def test_dict_greater_than_ten_replica_selections_error(self): + from google.api_core.exceptions import InvalidArgument + from google.cloud.spanner_v1 import DirectedReadOptions + + directed_read_options = DirectedReadOptions(DIRECTED_READ_INCORRECT_OPTIONS2) + with self.assertRaises(InvalidArgument): + self._call_fut(directed_read_options) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index ed79271a96..b6e63eb1ca 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -15,6 +15,7 @@ import unittest import mock +from google.cloud.spanner_v1 import TransactionTypes def _make_credentials(): @@ -41,6 +42,17 @@ class TestClient(unittest.TestCase): LABELS = {"test": "true"} TIMEOUT_SECONDS = 80 LEADER_OPTIONS = ["leader1", "leader2"] + DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": TransactionTypes.READ_ONLY, + }, + ], + "auto_failover": True, + }, + } def _get_target_class(self): from google.cloud import spanner @@ -60,6 +72,7 @@ def _constructor_test_helper( query_options=None, expected_query_options=None, route_to_leader_enabled=True, + directed_read_options=None, ): import google.api_core.client_options from google.cloud.spanner_v1 import client as MUT @@ -85,6 +98,7 @@ def _constructor_test_helper( project=self.PROJECT, credentials=creds, query_options=query_options, + directed_read_options=directed_read_options, **kwargs ) @@ -113,6 +127,8 @@ def _constructor_test_helper( self.assertEqual(client.route_to_leader_enabled, route_to_leader_enabled) else: self.assertFalse(client.route_to_leader_enabled) + if directed_read_options is not None: + self.assertEqual(client.directed_read_options, directed_read_options) @mock.patch("google.cloud.spanner_v1.client._get_spanner_emulator_host") @mock.patch("warnings.warn") @@ -235,6 +251,15 @@ def test_constructor_route_to_leader_disbled(self): expected_scopes, creds, route_to_leader_enabled=False ) + def test_constructor_w_directed_read_options(self): + from google.cloud.spanner_v1 import client as MUT + + expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,) + creds = _make_credentials() + self._constructor_test_helper( + expected_scopes, creds, directed_read_options=self.DIRECTED_READ_OPTIONS + ) + @mock.patch("google.cloud.spanner_v1.client._get_spanner_emulator_host") def test_instance_admin_api(self, mock_em): from google.cloud.spanner_v1.client import SPANNER_ADMIN_SCOPE diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 5a6abf8084..94675f9162 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -22,7 +22,7 @@ from google.api_core.retry import Retry from google.protobuf.field_mask_pb2 import FieldMask -from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1 import RequestOptions, TransactionTypes DML_WO_PARAM = """ DELETE FROM citizens @@ -35,6 +35,17 @@ PARAMS = {"age": 30} PARAM_TYPES = {"age": INT64} MODE = 2 # PROFILE +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": TransactionTypes.READ_ONLY, + }, + ], + "auto_failover": True, + }, +} def _make_credentials(): # pragma: NO COVER @@ -199,6 +210,16 @@ def test_ctor_w_encryption_config(self): self.assertIs(database._instance, instance) self.assertEqual(database._encryption_config, encryption_config) + def test_ctor_w_directed_read_options(self): + client = _Client(directed_read_options=DIRECTED_READ_OPTIONS) + instance = _Instance(self.INSTANCE_NAME, client=client) + database = self._make_one( + self.DATABASE_ID, instance, database_role=self.DATABASE_ROLE + ) + self.assertEqual(database.database_id, self.DATABASE_ID) + self.assertIs(database._instance, instance) + self.assertEqual(database._directed_read_options, DIRECTED_READ_OPTIONS) + def test_from_pb_bad_database_name(self): from google.cloud.spanner_admin_database_v1 import Database @@ -2690,7 +2711,13 @@ def _make_instance_api(): class _Client(object): - def __init__(self, project=TestDatabase.PROJECT_ID, route_to_leader_enabled=True): + def __init__( + self, + project=TestDatabase.PROJECT_ID, + route_to_leader_enabled=True, + directed_read_options=None, + ): + from google.cloud.spanner_v1 import ExecuteSqlRequest self.project = project @@ -2701,6 +2728,7 @@ def __init__(self, project=TestDatabase.PROJECT_ID, route_to_leader_enabled=True self._client_options = mock.Mock() self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.route_to_leader_enabled = route_to_leader_enabled + self.directed_read_options = directed_read_options class _Instance(object): @@ -2727,6 +2755,7 @@ def __init__(self, name, instance=None): from logging import Logger self.logger = mock.create_autospec(Logger, instance=True) + self._directed_read_options = None class _Pool(object): diff --git a/tests/unit/test_instance.py b/tests/unit/test_instance.py index 0a7dbccb81..503e8f83bd 100644 --- a/tests/unit/test_instance.py +++ b/tests/unit/test_instance.py @@ -1016,6 +1016,7 @@ def __init__(self, project, timeout_seconds=None): self.project_name = "projects/" + self.project self.timeout_seconds = timeout_seconds self.route_to_leader_enabled = True + self.directed_read_options = None def copy(self): from copy import deepcopy diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index 5d2afb4fe6..d80edee3f7 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -16,7 +16,7 @@ from google.api_core import gapic_v1 import mock -from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1 import RequestOptions, TransactionTypes from tests._helpers import ( OpenTelemetryBase, StatusCode, @@ -46,6 +46,26 @@ "db.instance": "testing", "net.host.name": "spanner.googleapis.com", } +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": TransactionTypes.READ_ONLY, + }, + ], + "auto_failover": True, + }, +} +DIRECTED_READ_OPTIONS_FOR_CLIENT = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-east1", + }, + ], + }, +} class Test_restart_on_unavailable(OpenTelemetryBase): @@ -603,6 +623,8 @@ def _read_helper( timeout=gapic_v1.method.DEFAULT, retry=gapic_v1.method.DEFAULT, request_options=None, + directed_read_options=None, + directed_read_options_at_client_level=None, ): from google.protobuf.struct_pb2 import Struct from google.cloud.spanner_v1 import ( @@ -642,7 +664,9 @@ def _read_helper( keyset = KeySet(keys=KEYS) INDEX = "email-address-index" LIMIT = 20 - database = _Database() + database = _Database( + directed_read_options=directed_read_options_at_client_level + ) api = database.spanner_api = self._make_spanner_api() api.streaming_read.return_value = _MockIterator(*result_sets) session = _Session(database) @@ -667,6 +691,7 @@ def _read_helper( retry=retry, timeout=timeout, request_options=request_options, + directed_read_options=directed_read_options, ) else: result_set = derived.read( @@ -678,6 +703,7 @@ def _read_helper( retry=retry, timeout=timeout, request_options=request_options, + directed_read_options=directed_read_options, ) self.assertEqual(derived._read_request_count, count + 1) @@ -712,6 +738,12 @@ def _read_helper( expected_request_options = request_options expected_request_options.transaction_tag = None + expected_directed_read_options = ( + directed_read_options + if directed_read_options is not None + else directed_read_options_at_client_level + ) + expected_request = ReadRequest( session=self.SESSION_NAME, table=TABLE_NAME, @@ -722,6 +754,7 @@ def _read_helper( limit=expected_limit, partition_token=partition, request_options=expected_request_options, + directed_read_options=expected_directed_read_options, ) api.streaming_read.assert_called_once_with( request=expected_request, @@ -797,6 +830,22 @@ def test_read_w_timeout_and_retry_params(self): multi_use=True, first=False, retry=Retry(deadline=60), timeout=2.0 ) + def test_read_w_directed_read_options(self): + self._read_helper(multi_use=False, directed_read_options=DIRECTED_READ_OPTIONS) + + def test_read_w_directed_read_options_at_client_level(self): + self._read_helper( + multi_use=False, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + + def test_read_w_directed_read_options_override(self): + self._read_helper( + multi_use=False, + directed_read_options=DIRECTED_READ_OPTIONS, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + def test_execute_sql_other_error(self): database = _Database() database.spanner_api = self._make_spanner_api() @@ -836,6 +885,8 @@ def _execute_sql_helper( request_options=None, timeout=gapic_v1.method.DEFAULT, retry=gapic_v1.method.DEFAULT, + directed_read_options=None, + directed_read_options_at_client_level=None, ): from google.protobuf.struct_pb2 import Struct from google.cloud.spanner_v1 import ( @@ -876,7 +927,9 @@ def _execute_sql_helper( for i in range(len(result_sets)): result_sets[i].values.extend(VALUE_PBS[i]) iterator = _MockIterator(*result_sets) - database = _Database() + database = _Database( + directed_read_options=directed_read_options_at_client_level + ) api = database.spanner_api = self._make_spanner_api() api.execute_streaming_sql.return_value = iterator session = _Session(database) @@ -902,6 +955,7 @@ def _execute_sql_helper( partition=partition, retry=retry, timeout=timeout, + directed_read_options=directed_read_options, ) self.assertEqual(derived._read_request_count, count + 1) @@ -942,6 +996,12 @@ def _execute_sql_helper( expected_request_options = request_options expected_request_options.transaction_tag = None + expected_directed_read_options = ( + directed_read_options + if directed_read_options is not None + else directed_read_options_at_client_level + ) + expected_request = ExecuteSqlRequest( session=self.SESSION_NAME, sql=SQL_QUERY_WITH_PARAM, @@ -953,6 +1013,7 @@ def _execute_sql_helper( request_options=expected_request_options, partition_token=partition, seqno=sql_count, + directed_read_options=expected_directed_read_options, ) api.execute_streaming_sql.assert_called_once_with( request=expected_request, @@ -1039,6 +1100,24 @@ def test_execute_sql_w_incorrect_tag_dictionary_error(self): with self.assertRaises(ValueError): self._execute_sql_helper(multi_use=False, request_options=request_options) + def test_execute_sql_w_directed_read_options(self): + self._execute_sql_helper( + multi_use=False, directed_read_options=DIRECTED_READ_OPTIONS + ) + + def test_execute_sql_w_directed_read_options(self): + self._execute_sql_helper( + multi_use=False, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + + def test_execute_sql_w_directed_read_options(self): + self._execute_sql_helper( + multi_use=False, + directed_read_options=DIRECTED_READ_OPTIONS, + directed_read_options_at_client_level=DIRECTED_READ_OPTIONS_FOR_CLIENT, + ) + def _partition_read_helper( self, multi_use, @@ -1747,10 +1826,11 @@ def __init__(self): class _Database(object): - def __init__(self): + def __init__(self, directed_read_options=None): self.name = "testing" self._instance = _Instance() self._route_to_leader_enabled = True + self._directed_read_options = directed_read_options class _Session(object): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index e4cd1e84cd..a9c05c563f 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -28,6 +28,7 @@ StructType, TransactionOptions, TransactionSelector, + TransactionTypes, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, param_types, @@ -74,6 +75,17 @@ RETRY = gapic_v1.method.DEFAULT TIMEOUT = gapic_v1.method.DEFAULT REQUEST_OPTIONS = RequestOptions() +DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": TransactionTypes.READ_ONLY, + }, + ], + "auto_failover": True, + }, +} insert_dml = "INSERT INTO table(pkey, desc) VALUES (%pkey, %desc)" insert_params = {"pkey": 12345, "desc": "DESCRIPTION"} insert_param_types = {"pkey": param_types.INT64, "desc": param_types.STRING} @@ -193,6 +205,7 @@ def _execute_sql_helper( partition=None, sql_count=0, query_options=None, + directed_read_options=None, ): VALUES = [["bharney", "rhubbyl", 31], ["phred", "phlyntstone", 32]] VALUE_PBS = [[_make_value_pb(item) for item in row] for row in VALUES] @@ -231,6 +244,7 @@ def _execute_sql_helper( partition=partition, retry=RETRY, timeout=TIMEOUT, + directed_read_options=directed_read_options, ) self.assertEqual(transaction._read_request_count, count + 1) @@ -284,6 +298,7 @@ def _read_helper( api, count=0, partition=None, + directed_read_options=None, ): VALUES = [["bharney", 31], ["phred", 32]] VALUE_PBS = [[_make_value_pb(item) for item in row] for row in VALUES] @@ -322,6 +337,7 @@ def _read_helper( retry=RETRY, timeout=TIMEOUT, request_options=REQUEST_OPTIONS, + directed_read_options=directed_read_options, ) else: result_set = transaction.read( @@ -333,6 +349,7 @@ def _read_helper( retry=RETRY, timeout=TIMEOUT, request_options=REQUEST_OPTIONS, + directed_read_options=directed_read_options, ) self.assertEqual(transaction._read_request_count, count + 1) @@ -357,7 +374,6 @@ def _read_helper_expected_request(self, partition=None, begin=True, count=0): else: expected_limit = LIMIT - # Transaction tag is ignored for read request. expected_request_options = REQUEST_OPTIONS expected_request_options.transaction_tag = self.TRANSACTION_TAG @@ -606,6 +622,28 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): ], ) + def test_transaction_should_throw_error_w_directed_read_options(self): + from google.api_core.exceptions import BadRequest + + database = _Database() + session = _Session(database) + api = database.spanner_api = self._make_spanner_api() + transaction = self._make_one(session) + + with self.assertRaises(BadRequest): + self._execute_sql_helper( + transaction=transaction, + api=api, + directed_read_options=DIRECTED_READ_OPTIONS, + ) + + with self.assertRaises(BadRequest): + self._read_helper( + transaction=transaction, + api=api, + directed_read_options=DIRECTED_READ_OPTIONS, + ) + def test_transaction_should_use_transaction_id_returned_by_first_read(self): database = _Database() session = _Session(database) @@ -924,6 +962,7 @@ def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + self.directed_read_options = None class _Instance(object): @@ -936,6 +975,7 @@ def __init__(self): self.name = "testing" self._instance = _Instance() self._route_to_leader_enabled = True + self._directed_read_options = None class _Session(object): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 85359dac19..7d6a4b1ec4 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -895,6 +895,7 @@ def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + self.directed_read_options = None class _Instance(object): @@ -907,6 +908,7 @@ def __init__(self): self.name = "testing" self._instance = _Instance() self._route_to_leader_enabled = True + self._directed_read_options = None class _Session(object):