Skip to content

Commit 2ef6339

Browse files
authored
fix: receive eof while closing reads stream (#1733)
When `writer.close()` is called without setting finalize_on_close flag, we need to get two responses: 1) to get the persisted_size 2) eof response That's why added a check if the first response is not eof, then again receive the response from the stream.
1 parent c8dd7a0 commit 2ef6339

File tree

2 files changed

+59
-2
lines changed

2 files changed

+59
-2
lines changed

google/cloud/storage/asyncio/async_write_object_stream.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from typing import List, Optional, Tuple
16+
import grpc
1617
from google.cloud import _storage_v2
1718
from google.cloud.storage.asyncio import _utils
1819
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
@@ -181,9 +182,17 @@ async def close(self) -> None:
181182

182183
async def requests_done(self):
183184
"""Signals that all requests have been sent."""
184-
185185
await self.socket_like_rpc.send(None)
186-
_utils.update_write_handle_if_exists(self, await self.socket_like_rpc.recv())
186+
187+
# The server may send a final "EOF" response immediately, or it may
188+
# first send an intermediate response followed by the EOF response depending on whether the object was finalized or not.
189+
first_resp = await self.socket_like_rpc.recv()
190+
_utils.update_write_handle_if_exists(self, first_resp)
191+
192+
if first_resp != grpc.aio.EOF:
193+
self.persisted_size = first_resp.persisted_size
194+
second_resp = await self.socket_like_rpc.recv()
195+
assert second_resp == grpc.aio.EOF
187196

188197
async def send(
189198
self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest

tests/unit/asyncio/test_async_write_object_stream.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import unittest.mock as mock
1616
from unittest.mock import AsyncMock, MagicMock
1717
import pytest
18+
import grpc
19+
1820

1921
from google.cloud.storage.asyncio.async_write_object_stream import (
2022
_AsyncWriteObjectStream,
@@ -194,11 +196,57 @@ async def test_close_success(self, mock_client):
194196
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
195197
stream._is_stream_open = True
196198
stream.socket_like_rpc = AsyncMock()
199+
200+
stream.socket_like_rpc.send = AsyncMock()
201+
first_resp = _storage_v2.BidiWriteObjectResponse(persisted_size=100)
202+
stream.socket_like_rpc.recv = AsyncMock(side_effect=[first_resp, grpc.aio.EOF])
197203
stream.socket_like_rpc.close = AsyncMock()
198204

199205
await stream.close()
200206
stream.socket_like_rpc.close.assert_awaited_once()
201207
assert not stream.is_stream_open
208+
assert stream.persisted_size == 100
209+
210+
@pytest.mark.asyncio
211+
async def test_close_with_persisted_size_then_eof(self, mock_client):
212+
"""Test close when first recv has persisted_size, second is EOF."""
213+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
214+
stream._is_stream_open = True
215+
stream.socket_like_rpc = AsyncMock()
216+
217+
# First response has persisted_size (NOT EOF, intermediate)
218+
persisted_resp = _storage_v2.BidiWriteObjectResponse(persisted_size=500)
219+
# Second response is EOF (None)
220+
eof_resp = grpc.aio.EOF
221+
222+
stream.socket_like_rpc.send = AsyncMock()
223+
stream.socket_like_rpc.recv = AsyncMock(side_effect=[persisted_resp, eof_resp])
224+
stream.socket_like_rpc.close = AsyncMock()
225+
226+
await stream.close()
227+
228+
# Verify two recv calls: first has persisted_size (NOT EOF), so read second (EOF)
229+
assert stream.socket_like_rpc.recv.await_count == 2
230+
assert stream.persisted_size == 500
231+
assert not stream.is_stream_open
232+
233+
@pytest.mark.asyncio
234+
async def test_close_with_grpc_aio_eof_response(self, mock_client):
235+
"""Test close when first recv is grpc.aio.EOF sentinel."""
236+
stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
237+
stream._is_stream_open = True
238+
stream.socket_like_rpc = AsyncMock()
239+
240+
# First recv returns grpc.aio.EOF (explicit sentinel from finalize)
241+
stream.socket_like_rpc.send = AsyncMock()
242+
stream.socket_like_rpc.recv = AsyncMock(return_value=grpc.aio.EOF)
243+
stream.socket_like_rpc.close = AsyncMock()
244+
245+
await stream.close()
246+
247+
# Verify only one recv call (grpc.aio.EOF=EOF, so don't read second)
248+
assert stream.socket_like_rpc.recv.await_count == 1
249+
assert not stream.is_stream_open
202250

203251
@pytest.mark.asyncio
204252
async def test_methods_require_open_raises(self, mock_client):

0 commit comments

Comments
 (0)