diff --git a/charon/pkgs/maven.py b/charon/pkgs/maven.py
index 4a49e8e7..9c234612 100644
--- a/charon/pkgs/maven.py
+++ b/charon/pkgs/maven.py
@@ -320,7 +320,7 @@ def handle_maven_uploading(
     if not manifest_bucket_name:
         logger.warning(
             'Warning: No manifest bucket is provided, will ignore the process of manifest '
-            'uploading')
+            'uploading\n')
     else:
         manifest_name, manifest_full_path = write_manifest(valid_mvn_paths, top_level, prod_key)
         s3_client.upload_manifest(manifest_name, manifest_full_path, target, manifest_bucket_name)
diff --git a/charon/pkgs/npm.py b/charon/pkgs/npm.py
index e49c31f9..40dddbfb 100644
--- a/charon/pkgs/npm.py
+++ b/charon/pkgs/npm.py
@@ -113,7 +113,7 @@ def handle_npm_uploading(
     if not manifest_bucket_name:
         logger.warning(
             'Warning: No manifest bucket is provided, will ignore the process of manifest '
-            'uploading')
+            'uploading\n')
     else:
         manifest_name, manifest_full_path = write_manifest(valid_paths, target_dir, product)
         client.upload_manifest(manifest_name, manifest_full_path, target, manifest_bucket_name)
diff --git a/charon/storage.py b/charon/storage.py
index f0ec3d0c..1cc88398 100644
--- a/charon/storage.py
+++ b/charon/storage.py
@@ -44,6 +44,8 @@
 DEFAULT_MIME_TYPE = "application/octet-stream"
 
+FILE_REPORT_LIMIT = 1000
+
 
 class S3Client(object):
     """The S3Client is a wrapper of the original boto3 s3 client, which will provide
@@ -132,12 +134,14 @@ async def path_upload_handler(
         ):
             async with self.__con_sem:
                 if not os.path.isfile(full_file_path):
-                    logger.warning('Warning: file %s does not exist during uploading. Product: %s',
-                                   full_file_path, product)
+                    logger.warning(
+                        'Warning: file %s does not exist during uploading. Product: %s',
+                        full_file_path, product
+                    )
                     failed.append(full_file_path)
                     return
 
-                logger.info(
+                logger.debug(
                     '(%d/%d) Uploading %s to bucket %s',
                     index, total, full_file_path, bucket_name
                 )
@@ -175,16 +179,18 @@ async def path_upload_handler(
                         if product:
                             await self.__update_prod_info(path_key, bucket_name, [product])
 
-                        logger.info('Uploaded %s to bucket %s', path, bucket_name)
+                        logger.debug('Uploaded %s to bucket %s', path, bucket_name)
                         uploaded_files.append(path_key)
                     except (ClientError, HTTPClientError) as e:
-                        logger.error("ERROR: file %s not uploaded to bucket"
-                                     " %s due to error: %s ", full_file_path,
-                                     bucket_name, e)
+                        logger.error(
+                            "ERROR: file %s not uploaded to bucket"
+                            " %s due to error: %s ",
+                            full_file_path, bucket_name, e
+                        )
                         failed.append(full_file_path)
                         return
                 else:
-                    logger.info(
+                    logger.debug(
                         "File %s already exists, check if need to update product.",
                         full_file_path,
                     )
@@ -193,9 +199,11 @@ async def path_upload_handler(
                         f_meta[CHECKSUM_META_KEY] if CHECKSUM_META_KEY in f_meta else ""
                     )
                     if checksum != "" and checksum.strip() != sha1:
-                        logger.error('Error: checksum check failed. The file %s is '
-                                     'different from the one in S3. Product: %s',
-                                     path_key, product)
+                        logger.error(
+                            'Error: checksum check failed. The file %s is '
+                            'different from the one in S3. Product: %s',
+                            path_key, product
+                        )
                         failed.append(full_file_path)
                         return
                     (prods, no_error) = await self.__run_async(
@@ -203,7 +211,7 @@
                         path_key, bucket_name
                     )
                     if not self.__dry_run and no_error and product not in prods:
-                        logger.info(
+                        logger.debug(
                             "File %s has new product, updating the product %s",
                             full_file_path,
                             product,
@@ -215,7 +223,9 @@
                             return
 
         return (uploaded_files, self.__do_path_cut_and(
-            file_paths=file_paths, path_handler=path_upload_handler, root=root
+            file_paths=file_paths,
+            path_handler=self.__path_handler_count_wrapper(path_upload_handler),
+            root=root
        ))
 
     def upload_metadatas(
@@ -238,12 +248,14 @@ async def path_upload_handler(
         ):
             async with self.__con_sem:
                 if not os.path.isfile(full_file_path):
-                    logger.warning('Warning: file %s does not exist during uploading. Product: %s',
-                                   full_file_path, product)
+                    logger.warning(
+                        'Warning: file %s does not exist during uploading. Product: %s',
+                        full_file_path, product
+                    )
                     failed.append(full_file_path)
                     return
 
-                logger.info(
+                logger.debug(
                     '(%d/%d) Updating metadata %s to bucket %s',
                     index, total, path, bucket_name
                 )
@@ -276,9 +288,9 @@ async def path_upload_handler(
                         )
                     )
                     if product:
-                        # NOTE: This should not happen for most cases, as most of the metadata
-                        # file does not have product info. Just leave for requirement change in
-                        # future
+                        # NOTE: This should not happen for most cases, as most
+                        # of the metadata file does not have product info. Just
+                        # leave for requirement change in future
                         (prods, no_error) = await self.__run_async(
                             self.__get_prod_info,
                             path_key, bucket_name
@@ -288,20 +300,26 @@
                         )
                         if not no_error:
                             failed.append(full_file_path)
                             return
                         if no_error and product not in prods:
                             prods.append(product)
-                            updated = await self.__update_prod_info(path_key, bucket_name, prods)
+                            updated = await self.__update_prod_info(
+                                path_key, bucket_name, prods
+                            )
                             if not updated:
                                 failed.append(full_file_path)
                                 return
-                    logger.info('Updated metadata %s to bucket %s', path, bucket_name)
+                    logger.debug('Updated metadata %s to bucket %s', path, bucket_name)
                     uploaded_files.append(path_key)
                 except (ClientError, HTTPClientError) as e:
-                    logger.error("ERROR: file %s not uploaded to bucket"
-                                 " %s due to error: %s ", full_file_path,
-                                 bucket_name, e)
+                    logger.error(
+                        "ERROR: file %s not uploaded to bucket"
+                        " %s due to error: %s ",
+                        full_file_path, bucket_name, e
+                    )
                     failed.append(full_file_path)
 
         return (uploaded_files, self.__do_path_cut_and(
-            file_paths=meta_file_paths, path_handler=path_upload_handler, root=root
+            file_paths=meta_file_paths,
+            path_handler=self.__path_handler_count_wrapper(path_upload_handler),
+            root=root
        ))
 
     def upload_manifest(
@@ -346,7 +364,7 @@ async def path_delete_handler(
             total: int, failed: List[str]
         ):
             async with self.__con_sem:
-                logger.info('(%d/%d) Deleting %s from bucket %s', index, total, path, bucket_name)
+                logger.debug('(%d/%d) Deleting %s from bucket %s', index, total, path, bucket_name)
                 path_key = os.path.join(key_prefix, path) if key_prefix else path
                 file_object = bucket.Object(path_key)
                 existed = await self.__run_async(self.__file_exists, file_object)
@@ -368,13 +386,13 @@ async def path_delete_handler(
                     if len(prods) > 0:
                         try:
-                            logger.info(
+                            logger.debug(
                                 "File %s has other products overlapping,"
                                 " will remove %s from its metadata",
                                 path, product
                             )
                             await self.__update_prod_info(path_key, bucket_name, prods)
-                            logger.info(
+                            logger.debug(
                                 "Removed product %s from metadata of file %s",
                                 product, path
                             )
@@ -405,18 +423,25 @@ async def path_delete_handler(
                             deleted_files.append(path)
                             return
                         except (ClientError, HTTPClientError) as e:
-                            logger.error("ERROR: file %s failed to delete from bucket"
-                                         " %s due to error: %s ", full_file_path,
-                                         bucket_name, e)
+                            logger.error(
+                                "ERROR: file %s failed to delete from bucket"
+                                " %s due to error: %s ",
+                                full_file_path, bucket_name, e
+                            )
                             failed.append(full_file_path)
                             return
                 else:
-                    logger.info("File %s does not exist in s3 bucket %s, skip deletion.",
-                                path, bucket_name)
+                    logger.debug(
+                        "File %s does not exist in s3 bucket %s, skip deletion.",
+                        path, bucket_name
+                    )
                     return
 
         failed_files = self.__do_path_cut_and(
-            file_paths=file_paths, path_handler=path_delete_handler, root=root)
+            file_paths=file_paths,
+            path_handler=self.__path_handler_count_wrapper(path_delete_handler),
+            root=root
+        )
 
         return (deleted_files, failed_files)
@@ -594,6 +619,21 @@ async def __update_prod_info(
                          "due to error: %s", file, e)
             return False
 
+    def __path_handler_count_wrapper(
+        self,
+        path_handler: Callable[[str, str, int, int, List[str], asyncio.Semaphore], Awaitable[bool]]
+    ) -> Callable[[str, str, int, int, List[str], asyncio.Semaphore], Awaitable[bool]]:
+        async def wrapper(
+            full_file_path: str, path: str, index: int,
+            total: int, failed: List[str]
+        ):
+            try:
+                await path_handler(full_file_path, path, index, total, failed)
+            finally:
+                if index % FILE_REPORT_LIMIT == 0:
+                    logger.info("######### %d/%d files finished", index, total)
+        return wrapper
+
     def __do_path_cut_and(
         self, file_paths: List[str],
         path_handler: Callable[[str, str, int, int, List[str], asyncio.Semaphore], Awaitable[bool]],
         root="/"
@@ -616,6 +656,7 @@ def __do_path_cut_and(
                 )
             )
             index += 1
 
+        loop = asyncio.get_event_loop()
         loop.run_until_complete(asyncio.gather(*tasks))
         return failed_paths
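
For context on the pattern introduced by `__path_handler_count_wrapper`, here is a minimal, self-contained sketch of the same idea: per-file messages stay at debug level while an aggregate progress line is logged every N files. The names below (`REPORT_EVERY`, `count_wrapper`, `fake_upload`) are illustrative stand-ins, not charon APIs.

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

REPORT_EVERY = 3  # illustrative stand-in for charon's FILE_REPORT_LIMIT


def count_wrapper(handler):
    """Wrap an async per-file handler so progress is reported every REPORT_EVERY files."""
    async def wrapper(path: str, index: int, total: int):
        try:
            await handler(path, index, total)
        finally:
            # Runs even if the handler fails, so the progress report is never skipped.
            if index % REPORT_EVERY == 0:
                logger.info("%d/%d files finished", index, total)
    return wrapper


async def fake_upload(path: str, index: int, total: int):
    # Per-file detail stays at debug level, mirroring the log-level change in the patch.
    logger.debug("(%d/%d) uploading %s", index, total, path)
    await asyncio.sleep(0)


async def main():
    files = [f"file-{i}.jar" for i in range(1, 8)]
    handler = count_wrapper(fake_upload)
    await asyncio.gather(
        *(handler(p, i, len(files)) for i, p in enumerate(files, start=1))
    )


if __name__ == "__main__":
    asyncio.run(main())
```

With seven files and `REPORT_EVERY = 3`, only the "3/7" and "6/7" progress lines appear at INFO level; the final partial batch does not trigger a report, which matches the behaviour of the modulo check used in the patch.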