diff --git a/.envrc b/.envrc
new file mode 100644
index 0000000..298b682
--- /dev/null
+++ b/.envrc
@@ -0,0 +1,5 @@
+export AWS_ACCESS_KEY_ID="FANCY-ROOT-KEY"
+export AWS_SECRET_ACCESS_KEY="SECRET"
+
+# Optional: Run a command automatically when .envrc loads
+# aws s3 ls
diff --git a/README.md b/README.md
index 23d5132..d38659b 100644
--- a/README.md
+++ b/README.md
@@ -12,4 +12,20 @@ Simply download suitable sample script for your use-case and run it close to the
 
 Currently list of available sample scripts is rather short, but we are working on extending it. Star to follow on our updates.
 
-For additional informaition, please refer to our documentation: https://docs.ultihash.io/
\ No newline at end of file
+For additional information, please refer to our documentation: https://docs.ultihash.io/
+
+Upload a folder:
+
+```bash
+./boto3/multithread_upload/uh_upload.py --url http://localhost:8080 -B xxx -e
+```
+
+Download the bucket:
+
+```bash
+./boto3/multithread_download/uh_download.py xxx --url http://localhost:8080 -C --delete-path
+```
+
+It is recommended to use a ramdisk folder as the download target, as it is much faster and performs more consistently.
+
+Adding the --delete-path option removes the target folder before the download starts, so the download itself does not trigger hidden remove operations that could slow it down.
diff --git a/boto3/multithread_download/uh_download.py b/boto3/multithread_download/uh_download.py
index b73c2b0..425b8ee 100755
--- a/boto3/multithread_download/uh_download.py
+++ b/boto3/multithread_download/uh_download.py
@@ -8,6 +8,9 @@
 import sys
 import time
 import tqdm
+import os
+
+import shutil
 
 def parse_args():
     parser = argparse.ArgumentParser(
@@ -31,6 +34,9 @@
     parser.add_argument('--no-store', help='do not store downloaded data to file system, only used for benchmarking',
                         action='store_false', dest='store')
 
+    parser.add_argument('--delete-path', help='recursively delete target path before download',
+                        action='store_true', dest='delete_path')
+
     return parser.parse_args()
 
 
@@ -59,13 +65,22 @@ def cb(count):
 
         response = self.s3.get_object(Bucket=bucket, Key=key)
-        body = response["Body"].read()
-        cb(len(body))
-
+        stream = response["Body"]
 
         if self.config.store:
             local_path.parent.mkdir(parents=True, exist_ok=True)
             with open(local_path, "wb+") as f:
-                f.write(body)
+                while True:
+                    chunk = stream.read(1048576)
+                    if not chunk:
+                        break
+                    f.write(chunk)
+                    cb(len(chunk))
+        else:
+            while True:
+                chunk = stream.read(1048576)
+                if not chunk:
+                    break
+                cb(len(chunk))
 
     def list_objects(self, bucket):
         paginator = self.s3.get_paginator('list_objects_v2')
@@ -117,4 +132,12 @@ def download(config):
 
 if __name__ == "__main__":
     config = parse_args()
+
+    def delete_path(config):
+        if config.path.exists():
+            shutil.rmtree(config.path)
+            os.sync()
+
+    if config.delete_path:
+        delete_path(config)
     download(config)
diff --git a/boto3/multithread_upload/uh_upload.py b/boto3/multithread_upload/uh_upload.py
index 4cfd7ba..ad670d6 100755
--- a/boto3/multithread_upload/uh_upload.py
+++ b/boto3/multithread_upload/uh_upload.py
@@ -44,6 +44,9 @@ def parse_args():
     parser.add_argument('--generate', help='generate and upload random data of the specified size in GiB',
                         action='store', dest='generate', type=int)
 
+    parser.add_argument('-e', '--check-etag', help='check ETag after upload (single part only)',
+                        action='store_true', dest='check_etag')
+
     args, unknown = parser.parse_known_args()
 
     if args.generate:
@@ -104,7 +107,9 @@ def cb(count):
                 self.progress.update(count)
             else:
                 self.count_buffer += count
 
-        self.s3.upload_file(file_path, Bucket=bucket, Key=str(file_path.relative_to(base_path)), Callback=cb, Config=self.transfer_config)
+        key = str(file_path.relative_to(base_path))
+        self.s3.upload_file(file_path, Bucket=bucket, Key=key, Callback=cb, Config=self.transfer_config)
+        return key
 
     def upload_random(self, bucket, key, size):
@@ -180,23 +185,38 @@ def upload (config):
             pass
 
         if base_path.is_file():
-            results += [(base_path, up.push(bucket, base_path, pathlib.Path(base_path).parent))]
-            size_total += base_path.stat().st_size
-            continue
+            key = str(base_path.relative_to(pathlib.Path(base_path).parent))
+            future = up.push(bucket, base_path, pathlib.Path(base_path).parent)
+            results += [(base_path, future, bucket, key)]
+            size_total += base_path.stat().st_size
+            continue
 
         for (root, dirs, files) in os.walk(base_path):
-            for file in files:
-                file_path = pathlib.Path(root) / file
-                size_total += file_path.stat().st_size
-                results += [(file_path, up.push(bucket, file_path, base_path))]
+            for file in files:
+                file_path = pathlib.Path(root) / file
+                key = str(file_path.relative_to(base_path))
+                future = up.push(bucket, file_path, base_path)
+                results += [(file_path, future, bucket, key)]
+                size_total += file_path.stat().st_size
 
     up.set_total(size_total)
 
+    import hashlib
+    def calc_md5(path):
+        h = hashlib.md5()
+        with open(path, 'rb') as f:
+            for chunk in iter(lambda: f.read(8192), b''):
+                h.update(chunk)
+        return h.hexdigest()
+
+    # Wait for all uploads to finish
     for job in results:
-        try:
-            job[1].result()
-        except Exception as e:
-            print(f"Error uploading {job[0]}: {str(e)}", file=sys.stderr)
+        # results entries are (file_path, future, bucket, key) tuples
+        file_path, future, bucket, key = job
+        try:
+            future.result()
+        except Exception as e:
+            print(f"Error uploading {file_path}: {str(e)}", file=sys.stderr)
 
     end = time.monotonic()
     seconds = end - start
@@ -205,6 +225,28 @@ def upload (config):
 
     up.stop()
     print(f"average upload speed: {mb/seconds} MB/s")
+    # ETag check after timing
+    etag_fail = False
+    for job in results:
+        file_path, _, bucket, key = job
+        try:
+            if config.check_etag and pathlib.Path(file_path).is_file():
+                resp = up.s3.head_object(Bucket=bucket, Key=key)
+                etag = resp['ETag'].strip('"')
+                md5 = calc_md5(file_path)
+                if '-' in etag:
+                    print(f"[ETag] {file_path}: multipart upload detected, cannot compare MD5.")
+                elif etag == md5:
+                    print(f"[ETag] {file_path}: OK (etag matches local MD5)")
+                else:
+                    print(f"\033[31m[ETag] {file_path}: FAIL (etag {etag} != local MD5 {md5})\033[0m")
+                    etag_fail = True
+        except Exception as e:
+            print(f"Error checking ETag for {file_path}: {str(e)}", file=sys.stderr)
+            etag_fail = True
+
+    if config.check_etag and etag_fail:
+        sys.exit(1)
     return float(mb)/seconds
 
 if __name__ == "__main__":
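
A note on what the new `-e/--check-etag` flag relies on: for a single-part upload, S3-compatible stores conventionally return an ETag equal to the hex MD5 of the object body, while a multipart upload yields a combined digest with a `-<parts>` suffix, which is why the script prints "multipart upload detected, cannot compare MD5" instead of comparing. To keep the check meaningful for larger files, one option is to raise boto3's multipart threshold so uploads stay single-part. A minimal sketch under that assumption; the endpoint, bucket, object name, and `TransferConfig` values below are illustrative, not what `uh_upload.py` actually configures (its `transfer_config` construction is outside this diff):

```python
import boto3
from boto3.s3.transfer import TransferConfig

# Illustrative endpoint/bucket/object names, not taken from the repo.
s3 = boto3.client("s3", endpoint_url="http://localhost:8080")

# A single PUT tops out at 5 GiB; any file below the threshold stays
# single-part, so the returned ETag is the plain MD5 of the body.
single_part = TransferConfig(multipart_threshold=5 * 1024**3)

s3.upload_file("sample.bin", Bucket="xxx", Key="sample.bin", Config=single_part)

etag = s3.head_object(Bucket="xxx", Key="sample.bin")["ETag"].strip('"')
print("multipart" if "-" in etag else f"single-part, MD5 = {etag}")
```

The trade-off is that large single-part uploads give up the parallelism and per-part retries of multipart transfers, which is presumably why the flag is documented as "(single part only)".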
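
If a multipart ETag does need checking, the conventional S3 scheme can be reproduced locally, provided the fixed part size used for the upload is known (boto3's default `multipart_chunksize` is 8 MiB): the ETag is the MD5 of the concatenated binary MD5 digests of the parts, suffixed with the part count. A sketch under those assumptions; stores are not required to follow this scheme, so treat a mismatch as a hint rather than proof of corruption:

```python
import hashlib

def multipart_etag(path, chunk_size=8 * 1024 * 1024):
    """Expected S3-style ETag for path, assuming fixed-size parts."""
    digests = []
    with open(path, "rb") as f:
        for part in iter(lambda: f.read(chunk_size), b""):
            digests.append(hashlib.md5(part).digest())
    if len(digests) == 1:
        return digests[0].hex()  # single part: the ETag is the plain MD5
    combined = hashlib.md5(b"".join(digests))
    return f"{combined.hexdigest()}-{len(digests)}"
```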