Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package io.numaproj.numaflow.errors;

import io.numaproj.numaflow.shared.ExceptionUtils;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.*;
import java.nio.file.attribute.PosixFilePermissions;

/**
* The PersistCriticalError class provides functionality to persist critical errors to a file
* in a runtime directory. This is useful for logging critical errors in a structured format
* for debugging and monitoring purposes. The class ensures that the error persistence operation
* is executed only once during the application's runtime.
*/
@Slf4j
public class PersistCriticalError {

Check warning on line 21 in src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java#L21

Added line #L21 was not covered by tests

private static final String DEFAULT_RUNTIME_APPLICATION_ERRORS_PATH = "/var/numaflow/runtime/application-errors";
private static final String CURRENT_FILE = "current-udf.json";
private static final String INTERNAL_ERROR = "Internal error";
private static final String UNKNOWN_CONTAINER = "unknown-container";
private static final ObjectMapper jsonMapper = new ObjectMapper();
static final String CONTAINER_TYPE = System.getenv(ExceptionUtils.ENV_UD_CONTAINER_TYPE) != null
? System.getenv(ExceptionUtils.ENV_UD_CONTAINER_TYPE)

Check warning on line 29 in src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java#L29

Added line #L29 was not covered by tests
: UNKNOWN_CONTAINER;

private static final AtomicBoolean isPersisted = new AtomicBoolean(false);

/**
* Resets the isPersisted flag for testing purposes.
*
* @param value the value to set for isPersisted
*/
static void setIsPersisted(boolean value) {
isPersisted.set(value);
}

/**
* Persists a critical error to a file. Ensures the method is executed only once.
*
* @param errorCode the error code
* @param errorMessage the error message
* @param errorDetails additional error details
* @throws IllegalStateException if the method has already been executed
*/
public static synchronized void persistCriticalError(String errorCode, String errorMessage, String errorDetails) throws IllegalStateException {
if (!isPersisted.compareAndSet(false, true)) {
throw new IllegalStateException("Persist critical error function has already been executed.");
}
try {
persistCriticalErrorToFile(errorCode, errorMessage, errorDetails, DEFAULT_RUNTIME_APPLICATION_ERRORS_PATH);
} catch (IOException e) {
log.error("Error occurred while persisting critical error", e);
}
}

Check warning on line 60 in src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java#L56-L60

Added lines #L56 - L60 were not covered by tests

/**
* Writes the critical error details to a file.
*
* @param errorCode the error code
* @param errorMessage the error message
* @param errorDetails additional error details
* @throws IOException if an error occurs while writing to the file
*/
static void persistCriticalErrorToFile(String errorCode, String errorMessage, String errorDetails, String baseDir) throws IOException {
Path containerDirPath = createDirectory(Paths.get(baseDir).resolve(CONTAINER_TYPE));

errorCode = (errorCode == null || errorCode.isEmpty()) ? INTERNAL_ERROR : errorCode;
long timestamp = Instant.now().getEpochSecond();

RuntimeErrorEntry errorEntry = new RuntimeErrorEntry(CONTAINER_TYPE, timestamp, errorCode, errorMessage, errorDetails);
String errorEntryJson = errorEntry.toJson();

Path currentFilePath = containerDirPath.resolve(CURRENT_FILE);
Files.writeString(currentFilePath, errorEntryJson, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);

Path finalFilePath = containerDirPath.resolve(String.format("%d-udf.json", timestamp));
Files.move(currentFilePath, finalFilePath, StandardCopyOption.REPLACE_EXISTING);
}

/**
* Creates a directory with read,write,execute permissions for all if it does not already exist.
*
* @param dirPath the path of the directory to create
* @return the path of the created directory
* @throws IOException if an error occurs while creating the directory
*/
private static Path createDirectory(Path dirPath) throws IOException {
if (!Files.exists(dirPath)) {
Files.createDirectories(dirPath);
Files.setPosixFilePermissions(dirPath, PosixFilePermissions.fromString("rwxrwxrwx"));
}
return dirPath;
}

/**
* Private static class representing runtime error entry.
*/
private static class RuntimeErrorEntry {
@JsonProperty("container")
private final String container;
@JsonProperty("timestamp")
private final long timestamp;
@JsonProperty("code")
private final String code;
@JsonProperty("message")
private final String message;
@JsonProperty("details")
private final String details;

public RuntimeErrorEntry(String container, long timestamp, String code, String message, String details) {
this.container = container;
this.timestamp = timestamp;
this.code = code;
this.message = message;
this.details = details;
}

/**
* Converts the RuntimeErrorEntry object to a JSON string.
*
* @return JSON representation of the runtime error entry
* @throws IOException if an error occurs during serialization
*/
public String toJson() throws IOException {
try {
return jsonMapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new IOException("Failed to convert RuntimeErrorEntry to JSON", e);

Check warning on line 134 in src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java#L133-L134

Added lines #L133 - L134 were not covered by tests
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.numaproj.numaflow.errors;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Test;

@Slf4j
public class PersistCriticalErrorTest {

@After
public void resetIsPersisted() {
// Reset the isPersisted flag to its default value (false) after each test
PersistCriticalError.setIsPersisted(false);
}

@Test
public void test_writes_error_details_to_new_file() {
String errorCode = "404";
String errorMessage = "Not Found";
String errorDetails = "The requested resource was not found.";
String baseDir = "/tmp/test-success";

try {
PersistCriticalError.persistCriticalErrorToFile(errorCode, errorMessage, errorDetails, baseDir);
Path containerDirPath = Paths.get(baseDir, PersistCriticalError.CONTAINER_TYPE);
Path jsonFilePath = Files.list(containerDirPath)
.filter(path -> path.toString().endsWith(".json"))
.findFirst()
.orElseThrow(() -> new AssertionError("No .json file found in the directory"));

// Read the contents of the file
String fileContent = Files.readString(jsonFilePath);
// Assert that the file contains the expected values
assertTrue(fileContent.contains(errorCode));
assertTrue(fileContent.contains(errorMessage));
assertTrue(fileContent.contains(errorDetails));
} catch (Exception e) {
fail("Exception occurred while writing error details to the file: " + e.getMessage());
}
}

@Test
public void test_handles_null_or_empty_error_code() {
String errorCode = null;
String errorMessage = "Internal Server Error";
String errorDetails = "An unexpected error occurred.";
String baseDir = "/tmp/test-errors";

try {
PersistCriticalError.persistCriticalErrorToFile(errorCode, errorMessage, errorDetails, baseDir);
Path containerDirPath = Paths.get(baseDir, PersistCriticalError.CONTAINER_TYPE);
Path jsonFilePath = Files.list(containerDirPath)
.filter(path -> path.toString().endsWith(".json"))
.findFirst()
.orElseThrow(() -> new AssertionError("No .json file found in the directory"));

// Read the contents of the file
String fileContent = Files.readString(jsonFilePath);
// Assert that the file contains the expected values
assertTrue(fileContent.contains("Internal error"));
assertTrue(fileContent.contains(errorMessage));
assertTrue(fileContent.contains(errorDetails));
} catch (Exception e) {
fail("Exception occurred while writing error details to the file: " + e.getMessage());
}
}

@Test
public void test_persistCriticalError_when_isPersisted_is_true_with_threads() {
// Set isPersisted to true before calling persistCriticalError
PersistCriticalError.setIsPersisted(true);

String errorCode = "500";
String errorMessage = "Critical Error";
String errorDetails = "A critical error occurred.";

// Number of threads to simulate concurrent calls
int numberOfThreads = 5;

// Create a thread pool
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

// Runnable task to call persistCriticalError
Runnable task = () -> {
try {
PersistCriticalError.persistCriticalError(errorCode, errorMessage, errorDetails);
fail("Expected IllegalStateException to be thrown, but it was not.");
} catch (IllegalStateException e) {
// Assert that the exception message is as expected
assertTrue(e.getMessage().contains("Persist critical error function has already been executed."));
} catch (Exception e) {
fail("Unexpected exception occurred: " + e.getMessage());
}
};

// Submit tasks to the executor
for (int i = 0; i < numberOfThreads; i++) {
executorService.submit(task);
}

// Shut down the executor
executorService.shutdown();
try {
if (!executorService.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
fail("Executor service did not terminate in the expected time.");
}
} catch (InterruptedException e) {
fail("Runtime exception occurred: " + e.getMessage());
Thread.currentThread().interrupt();
}
}
}
Loading