Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion google/cloud/storage/_experimental/asyncio/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def raise_if_no_fast_crc32c():
"For more information, see https://github.com/googleapis/python-crc32c."
)


def update_write_handle_if_exists(obj, response):
"""Update the write_handle attribute of an object if it exists in the response."""
if hasattr(response, "write_handle") and response.write_handle is not None:
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -493,4 +493,4 @@ async def close(self):

@property
def is_stream_open(self) -> bool:
return self._is_stream_open
return self._is_stream_open
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:

@property
def is_stream_open(self) -> bool:
return self._is_stream_open
return self._is_stream_open
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
if you want to use these Rapid Storage APIs.

"""
from typing import Optional
from . import _utils
from typing import List, Optional, Tuple
from google.cloud import _storage_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
Expand Down Expand Up @@ -60,7 +59,7 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream):
same name already exists, it will be overwritten the moment
`writer.open()` is called.

:type write_handle: _storage_v2.BidiWriteHandle
:type write_handle: bytes
:param write_handle: (Optional) An existing handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.
"""
Expand All @@ -71,7 +70,8 @@ def __init__(
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None, # None means new object
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
write_handle: Optional[bytes] = None,
routing_token: Optional[str] = None,
) -> None:
if client is None:
raise ValueError("client must be provided")
Expand All @@ -86,7 +86,8 @@ def __init__(
generation_number=generation_number,
)
self.client: AsyncGrpcClient.grpc_client = client
self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle
self.write_handle: Optional[bytes] = write_handle
self.routing_token: Optional[str] = routing_token

self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"

Expand All @@ -101,7 +102,7 @@ def __init__(
self.persisted_size = 0
self.object_resource: Optional[_storage_v2.Object] = None

async def open(self) -> None:
async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
"""
Opens the bidi-gRPC connection to write to the object.

Expand All @@ -110,20 +111,19 @@ async def open(self) -> None:

:rtype: None
:raises ValueError: If the stream is already open.
:raises google.api_core.exceptions.FailedPrecondition:
:raises google.api_core.exceptions.FailedPrecondition:
if `generation_number` is 0 and object already exists.
"""
if self._is_stream_open:
raise ValueError("Stream is already open")

write_handle = self.write_handle if self.write_handle else None

# Create a new object or overwrite existing one if generation_number
# is None. This makes it consistent with GCS JSON API behavior.
# Created object type would be Appendable Object.
# if `generation_number` == 0 new object will be created only if there
# isn't any existing object.
is_open_via_write_handle = (
self.write_handle is not None and self.generation_number
)
if self.generation_number is None or self.generation_number == 0:
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
write_object_spec=_storage_v2.WriteObjectSpec(
Expand All @@ -140,44 +140,47 @@ async def open(self) -> None:
bucket=self._full_bucket_name,
object=self.object_name,
generation=self.generation_number,
write_handle=self.write_handle,
write_handle=write_handle,
routing_token=self.routing_token if self.routing_token else None,
),
)

request_params = [f"bucket={self._full_bucket_name}"]
other_metadata = []
if metadata:
for key, value in metadata:
if key == "x-goog-request-params":
request_params.append(value)
else:
other_metadata.append((key, value))

current_metadata = other_metadata
current_metadata.append(("x-goog-request-params", ",".join(request_params)))

self.socket_like_rpc = AsyncBidiRpc(
self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata
self.rpc,
initial_request=self.first_bidi_write_req,
metadata=current_metadata,
)

await self.socket_like_rpc.open() # this is actually 1 send
response = await self.socket_like_rpc.recv()
self._is_stream_open = True
if is_open_via_write_handle:
# Don't use if not response.persisted_size because this will be true
# if persisted_size==0 (0 is considered "Falsy" in Python)
if response.persisted_size is None:
raise ValueError(
"Failed to obtain persisted_size after opening the stream via write_handle"
)

if response.persisted_size:
self.persisted_size = response.persisted_size
else:
if not response.resource:
raise ValueError(
"Failed to obtain object resource after opening the stream"
)
if not response.resource.generation:
raise ValueError(
"Failed to obtain object generation after opening the stream"
)

if response.resource:
if not response.resource.size:
# Appending to a 0 byte appendable object.
self.persisted_size = 0
else:
self.persisted_size = response.resource.size

if not response.write_handle:
raise ValueError("Failed to obtain write_handle after opening the stream")
self.generation_number = response.resource.generation

self.generation_number = response.resource.generation
self.write_handle = response.write_handle
if response.write_handle:
self.write_handle = response.write_handle

async def close(self) -> None:
"""Closes the bidi-gRPC connection."""
Expand All @@ -191,7 +194,7 @@ async def requests_done(self):
"""Signals that all requests have been sent."""

await self.socket_like_rpc.send(None)
_utils.update_write_handle_if_exists(self, await self.socket_like_rpc.recv())
await self.socket_like_rpc.recv()

async def send(
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest
Expand Down Expand Up @@ -220,9 +223,17 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
if not self._is_stream_open:
raise ValueError("Stream is not open")
response = await self.socket_like_rpc.recv()
_utils.update_write_handle_if_exists(self, response)
# Update write_handle if present in response
if response:
if response.write_handle:
self.write_handle = response.write_handle
if response.persisted_size is not None:
self.persisted_size = response.persisted_size
if response.resource and response.resource.size:
self.persisted_size = response.resource.size
return response

@property
def is_stream_open(self) -> bool:
return self._is_stream_open

46 changes: 44 additions & 2 deletions google/cloud/storage/_experimental/asyncio/retry/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
from typing import Tuple, Optional

from google.api_core import exceptions
from google.cloud._storage_v2.types import BidiReadObjectRedirectedError
from google.cloud._storage_v2.types import (
BidiReadObjectRedirectedError,
BidiWriteObjectRedirectedError,
)
from google.rpc import status_pb2

_BIDI_READ_REDIRECTED_TYPE_URL = (
"type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
)
_BIDI_WRITE_REDIRECTED_TYPE_URL = (
"type.googleapis.com/google.storage.v2.BidiWriteObjectRedirectedError"
)
logger = logging.getLogger(__name__)


def _handle_redirect(
Expand Down Expand Up @@ -78,6 +85,41 @@ def _handle_redirect(
read_handle = redirect_proto.read_handle
break
except Exception as e:
logging.ERROR(f"Error unpacking redirect: {e}")
logger.error(f"Error unpacking redirect: {e}")

return routing_token, read_handle


def _extract_bidi_writes_redirect_proto(exc: Exception):
grpc_error = None
if isinstance(exc, exceptions.Aborted) and exc.errors:
grpc_error = exc.errors[0]

if grpc_error:
if isinstance(grpc_error, BidiWriteObjectRedirectedError):
return grpc_error

if hasattr(grpc_error, "trailing_metadata"):
trailers = grpc_error.trailing_metadata()
if not trailers:
return

status_details_bin = None
for key, value in trailers:
if key == "grpc-status-details-bin":
status_details_bin = value
break

if status_details_bin:
status_proto = status_pb2.Status()
try:
status_proto.ParseFromString(status_details_bin)
for detail in status_proto.details:
if detail.type_url == _BIDI_WRITE_REDIRECTED_TYPE_URL:
redirect_proto = BidiWriteObjectRedirectedError.deserialize(
detail.value
)
return redirect_proto
except Exception:
logger.error("Error unpacking redirect details from gRPC error.")
pass
Loading