5 changes: 5 additions & 0 deletions .envrc
@@ -0,0 +1,5 @@
export AWS_ACCESS_KEY_ID="FANCY-ROOT-KEY"
export AWS_SECRET_ACCESS_KEY="SECRET"

# Optional: Run a command automatically when .envrc loads
# aws s3 ls
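# A minimal sketch, assuming direnv is used to load this file (any other way of
# sourcing it works too): run `direnv allow` once in this directory, and the
# variables above are exported automatically whenever you enter it.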
18 changes: 17 additions & 1 deletion README.md
@@ -12,4 +12,20 @@ Simply download suitable sample script for your use-case and run it close to the

Currently the list of available sample scripts is rather short, but we are working on extending it. Star the repository to follow our updates.

For additional information, please refer to our documentation: https://docs.ultihash.io/
For additional information, please refer to our documentation: https://docs.ultihash.io/

Upload a folder (`xxx` is a placeholder bucket name; the `-e` flag enables the post-upload ETag check):

```bash
./boto3/multithread_upload/uh_upload.py --url http://localhost:8080 -B xxx -e <source_folder>
```

Download the bucket (`-C` sets the target folder):

```bash
./boto3/multithread_download/uh_download.py xxx --url http://localhost:8080 -C <target_folder> --delete-path
```

It is best to use a ramdisk folder as the download target, as it is much faster and performs more consistently (see the sketch below).

Adding the `--delete-path` option removes the target folder before the download starts, so the download itself does not perform any hidden remove operations that could slow it down.
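
As a sketch of that setup, the commands below mount a tmpfs-backed folder and download into it; the mount point, size, and bucket name `xxx` are placeholder assumptions:

```bash
# Create a tmpfs (RAM-backed) folder to use as the download target; path and size are examples
sudo mkdir -p /mnt/ramdisk
sudo mount -t tmpfs -o size=8G tmpfs /mnt/ramdisk

# Download the bucket into the ramdisk, clearing the target folder first with --delete-path
./boto3/multithread_download/uh_download.py xxx --url http://localhost:8080 -C /mnt/ramdisk/data --delete-path
```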
31 changes: 27 additions & 4 deletions boto3/multithread_download/uh_download.py
@@ -8,6 +8,9 @@
import sys
import time
import tqdm
import os

import shutil

def parse_args():
parser = argparse.ArgumentParser(
@@ -31,6 +34,9 @@ def parse_args():
parser.add_argument('--no-store', help='do not store downloaded data to file system, only used for benchmarking',
action='store_false', dest='store')

parser.add_argument('--delete-path', help='recursively delete target path before download',
action='store_true', dest='delete_path')

return parser.parse_args()


@@ -59,13 +65,22 @@ def cb(count):


response = self.s3.get_object(Bucket=bucket, Key=key)
body = response["Body"].read()
cb(len(body))

stream = response["Body"]
if self.config.store:
local_path.parent.mkdir(parents=True, exist_ok=True)
with open(local_path, "wb+") as f:
f.write(body)
while True:
chunk = stream.read(1048576)
if not chunk:
break
f.write(chunk)
cb(len(chunk))
else:
while True:
chunk = stream.read(1048576)
if not chunk:
break
cb(len(chunk))

def list_objects(self, bucket):
paginator = self.s3.get_paginator('list_objects_v2')
@@ -117,4 +132,12 @@ def download(config):

if __name__ == "__main__":
config = parse_args()

def delete_path(config):
if config.path.exists():
shutil.rmtree(config.path)
os.sync()

if config.delete_path:
delete_path(config)
download(config)
68 changes: 56 additions & 12 deletions boto3/multithread_upload/uh_upload.py
@@ -44,6 +44,9 @@ def parse_args():
parser.add_argument('--generate', help='generate and upload random data of the specified size in GiB',
action='store', dest='generate', type=int)

parser.add_argument('-e', '--check-etag', help='check ETag after upload (single part only)',
action='store_true', dest='check_etag')

args, unknown = parser.parse_known_args()

if args.generate:
@@ -104,7 +107,9 @@ def cb(count):
self.progress.update(count)
else:
self.count_buffer += count
self.s3.upload_file(file_path, Bucket=bucket, Key=str(file_path.relative_to(base_path)), Callback=cb, Config=self.transfer_config)
key = str(file_path.relative_to(base_path))
self.s3.upload_file(file_path, Bucket=bucket, Key=key, Callback=cb, Config=self.transfer_config)
return key

def upload_random(self, bucket, key, size):
def cb(count):
@@ -180,23 +185,38 @@ def upload (config):
pass

if base_path.is_file():
results += [(base_path, up.push(bucket, base_path, pathlib.Path(base_path).parent))]
size_total += base_path.stat().st_size
continue
key = str(base_path.relative_to(pathlib.Path(base_path).parent))
future = up.push(bucket, base_path, pathlib.Path(base_path).parent)
results += [(base_path, future, bucket, key)]
size_total += base_path.stat().st_size
continue

for (root, dirs, files) in os.walk(base_path):
for file in files:
file_path = pathlib.Path(root) / file
size_total += file_path.stat().st_size
results += [(file_path, up.push(bucket, file_path, base_path))]
for file in files:
file_path = pathlib.Path(root) / file
key = str(file_path.relative_to(base_path))
future = up.push(bucket, file_path, base_path)
results += [(file_path, future, bucket, key)]
size_total += file_path.stat().st_size

up.set_total(size_total)

import hashlib
def calc_md5(path):
h = hashlib.md5()
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
h.update(chunk)
return h.hexdigest()

# Wait for all uploads to finish
for job in results:
try:
job[1].result()
except Exception as e:
print(f"Error uploading {job[0]}: {str(e)}", file=sys.stderr)
# Wait for each upload to finish
file_path, future, bucket, key = job
try:
future.result()
except Exception as e:
print(f"Error uploading {file_path}: {str(e)}", file=sys.stderr)

end = time.monotonic()
seconds = end - start
@@ -205,6 +225,30 @@ def upload (config):
up.stop()
print(f"average upload speed: {mb/seconds} MB/s")

# ETag check after timing
etag_fail = False
for job in results:
file_path, _, bucket, key = job
try:
if config.check_etag and pathlib.Path(file_path).is_file():
resp = up.s3.head_object(Bucket=bucket, Key=key)
etag = resp['ETag'].strip('"')
md5 = calc_md5(file_path)
if '-' in etag:
print(f"[ETag] {file_path}: multipart upload detected, cannot compare MD5.")
elif etag == md5:
print(f"[ETag] {file_path}: OK (etag matches local MD5)")
else:
print(f"\033[31m[ETag] {file_path}: FAIL (etag {etag} != local MD5 {md5})\033[0m")
etag_fail = True
else:
pass
except Exception as e:
print(f"Error uploading {file_path}: {str(e)}", file=sys.stderr)
etag_fail = True

if config.check_etag and etag_fail:
sys.exit(1)
return float(mb)/seconds

if __name__ == "__main__":