Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@
*/
package org.dspace.content;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -28,9 +26,16 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
Expand Down Expand Up @@ -73,6 +78,9 @@ public class PreviewContentServiceImpl implements PreviewContentService {
// Injected from the "file.preview.zip.limit.length" property; the placeholder default is 1000.
@Value("${file.preview.zip.limit.length:1000}")
private int maxPreviewCount;

// Optional thread-pool size, injected from "file.preview.archive.thread.pool.size".
// The SpEL default #{null} leaves this field null when the property is not set; the
// TAR/ZIP processing methods then fall back to Runtime.getRuntime().availableProcessors().
@Value("${file.preview.archive.thread.pool.size:#{null}}")
private Integer archiveThreadPoolSize;


// Injected by Spring (@Autowired).
@Autowired
PreviewContentDAO previewContentDAO;
Expand Down Expand Up @@ -287,18 +295,6 @@ private <T, U> Hashtable<String, T> createSubMap(Map<String, U> sourceMap, Funct
return sub;
}

/**
 * Allocates an empty temporary file whose suffix reflects the requested archive type.
 * The TAR suffix is chosen when {@code fileType} equals the TAR archive type; any other
 * value (including {@code null}) yields the ZIP suffix.
 *
 * @param fileType the archive type the temporary file is intended for
 * @return the path of the newly created temporary file
 * @throws IOException if the temporary file cannot be created
 */
private Path createTempFile(String fileType) throws IOException {
    final String suffix;
    if (ARCHIVE_TYPE_TAR.equals(fileType)) {
        suffix = String.format(".%s", ARCHIVE_TYPE_TAR);
    } else {
        suffix = String.format(".%s", ARCHIVE_TYPE_ZIP);
    }
    return Files.createTempFile("temp", suffix);
}

/**
 * Appends one {@code name|size} record to the list of file paths.
 * If the name resolves to a directory on the local file system, a "/" is inserted
 * before the separator ({@code name/|size}).
 *
 * <p>The name usually comes from an archive entry, so it normally does not exist on
 * the local file system. A missing path is therefore not an error: the record is
 * still added, in the plain {@code name|size} form.
 *
 * @param filePaths the list of file paths to add to
 * @param path the file or directory path (typically an archive entry name)
 * @param size the size of the file or directory
 */
private void addFilePath(List<String> filePaths, String path, long size) {
    boolean directory;
    try {
        directory = Files.isDirectory(Paths.get(path));
    } catch (java.nio.file.InvalidPathException e) {
        // Entry names are untrusted and may not be legal paths on this platform
        // (e.g. they contain a NUL character). Such a name cannot be a local
        // directory, so list it as a plain file instead of aborting the listing.
        directory = false;
    }
    String fileInfo = directory ? path + "/|" + size : path + "|" + size;
    filePaths.add(fileInfo);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
Paurikova2 marked this conversation as resolved.

/**
* Processes a TAR file, extracting its entries and adding their paths to the provided list.
* @param filePaths the list to populate with the extracted file paths
* @param tempFile the temporary TAR file to process
* Processes a TAR file, extracting its entries and adding their paths and sizes to the provided list.
* Utilizes parallelism to process each TAR entry concurrently.
*
* @param filePaths the list to populate with the extracted file paths and sizes
* @param inputStream the InputStream containing the TAR file data
* @throws IOException if an I/O error occurs while reading the TAR file
* @throws InterruptedException if the current thread is interrupted while waiting for the completion of a task
* @throws ExecutionException if an exception occurs during the execution of a parallel task
*/
private void processTarFile(List<String> filePaths, Path tempFile) throws IOException {
try (InputStream fi = Files.newInputStream(tempFile);
TarArchiveInputStream tis = new TarArchiveInputStream(fi)) {
private void processTarFile(List<String> filePaths, InputStream inputStream)
throws IOException, InterruptedException, ExecutionException {
List<TarArchiveEntry> entries = new ArrayList<>();
int threadPoolSize = (archiveThreadPoolSize != null) ?
archiveThreadPoolSize : Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
List<Future<List<String>>> futures = new ArrayList<>();

try (BufferedInputStream bufferedStream = new BufferedInputStream(inputStream);
TarArchiveInputStream tis = new TarArchiveInputStream(bufferedStream)) {

TarArchiveEntry entry;
while ((entry = tis.getNextTarEntry()) != null) {
addFilePath(filePaths, entry.getName(), entry.getSize());
if (!entry.isDirectory()) {
entries.add(entry);
}
}
// Process sequentially if below threshold
if (entries.size() < archiveThreadPoolSize) {
for (TarArchiveEntry e : entries) {
addFilePath(filePaths, e.getName(), e.getSize());
}
return;
}
// Process in parallel if above threshold
for (TarArchiveEntry e : entries) {
String entryName = e.getName();
long fileSize = e.getSize();
Comment thread
milanmajchrak marked this conversation as resolved.
Outdated
Callable<List<String>> task = () -> {
List<String> localPaths = new ArrayList<>();
addFilePath(localPaths, entryName, fileSize);
return localPaths;
};
futures.add(executorService.submit(task));
}
for (Future<List<String>> future : futures) {
filePaths.addAll(future.get()); // Thread-safe addition
}
Comment thread
Paurikova2 marked this conversation as resolved.
} finally {
executorService.shutdown();
}
}

/**
* Processes a ZIP file, extracting its entries and adding their paths to the provided list.
* @param filePaths the list to populate with the extracted file paths
* @param zipFileSystem the FileSystem object representing the ZIP file
* Processes a ZIP file, extracting its entries and adding their paths and sizes to the provided list.
* Utilizes parallelism to process each ZIP entry concurrently.
*
* @param filePaths the list to populate with the extracted file paths and sizes
* @param inputStream the InputStream containing the ZIP file data
* @throws IOException if an I/O error occurs while reading the ZIP file
* @throws InterruptedException if the current thread is interrupted while waiting for the completion of a task
* @throws ExecutionException if an exception occurs during the execution of a parallel task
*/
private void processZipFile(List<String> filePaths, FileSystem zipFileSystem) throws IOException {
Path root = zipFileSystem.getPath("/");
Files.walk(root).forEach(path -> {
try {
long fileSize = Files.size(path);
addFilePath(filePaths, path.toString().substring(1), fileSize);
} catch (IOException e) {
log.error("An error occurred while getting the size of the zip file.", e);
private void processZipFile(List<String> filePaths, InputStream inputStream)
throws IOException, InterruptedException, ExecutionException {
List<ZipEntry> entries = new ArrayList<>();
int threadPoolSize = (archiveThreadPoolSize != null) ?
archiveThreadPoolSize : Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
List<Future<List<String>>> futures = new ArrayList<>();

try (BufferedInputStream bufferedStream = new BufferedInputStream(inputStream);
ZipInputStream zipInputStream = new ZipInputStream(bufferedStream)) {

ZipEntry entry;
while ((entry = zipInputStream.getNextEntry()) != null) {
if (!entry.isDirectory()) {
entries.add(entry);
}
}
});
}

/**
* Closes the specified FileSystem resource if it is not null.
* @param zipFileSystem the FileSystem to close
*/
private void closeFileSystem(FileSystem zipFileSystem) {
if (Objects.nonNull(zipFileSystem)) {
try {
zipFileSystem.close();
} catch (IOException e) {
log.error("An error occurred while closing the zip file.", e);
// Process sequentially if below threshold
if (entries.size() < archiveThreadPoolSize) {
for (ZipEntry e : entries) {
addFilePath(filePaths, e.getName(), e.getSize());
}
return;
}
}
}

/**
* Deletes the specified temporary file if it is not null.
* @param tempFile the Path object representing the temporary file to delete
*/
private void deleteTempFile(Path tempFile) {
if (Objects.nonNull(tempFile)) {
try {
Files.delete(tempFile);
} catch (IOException e) {
log.error("An error occurred while deleting temp file.", e);
// Process in parallel if above threshold
for (ZipEntry e : entries) {
String entryName = e.getName();
long fileSize = e.getSize();
Callable<List<String>> task = () -> {
List<String> localPaths = new ArrayList<>();
addFilePath(localPaths, entryName, fileSize);
return localPaths;
};
futures.add(executorService.submit(task));
}
for (Future<List<String>> future : futures) {
filePaths.addAll(future.get()); // Thread-safe addition
}
} finally {
executorService.shutdown();
}
}

Expand Down Expand Up @@ -406,36 +450,20 @@ private String buildXmlResponse(List<String> filePaths) {
}

/**
* Extracts files from an InputStream, processes them based on the specified file type (tar or zip),
* Processes file data based on the specified file type (tar or zip),
* and returns an XML representation of the file paths.
* @param inputStream the InputStream containing the file data
* @param fileType the type of file to extract ("tar" or "zip")
* @return an XML string representing the extracted file paths
*/
private String extractFile(InputStream inputStream, String fileType) throws Exception {
List<String> filePaths = new ArrayList<>();
Path tempFile = null;
FileSystem zipFileSystem = null;

try {
// Create a temporary file based on the file type
tempFile = createTempFile(fileType);

// Copy the input stream to the temporary file
Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING);

// Process the file based on its type
if (ARCHIVE_TYPE_TAR.equals(fileType)) {
processTarFile(filePaths, tempFile);
} else {
zipFileSystem = FileSystems.newFileSystem(tempFile, (ClassLoader) null);
processZipFile(filePaths, zipFileSystem);
}
} finally {
closeFileSystem(zipFileSystem);
deleteTempFile(tempFile);
// Process the file based on its type
if (ARCHIVE_TYPE_TAR.equals(fileType)) {
processTarFile(filePaths, inputStream);
} else {
processZipFile(filePaths, inputStream);
}

return buildXmlResponse(filePaths);
}

Expand Down
Loading