diff --git a/src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java b/src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java
new file mode 100644
index 00000000..ad7a3ef9
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/errors/PersistCriticalError.java
@@ -0,0 +1,156 @@
+package io.numaproj.numaflow.errors;
+
+import io.numaproj.numaflow.shared.ExceptionUtils;
+import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicBoolean;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermissions;
+
+/**
+ * The PersistCriticalError class provides functionality to persist critical errors to a file
+ * in a runtime directory. This is useful for logging critical errors in a structured format
+ * for debugging and monitoring purposes. The class ensures that the error persistence operation
+ * is executed only once during the application's runtime.
+ */
+@Slf4j
+public class PersistCriticalError {
+
+    private static final String DEFAULT_RUNTIME_APPLICATION_ERRORS_PATH = "/var/numaflow/runtime/application-errors";
+    private static final String CURRENT_FILE = "current-udf.json";
+    private static final String INTERNAL_ERROR = "Internal error";
+    private static final String UNKNOWN_CONTAINER = "unknown-container";
+    private static final ObjectMapper jsonMapper = new ObjectMapper();
+    static final String CONTAINER_TYPE = System.getenv(ExceptionUtils.ENV_UD_CONTAINER_TYPE) != null
+            ? System.getenv(ExceptionUtils.ENV_UD_CONTAINER_TYPE)
+            : UNKNOWN_CONTAINER;
+
+    private static final AtomicBoolean isPersisted = new AtomicBoolean(false);
+
+    /**
+     * Sets the isPersisted flag. Intended for tests, which use it to reset or pre-set the flag.
+     *
+     * @param value the value to set for isPersisted
+     */
+    static void setIsPersisted(boolean value) {
+        isPersisted.set(value);
+    }
+
+    /**
+     * Persists a critical error to a file under the default runtime errors directory.
+     * Ensures the method is executed only once: the flag is claimed before the write is
+     * attempted, so a write that fails with an IOException is logged and never retried.
+     *
+     * @param errorCode the error code
+     * @param errorMessage the error message
+     * @param errorDetails additional error details
+     * @throws IllegalStateException if the method has already been executed
+     */
+    public static synchronized void persistCriticalError(String errorCode, String errorMessage, String errorDetails) throws IllegalStateException {
+        if (!isPersisted.compareAndSet(false, true)) {
+            throw new IllegalStateException("Persist critical error function has already been executed.");
+        }
+        try {
+            persistCriticalErrorToFile(errorCode, errorMessage, errorDetails, DEFAULT_RUNTIME_APPLICATION_ERRORS_PATH);
+        } catch (IOException e) {
+            log.error("Error occurred while persisting critical error", e);
+        }
+    }
+
+    /**
+     * Writes the critical error details to a file.
+     *
+     * The entry is written to current-udf.json inside the container directory
+     * (baseDir resolved against the container type) and then moved to a file named
+     * after the current epoch second, e.g. 1700000000-udf.json.
+     *
+     * @param errorCode the error code; "Internal error" is used when it is null or empty
+     * @param errorMessage the error message
+     * @param errorDetails additional error details
+     * @param baseDir the directory under which the container directory is created
+     * @throws IOException if an error occurs while writing to the file
+     */
+    static void persistCriticalErrorToFile(String errorCode, String errorMessage, String errorDetails, String baseDir) throws IOException {
+        Path containerDirPath = createDirectory(Paths.get(baseDir).resolve(CONTAINER_TYPE));
+
+        String effectiveErrorCode = (errorCode == null || errorCode.isEmpty()) ? INTERNAL_ERROR : errorCode;
+        long timestamp = Instant.now().getEpochSecond();
+
+        RuntimeErrorEntry errorEntry = new RuntimeErrorEntry(CONTAINER_TYPE, timestamp, effectiveErrorCode, errorMessage, errorDetails);
+        String errorEntryJson = errorEntry.toJson();
+
+        Path currentFilePath = containerDirPath.resolve(CURRENT_FILE);
+        Files.writeString(currentFilePath, errorEntryJson, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+
+        Path finalFilePath = containerDirPath.resolve(String.format("%d-udf.json", timestamp));
+        Files.move(currentFilePath, finalFilePath, StandardCopyOption.REPLACE_EXISTING);
+    }
+
+    /**
+     * Creates a directory with read,write,execute permissions for all if it does not already exist.
+     * On a file system without POSIX permission support the directory keeps its default
+     * permissions and a warning is logged, so the error can still be persisted.
+     *
+     * @param dirPath the path of the directory to create
+     * @return the path of the created directory
+     * @throws IOException if an error occurs while creating the directory
+     */
+    private static Path createDirectory(Path dirPath) throws IOException {
+        if (!Files.exists(dirPath)) {
+            Files.createDirectories(dirPath);
+            try {
+                Files.setPosixFilePermissions(dirPath, PosixFilePermissions.fromString("rwxrwxrwx"));
+            } catch (UnsupportedOperationException e) {
+                // Permissions are a convenience; do not lose the error report over them.
+                log.warn("POSIX file permissions are not supported for {}", dirPath, e);
+            }
+        }
+        return dirPath;
+    }
+
+    /**
+     * Private static class representing runtime error entry.
+     */
+    private static class RuntimeErrorEntry {
+        @JsonProperty("container")
+        private final String container;
+        @JsonProperty("timestamp")
+        private final long timestamp;
+        @JsonProperty("code")
+        private final String code;
+        @JsonProperty("message")
+        private final String message;
+        @JsonProperty("details")
+        private final String details;
+
+        public RuntimeErrorEntry(String container, long timestamp, String code, String message, String details) {
+            this.container = container;
+            this.timestamp = timestamp;
+            this.code = code;
+            this.message = message;
+            this.details = details;
+        }
+
+        /**
+         * Converts the RuntimeErrorEntry object to a JSON string.
+         *
+         * @return JSON representation of the runtime error entry
+         * @throws IOException if an error occurs during serialization
+         */
+        public String toJson() throws IOException {
+            try {
+                return jsonMapper.writeValueAsString(this);
+            } catch (JsonProcessingException e) {
+                throw new IOException("Failed to convert RuntimeErrorEntry to JSON", e);
+            }
+        }
+    }
+}
diff --git a/src/test/java/io/numaproj/numaflow/errors/PersistCriticalErrorTest.java b/src/test/java/io/numaproj/numaflow/errors/PersistCriticalErrorTest.java
new file mode 100644
index 00000000..c60b56db
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/errors/PersistCriticalErrorTest.java
@@ -0,0 +1,139 @@
+package io.numaproj.numaflow.errors;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+@Slf4j
+public class PersistCriticalErrorTest {
+
+    // Each test gets its own base directory, removed afterwards, so files left by earlier
+    // runs cannot be picked up by the assertions.
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @After
+    public void resetIsPersisted() {
+        // Reset the isPersisted flag to its default value (false) after each test
+        PersistCriticalError.setIsPersisted(false);
+    }
+
+    @Test
+    public void test_writes_error_details_to_new_file() throws IOException {
+        String errorCode = "404";
+        String errorMessage = "Not Found";
+        String errorDetails = "The requested resource was not found.";
+        String baseDir = tempFolder.newFolder("test-success").getPath();
+
+        PersistCriticalError.persistCriticalErrorToFile(errorCode, errorMessage, errorDetails, baseDir);
+
+        // Assert that the file contains the expected values
+        String fileContent = readPersistedJson(baseDir);
+        assertTrue(fileContent.contains(errorCode));
+        assertTrue(fileContent.contains(errorMessage));
+        assertTrue(fileContent.contains(errorDetails));
+    }
+
+    @Test
+    public void test_handles_null_or_empty_error_code() throws IOException {
+        String errorCode = null;
+        String errorMessage = "Internal Server Error";
+        String errorDetails = "An unexpected error occurred.";
+        String baseDir = tempFolder.newFolder("test-errors").getPath();
+
+        PersistCriticalError.persistCriticalErrorToFile(errorCode, errorMessage, errorDetails, baseDir);
+
+        // Assert that the file contains the expected values
+        String fileContent = readPersistedJson(baseDir);
+        assertTrue(fileContent.contains("Internal error"));
+        assertTrue(fileContent.contains(errorMessage));
+        assertTrue(fileContent.contains(errorDetails));
+    }
+
+    @Test
+    public void test_persistCriticalError_when_isPersisted_is_true_with_threads() {
+        // Set isPersisted to true before calling persistCriticalError
+        PersistCriticalError.setIsPersisted(true);
+
+        String errorCode = "500";
+        String errorMessage = "Critical Error";
+        String errorDetails = "A critical error occurred.";
+
+        // Number of threads to simulate concurrent calls
+        int numberOfThreads = 5;
+
+        // Create a thread pool
+        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
+
+        // Runnable task to call persistCriticalError
+        Runnable task = () -> {
+            try {
+                PersistCriticalError.persistCriticalError(errorCode, errorMessage, errorDetails);
+                fail("Expected IllegalStateException to be thrown, but it was not.");
+            } catch (IllegalStateException e) {
+                // Assert that the exception message is as expected
+                assertTrue(e.getMessage().contains("Persist critical error function has already been executed."));
+            }
+        };
+
+        // Submit tasks to the executor, keeping the futures so failures can be collected
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < numberOfThreads; i++) {
+            futures.add(executorService.submit(task));
+        }
+
+        // Shut down the executor
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                fail("Executor service did not terminate in the expected time.");
+            }
+            // Anything thrown inside a submitted task is held by its Future; get() brings
+            // it back to the test thread so a failed assertion actually fails the test.
+            for (Future<?> future : futures) {
+                future.get();
+            }
+        } catch (ExecutionException e) {
+            throw new AssertionError("Task failed: " + e.getCause(), e.getCause());
+        } catch (InterruptedException e) {
+            // Restore the interrupt status first: fail() throws, so nothing after it runs
+            Thread.currentThread().interrupt();
+            fail("Runtime exception occurred: " + e.getMessage());
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Returns the content of the first .json file in the container directory under baseDir.
+     */
+    private static String readPersistedJson(String baseDir) throws IOException {
+        Path containerDirPath = Paths.get(baseDir, PersistCriticalError.CONTAINER_TYPE);
+        // Files.list keeps the directory open until the stream is closed
+        try (Stream<Path> entries = Files.list(containerDirPath)) {
+            Path jsonFilePath = entries
+                    .filter(path -> path.toString().endsWith(".json"))
+                    .findFirst()
+                    .orElseThrow(() -> new AssertionError("No .json file found in the directory"));
+            return Files.readString(jsonFilePath);
+        }
+    }
+}