core/google/cloud/streaming/transfer.py (17 changes: 12 additions & 5 deletions)
@@ -387,7 +387,7 @@ def initialize_download(self, http_request, http):
         # Unless the user has requested otherwise, we want to just
         # go ahead and pump the bytes now.
         if self.auto_transfer:
-            self.stream_file(use_chunks=True)
+            self.stream_file(use_chunks=True, headers=http_request.headers)
 
     def _normalize_start_end(self, start, end=None):
         """Validate / fix up byte range.
@@ -487,7 +487,7 @@ def _compute_end_byte(self, start, end=None, use_chunks=True):
 
         return end_byte
 
-    def _get_chunk(self, start, end):
+    def _get_chunk(self, start, end, headers=None):
         """Retrieve a chunk of the file.
 
         :type start: int
@@ -496,11 +496,14 @@ def _get_chunk(self, start, end):
         :type end: int
         :param end: (Optional) end byte of the range.
 
+        :type headers: dict
+        :param headers: (Optional) Headers to be used for the ``Request``.
+
         :rtype: :class:`google.cloud.streaming.http_wrapper.Response`
         :returns: response from the chunk request.
         """
         self._ensure_initialized()
-        request = Request(url=self.url)
+        request = Request(url=self.url, headers=headers)
         self._set_range_header(request, start, end=end)
         return make_api_request(
             self.bytes_http, request, retries=self.num_retries)
@@ -589,7 +592,7 @@ def get_range(self, start, end=None, use_chunks=True):
             raise TransferRetryError(
                 'Zero bytes unexpectedly returned in download response')
 
-    def stream_file(self, use_chunks=True):
+    def stream_file(self, use_chunks=True, headers=None):
         """Stream the entire download.
 
         Writes retrieved bytes into :attr:`stream`.
@@ -598,6 +601,9 @@ def stream_file(self, use_chunks=True):
         :param use_chunks: If False, ignore :attr:`chunksize`
             and stream this download in a single request.
             If True, streams via chunks.
+
+        :type headers: dict
+        :param headers: (Optional) Headers to be used for the ``Request``.
         """
         self._ensure_initialized()
         while True:
@@ -607,7 +613,8 @@
             else:
                 end_byte = self._compute_end_byte(self.progress,
                                                   use_chunks=use_chunks)
-            response = self._get_chunk(self.progress, end_byte)
+            response = self._get_chunk(self.progress, end_byte,
+                                       headers=headers)
             if self.total_size is None:
                 self._set_total(response.info)
             response = self._process_response(response)
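Taken together, the transfer.py changes thread an optional headers dict from initialize_download through stream_file into the Request built for every chunk, so per-object headers (notably customer-supplied encryption key headers) survive chunked downloads. A minimal sketch of the kind of headers involved, assuming the GCS customer-supplied encryption (CSEK) scheme; the _csek_headers helper and the download object are illustrative, not part of this change:

import base64
import hashlib


def _csek_headers(key):
    # Hypothetical helper: derive the documented CSEK request headers
    # from a raw 32-byte AES-256 key (``bytes``).
    key_hash = hashlib.sha256(key).digest()
    return {
        'X-Goog-Encryption-Algorithm': 'AES256',
        'X-Goog-Encryption-Key': base64.b64encode(key).decode('ascii'),
        'X-Goog-Encryption-Key-Sha256':
            base64.b64encode(key_hash).decode('ascii'),
    }

# With this change, such headers ride along on every chunk request:
# download.stream_file(use_chunks=True, headers=_csek_headers(key))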
system_tests/storage.py (24 changes: 24 additions & 0 deletions)
@@ -149,6 +149,7 @@ def tearDown(self):
 
 
 class TestStorageWriteFiles(TestStorageFiles):
+    ENCRYPTION_KEY = 'b23ff11bba187db8c37077e6af3b25b8'
 
     def test_large_file_write_from_stream(self):
         blob = self.bucket.blob('LargeFile')
@@ -163,6 +164,29 @@ def test_large_file_write_from_stream(self):
             md5_hash = md5_hash.encode('utf-8')
         self.assertEqual(md5_hash, file_data['hash'])
 
+    def test_large_encrypted_file_write_from_stream(self):
+        blob = self.bucket.blob('LargeFile',
+                                encryption_key=self.ENCRYPTION_KEY)
+
+        file_data = self.FILES['big']
+        with open(file_data['path'], 'rb') as file_obj:
+            blob.upload_from_file(file_obj)
+        self.case_blobs_to_delete.append(blob)
+
+        md5_hash = blob.md5_hash
+        if not isinstance(md5_hash, six.binary_type):
+            md5_hash = md5_hash.encode('utf-8')
+        self.assertEqual(md5_hash, file_data['hash'])
+
+        temp_filename = tempfile.mktemp()
+        with open(temp_filename, 'wb') as file_obj:
+            blob.download_to_file(file_obj)
+
+        with open(temp_filename, 'rb') as file_obj:
+            md5_temp_hash = _base64_md5hash(file_obj)
+
+        self.assertEqual(md5_temp_hash, file_data['hash'])
+
     def test_small_file_write_from_filename(self):
         blob = self.bucket.blob('SmallFile')
 
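The new system test round-trips an encrypted object through the public storage API, which is the user-facing path that exercises the headers pass-through above. A hedged usage sketch of that same flow, with placeholder bucket and file names:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('my-bucket')  # placeholder bucket name

# Any 32-byte value serves as the AES-256 customer-supplied key.
key = b'b23ff11bba187db8c37077e6af3b25b8'
blob = bucket.blob('LargeFile', encryption_key=key)

blob.upload_from_filename('/tmp/big-file')    # placeholder path
blob.download_to_filename('/tmp/round-trip')  # chunked download now carries the CSEK headers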