diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index 4da113eab..1ed74bc13 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -61,6 +61,7 @@ configurations { dependencies { minimalJar project(path: ':dagger-common', configuration: 'minimalCommonJar') minimalJar project(path: ':dagger-functions', configuration: 'minimalFunctionsJar') + minimalJar 'io.odpf:depot:0.1.5' compileOnly 'org.projectlombok:lombok:1.18.8' annotationProcessor 'org.projectlombok:lombok:1.18.8' diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java index e502c975c..606de25ef 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core; +import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter; import io.odpf.dagger.core.source.Stream; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; @@ -220,6 +221,8 @@ private void addSink(StreamInfo streamInfo) { } List getStreams() { - return StreamsFactory.getStreams(configuration, stencilClientOrchestrator); + org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration(); + DaggerStatsDReporter daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration); + return StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter); } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactory.java index 2c8e2a954..3955cfbb9 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactory.java @@ -4,6 +4,8 @@ import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.serde.DaggerDeserializer; import io.odpf.dagger.core.exception.DaggerConfigurationException; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter; import io.odpf.dagger.core.source.config.StreamConfig; import org.apache.flink.types.Row; @@ -12,12 +14,17 @@ import java.util.stream.Stream; public class DaggerDeserializerFactory { - public static DaggerDeserializer create(StreamConfig streamConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator) { + public static DaggerDeserializer create(StreamConfig streamConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, SerializedStatsDReporterSupplier statsDReporterSupplier) { return getDaggerDeserializerProviders(streamConfig, configuration, stencilClientOrchestrator) .stream() .filter(DaggerDeserializerProvider::canProvide) .findFirst() - .orElseThrow(() -> new DaggerConfigurationException("No suitable deserializer could be constructed for the given stream configuration.")) + .orElseThrow(() -> { + StatsDErrorReporter statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier); + DaggerConfigurationException ex = new DaggerConfigurationException("No suitable deserializer could be constructed for the given stream 
configuration."); + statsDErrorReporter.reportFatalException(ex); + return ex; + }) .getDaggerDeserializer(); } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/aspects/ChronologyOrderedSplitAssignerAspects.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/aspects/ChronologyOrderedSplitAssignerAspects.java new file mode 100644 index 000000000..555abccdd --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/aspects/ChronologyOrderedSplitAssignerAspects.java @@ -0,0 +1,28 @@ +package io.odpf.dagger.core.metrics.aspects; + +import io.odpf.dagger.common.metrics.aspects.AspectType; +import io.odpf.dagger.common.metrics.aspects.Aspects; + +public enum ChronologyOrderedSplitAssignerAspects implements Aspects { + TOTAL_SPLITS_DISCOVERED("total_splits_discovered", AspectType.Gauge), + TOTAL_SPLITS_RECORDED("total_splits_recorded", AspectType.Gauge), + SPLITS_AWAITING_ASSIGNMENT("splits_awaiting_assignment", AspectType.Counter); + + ChronologyOrderedSplitAssignerAspects(String value, AspectType aspectType) { + this.value = value; + this.aspectType = aspectType; + } + + private final String value; + private final AspectType aspectType; + + @Override + public String getValue() { + return value; + } + + @Override + public AspectType getAspectType() { + return aspectType; + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/aspects/ParquetReaderAspects.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/aspects/ParquetReaderAspects.java new file mode 100644 index 000000000..9a755e843 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/aspects/ParquetReaderAspects.java @@ -0,0 +1,30 @@ +package io.odpf.dagger.core.metrics.aspects; + +import io.odpf.dagger.common.metrics.aspects.AspectType; +import io.odpf.dagger.common.metrics.aspects.Aspects; + +public enum ParquetReaderAspects implements Aspects { + READER_CREATED("reader_created", AspectType.Counter), + READER_CLOSED("reader_closed", AspectType.Counter), + READER_ROWS_EMITTED("reader_rows_emitted", AspectType.Counter), + READER_ROW_DESERIALIZATION_TIME("reader_row_deserialization_time", AspectType.Histogram), + READER_ROW_READ_TIME("reader_row_read_time", AspectType.Histogram); + + private final String value; + private final AspectType aspectType; + + ParquetReaderAspects(String value, AspectType aspectType) { + this.value = value; + this.aspectType = aspectType; + } + + @Override + public String getValue() { + return value; + } + + @Override + public AspectType getAspectType() { + return aspectType; + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerMetricsConfig.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerMetricsConfig.java new file mode 100644 index 000000000..15ea4ffed --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerMetricsConfig.java @@ -0,0 +1,43 @@ +package io.odpf.dagger.core.metrics.reporters.statsd; + +import io.odpf.depot.config.MetricsConfig; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; + +public class DaggerMetricsConfig implements MetricsConfig { + private static final String FLINK_STATSD_HOST_CONFIG_KEY = "metrics.reporter.stsd.host"; + private static final String DEFAULT_STATSD_HOST_VALUE = "localhost"; + private static final String FLINK_STATSD_PORT_CONFIG_KEY = 
"metrics.reporter.stsd.port"; + private static final int DEFAULT_STATSD_PORT_VALUE = 8125; + private final String hostName; + private final int port; + + public DaggerMetricsConfig(Configuration flinkConfiguration) { + ConfigOption hostConfigOption = ConfigOptions + .key(FLINK_STATSD_HOST_CONFIG_KEY) + .stringType() + .defaultValue(DEFAULT_STATSD_HOST_VALUE); + ConfigOption portConfigOption = ConfigOptions + .key(FLINK_STATSD_PORT_CONFIG_KEY) + .intType() + .defaultValue(DEFAULT_STATSD_PORT_VALUE); + this.hostName = flinkConfiguration.getString(hostConfigOption); + this.port = flinkConfiguration.getInteger(portConfigOption); + } + + @Override + public String getMetricStatsDHost() { + return hostName; + } + + @Override + public Integer getMetricStatsDPort() { + return port; + } + + @Override + public String getMetricStatsDTags() { + return ""; + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerStatsDReporter.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerStatsDReporter.java new file mode 100644 index 000000000..733cacdf3 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerStatsDReporter.java @@ -0,0 +1,59 @@ +package io.odpf.dagger.core.metrics.reporters.statsd; + +import io.odpf.dagger.core.metrics.reporters.statsd.tags.GlobalTags; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; +import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.depot.metrics.StatsDReporterBuilder; +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.util.Arrays; + +import static io.odpf.dagger.core.utils.Constants.FLINK_JOB_ID_DEFAULT; +import static io.odpf.dagger.core.utils.Constants.FLINK_JOB_ID_KEY; + +public class DaggerStatsDReporter implements SerializedStatsDReporterSupplier { + private static StatsDReporter statsDReporter; + private final Configuration flinkConfiguration; + private final io.odpf.dagger.common.configuration.Configuration daggerConfiguration; + + private DaggerStatsDReporter(Configuration flinkConfiguration, io.odpf.dagger.common.configuration.Configuration daggerConfiguration) { + this.flinkConfiguration = flinkConfiguration; + this.daggerConfiguration = daggerConfiguration; + } + + private String[] generateGlobalTags() { + StatsDTag[] globalTags = new StatsDTag[]{ + new StatsDTag(GlobalTags.JOB_ID, daggerConfiguration.getString(FLINK_JOB_ID_KEY, FLINK_JOB_ID_DEFAULT))}; + return Arrays.stream(globalTags) + .map(StatsDTag::getFormattedTag) + .toArray(String[]::new); + } + + @Override + public StatsDReporter buildStatsDReporter() { + if (statsDReporter == null) { + DaggerMetricsConfig daggerMetricsConfig = new DaggerMetricsConfig(flinkConfiguration); + String[] globalTags = generateGlobalTags(); + statsDReporter = StatsDReporterBuilder + .builder() + .withMetricConfig(daggerMetricsConfig) + .withExtraTags(globalTags) + .build(); + } + return statsDReporter; + } + + protected static void close() throws IOException { + if (statsDReporter != null) { + statsDReporter.close(); + statsDReporter = null; + } + } + + public static class Provider { + public static DaggerStatsDReporter provide(Configuration flinkConfiguration, io.odpf.dagger.common.configuration.Configuration daggerConfiguration) { + return new DaggerStatsDReporter(flinkConfiguration, daggerConfiguration); + } + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/SerializedStatsDReporterSupplier.java 
b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/SerializedStatsDReporterSupplier.java new file mode 100644 index 000000000..7991bf293 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/SerializedStatsDReporterSupplier.java @@ -0,0 +1,15 @@ +package io.odpf.dagger.core.metrics.reporters.statsd; + +import io.odpf.depot.metrics.StatsDReporter; + +import java.io.Serializable; + +/* Flink requires that all objects which are needed to prepare the Job Graph should be serializable along with their +properties/fields. StatsDReporter and its fields are not serializable. Hence, in order to mitigate job graph creation +failure, we create a serializable interface around the reporter as below. This is a common idiom to make un-serializable +fields serializable in Java 8: https://stackoverflow.com/a/22808112 */ + +@FunctionalInterface +public interface SerializedStatsDReporterSupplier extends Serializable { + StatsDReporter buildStatsDReporter(); +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/StatsDErrorReporter.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/StatsDErrorReporter.java new file mode 100644 index 000000000..53856695d --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/StatsDErrorReporter.java @@ -0,0 +1,39 @@ +package io.odpf.dagger.core.metrics.reporters.statsd; + +import io.odpf.dagger.core.metrics.reporters.ErrorReporter; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; +import io.odpf.depot.metrics.StatsDReporter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; + +import static io.odpf.dagger.core.utils.Constants.FATAL_EXCEPTION_METRIC_GROUP_KEY; +import static io.odpf.dagger.core.utils.Constants.NONFATAL_EXCEPTION_METRIC_GROUP_KEY; + +public class StatsDErrorReporter implements ErrorReporter, Serializable { + private static final String FATAL_EXCEPTION_TAG_KEY = "fatal_exception_type"; + private static final String NON_FATAL_EXCEPTION_TAG_KEY = "non_fatal_exception_type"; + private final StatsDReporter statsDReporter; + + public StatsDErrorReporter(SerializedStatsDReporterSupplier statsDReporterSupplier) { + this.statsDReporter = statsDReporterSupplier.buildStatsDReporter(); + } + + @Override + public void reportFatalException(Exception exception) { + StatsDTag statsDTag = new StatsDTag(FATAL_EXCEPTION_TAG_KEY, exception.getClass().getName()); + statsDReporter.captureCount(FATAL_EXCEPTION_METRIC_GROUP_KEY, 1L, statsDTag.getFormattedTag()); + } + + @Override + public void reportNonFatalException(Exception exception) { + StatsDTag statsDTag = new StatsDTag(NON_FATAL_EXCEPTION_TAG_KEY, exception.getClass().getName()); + statsDReporter.captureCount(NONFATAL_EXCEPTION_METRIC_GROUP_KEY, 1L, statsDTag.getFormattedTag()); + } + + @Override + public Counter addExceptionToCounter(Exception exception, MetricGroup metricGroup, String metricGroupKey) { + throw new UnsupportedOperationException("This operation is not supported on StatsDErrorReporter"); + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerCounterManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerCounterManager.java new file mode 100644 index 000000000..1110beeae --- /dev/null +++ 
b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerCounterManager.java @@ -0,0 +1,47 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.manager; + +import io.odpf.dagger.common.metrics.aspects.Aspects; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.measurement.Counter; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; +import io.odpf.depot.metrics.StatsDReporter; + +import java.util.ArrayList; + +public class DaggerCounterManager implements MeasurementManager, Counter { + private final StatsDReporter statsDReporter; + private String[] formattedTags; + + public DaggerCounterManager(SerializedStatsDReporterSupplier statsDReporterSupplier) { + this.statsDReporter = statsDReporterSupplier.buildStatsDReporter(); + } + + @Override + public void register(StatsDTag[] tags) { + ArrayList tagList = new ArrayList<>(); + for (StatsDTag measurementTag : tags) { + tagList.add(measurementTag.getFormattedTag()); + } + this.formattedTags = tagList.toArray(new String[0]); + } + + @Override + public void increment(Aspects aspect) { + increment(aspect, 1L); + } + + @Override + public void increment(Aspects aspect, long positiveCount) { + statsDReporter.captureCount(aspect.getValue(), positiveCount, formattedTags); + } + + @Override + public void decrement(Aspects aspect) { + decrement(aspect, -1L); + } + + @Override + public void decrement(Aspects aspect, long negativeCount) { + statsDReporter.captureCount(aspect.getValue(), negativeCount, formattedTags); + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerGaugeManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerGaugeManager.java new file mode 100644 index 000000000..10c7c6771 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerGaugeManager.java @@ -0,0 +1,32 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.manager; + +import io.odpf.dagger.common.metrics.aspects.Aspects; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.measurement.Gauge; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; +import io.odpf.depot.metrics.StatsDReporter; + +import java.util.ArrayList; + +public class DaggerGaugeManager implements MeasurementManager, Gauge { + private final StatsDReporter statsDReporter; + private String[] formattedTags; + + public DaggerGaugeManager(SerializedStatsDReporterSupplier statsDReporterSupplier) { + this.statsDReporter = statsDReporterSupplier.buildStatsDReporter(); + } + + @Override + public void register(StatsDTag[] tags) { + ArrayList tagList = new ArrayList<>(); + for (StatsDTag measurementTag : tags) { + tagList.add(measurementTag.getFormattedTag()); + } + this.formattedTags = tagList.toArray(new String[0]); + } + + @Override + public void markValue(Aspects aspect, int gaugeValue) { + statsDReporter.gauge(aspect.getValue(), gaugeValue, formattedTags); + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerHistogramManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerHistogramManager.java new file mode 100644 index 000000000..93b8b5065 --- /dev/null +++ 
b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerHistogramManager.java @@ -0,0 +1,32 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.manager; + +import io.odpf.dagger.common.metrics.aspects.Aspects; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.measurement.Histogram; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; +import io.odpf.depot.metrics.StatsDReporter; + +import java.util.ArrayList; + +public class DaggerHistogramManager implements MeasurementManager, Histogram { + private final StatsDReporter statsDReporter; + private String[] formattedTags; + + public DaggerHistogramManager(SerializedStatsDReporterSupplier statsDReporterSupplier) { + this.statsDReporter = statsDReporterSupplier.buildStatsDReporter(); + } + + @Override + public void register(StatsDTag[] tags) { + ArrayList tagList = new ArrayList<>(); + for (StatsDTag measurementTag : tags) { + tagList.add(measurementTag.getFormattedTag()); + } + this.formattedTags = tagList.toArray(new String[0]); + } + + @Override + public void recordValue(Aspects aspect, long value) { + statsDReporter.captureHistogram(aspect.getValue(), value, formattedTags); + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/MeasurementManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/MeasurementManager.java new file mode 100644 index 000000000..5c195c17b --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/MeasurementManager.java @@ -0,0 +1,9 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.manager; + +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; + +import java.io.Serializable; + +public interface MeasurementManager extends Serializable { + void register(StatsDTag[] tags); +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Counter.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Counter.java new file mode 100644 index 000000000..0b0668aac --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Counter.java @@ -0,0 +1,15 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.measurement; + +import io.odpf.dagger.common.metrics.aspects.Aspects; + +import java.io.Serializable; + +public interface Counter extends Serializable { + void increment(Aspects aspect); + + void increment(Aspects aspect, long num); + + void decrement(Aspects aspect); + + void decrement(Aspects aspect, long num); +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Gauge.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Gauge.java new file mode 100644 index 000000000..a41fb29bd --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Gauge.java @@ -0,0 +1,9 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.measurement; + +import io.odpf.dagger.common.metrics.aspects.Aspects; + +import java.io.Serializable; + +public interface Gauge extends Serializable { + void markValue(Aspects aspect, int gaugeValue); +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Histogram.java 
b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Histogram.java new file mode 100644 index 000000000..1c8f68cfe --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/measurement/Histogram.java @@ -0,0 +1,9 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.measurement; + +import io.odpf.dagger.common.metrics.aspects.Aspects; + +import java.io.Serializable; + +public interface Histogram extends Serializable { + void recordValue(Aspects aspect, long value); +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/ComponentTags.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/ComponentTags.java new file mode 100644 index 000000000..b73d64c1b --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/ComponentTags.java @@ -0,0 +1,25 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.tags; + +public class ComponentTags { + private static StatsDTag[] parquetReaderTags; + private static StatsDTag[] splitAssignerTags; + private static final String COMPONENT_TAG_KEY = "component"; + + public static StatsDTag[] getParquetReaderTags() { + if (parquetReaderTags == null) { + parquetReaderTags = new StatsDTag[]{ + new StatsDTag(COMPONENT_TAG_KEY, "parquet_reader"), + }; + } + return parquetReaderTags; + } + + public static StatsDTag[] getSplitAssignerTags() { + if (splitAssignerTags == null) { + splitAssignerTags = new StatsDTag[]{ + new StatsDTag(COMPONENT_TAG_KEY, "split_assigner"), + }; + } + return splitAssignerTags; + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/GlobalTags.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/GlobalTags.java new file mode 100644 index 000000000..2b3d9b143 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/GlobalTags.java @@ -0,0 +1,5 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.tags; + +public class GlobalTags { + public static final String JOB_ID = "job_id"; +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTag.java b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTag.java new file mode 100644 index 000000000..d1c40b22e --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTag.java @@ -0,0 +1,27 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.tags; + +import org.apache.flink.util.Preconditions; + +public class StatsDTag { + private final String tagKey; + private final String tagValue; + private static final String NIL_TAG_VALUE = "NIL_TAG_VALUE"; + + public StatsDTag(String key, String value) { + Preconditions.checkArgument(key != null && !key.isEmpty(), "Tag key cannot be null or empty"); + this.tagKey = key; + this.tagValue = (value != null && !value.isEmpty()) ? 
value : NIL_TAG_VALUE; + } + + public StatsDTag(String tagName) { + this(tagName, NIL_TAG_VALUE); + } + + public String getFormattedTag() { + if (tagValue.equals(NIL_TAG_VALUE)) { + return tagKey; + } else { + return String.format("%s=%s", tagKey, tagValue); + } + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/DaggerSourceFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/DaggerSourceFactory.java index 572a8a758..b99735342 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/DaggerSourceFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/DaggerSourceFactory.java @@ -1,8 +1,10 @@ package io.odpf.dagger.core.source; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; import io.odpf.dagger.common.serde.DaggerDeserializer; import io.odpf.dagger.core.exception.InvalidDaggerSourceException; +import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter; import io.odpf.dagger.core.source.config.StreamConfig; import io.odpf.dagger.core.source.flinkkafkaconsumer.FlinkKafkaConsumerDaggerSource; import io.odpf.dagger.core.source.kafka.KafkaDaggerSource; @@ -16,22 +18,24 @@ public class DaggerSourceFactory { - public static DaggerSource create(StreamConfig streamConfig, Configuration configuration, DaggerDeserializer deserializer) { - List> daggerSources = getDaggerSources(streamConfig, configuration, deserializer); + public static DaggerSource create(StreamConfig streamConfig, Configuration configuration, DaggerDeserializer deserializer, SerializedStatsDReporterSupplier statsDReporterSupplier) { + List> daggerSources = getDaggerSources(streamConfig, configuration, deserializer, statsDReporterSupplier); return daggerSources.stream() .filter(DaggerSource::canBuild) .findFirst() .orElseThrow(() -> { + StatsDErrorReporter statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier); String sourceDetails = Arrays.toString(streamConfig.getSourceDetails()); - String message = String.format("No suitable DaggerSource can be created as per SOURCE_DETAILS config %s", sourceDetails); - return new InvalidDaggerSourceException(message); + InvalidDaggerSourceException ex = new InvalidDaggerSourceException(String.format("No suitable DaggerSource can be created as per SOURCE_DETAILS config %s", sourceDetails)); + statsDErrorReporter.reportFatalException(ex); + return ex; }); } - private static List> getDaggerSources(StreamConfig streamConfig, Configuration configuration, DaggerDeserializer deserializer) { + private static List> getDaggerSources(StreamConfig streamConfig, Configuration configuration, DaggerDeserializer deserializer, SerializedStatsDReporterSupplier statsDReporterSupplier) { KafkaDaggerSource kafkaDaggerSource = new KafkaDaggerSource(streamConfig, configuration, deserializer); FlinkKafkaConsumerDaggerSource flinkKafkaConsumerDaggerSource = new FlinkKafkaConsumerDaggerSource(streamConfig, configuration, deserializer); - ParquetDaggerSource parquetDaggerSource = new ParquetDaggerSource(streamConfig, configuration, deserializer); + ParquetDaggerSource parquetDaggerSource = new ParquetDaggerSource(streamConfig, configuration, deserializer, statsDReporterSupplier); return Stream.of(kafkaDaggerSource, flinkKafkaConsumerDaggerSource, parquetDaggerSource) .collect(Collectors.toList()); } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/Stream.java 
b/dagger-core/src/main/java/io/odpf/dagger/core/source/Stream.java index 6a03aa0b5..3f5266b0b 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/Stream.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/Stream.java @@ -2,6 +2,7 @@ import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StencilClientOrchestrator; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; import io.odpf.dagger.common.serde.DaggerDeserializer; import io.odpf.dagger.core.deserializer.DaggerDeserializerFactory; import io.odpf.dagger.core.source.config.StreamConfig; @@ -32,16 +33,18 @@ public static class Builder { private final StreamConfig streamConfig; private final Configuration configuration; private final StencilClientOrchestrator stencilClientOrchestrator; + private final SerializedStatsDReporterSupplier statsDReporterSupplier; - public Builder(StreamConfig streamConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator) { + public Builder(StreamConfig streamConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, SerializedStatsDReporterSupplier statsDReporterSupplier) { this.streamConfig = streamConfig; this.configuration = configuration; this.stencilClientOrchestrator = stencilClientOrchestrator; + this.statsDReporterSupplier = statsDReporterSupplier; } public Stream build() { - DaggerDeserializer daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator); - DaggerSource daggerSource = DaggerSourceFactory.create(streamConfig, configuration, daggerDeserializer); + DaggerDeserializer daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplier); + DaggerSource daggerSource = DaggerSourceFactory.create(streamConfig, configuration, daggerDeserializer, statsDReporterSupplier); return new Stream(daggerSource, streamConfig.getSchemaTable()); } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/StreamsFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/StreamsFactory.java index 3ec25a7e3..034e92c09 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/StreamsFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/StreamsFactory.java @@ -2,18 +2,19 @@ import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StencilClientOrchestrator; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; import io.odpf.dagger.core.source.config.StreamConfig; import java.util.ArrayList; import java.util.List; public class StreamsFactory { - public static List getStreams(Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator) { + public static List getStreams(Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, SerializedStatsDReporterSupplier statsDReporterSupplier) { StreamConfig[] streamConfigs = StreamConfig.parse(configuration); ArrayList streams = new ArrayList<>(); for (StreamConfig streamConfig : streamConfigs) { - Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator); + Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplier); streams.add(builder.build()); } return streams; diff --git 
a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSource.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSource.java index 781803e79..078b59731 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSource.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSource.java @@ -1,9 +1,11 @@ package io.odpf.dagger.core.source.parquet; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; import io.odpf.dagger.common.serde.DaggerDeserializer; import io.odpf.dagger.common.serde.parquet.deserialization.SimpleGroupDeserializer; import io.odpf.dagger.core.exception.DaggerConfigurationException; +import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter; import io.odpf.dagger.core.source.DaggerSource; import io.odpf.dagger.core.source.config.StreamConfig; import io.odpf.dagger.core.source.config.models.SourceDetails; @@ -33,13 +35,17 @@ public class ParquetDaggerSource implements DaggerSource { private final DaggerDeserializer deserializer; private final StreamConfig streamConfig; private final Configuration configuration; + private final SerializedStatsDReporterSupplier statsDReporterSupplier; private static final SourceType SUPPORTED_SOURCE_TYPE = BOUNDED; private static final SourceName SUPPORTED_SOURCE_NAME = PARQUET_SOURCE; + private final StatsDErrorReporter statsDErrorReporter; - public ParquetDaggerSource(StreamConfig streamConfig, Configuration configuration, DaggerDeserializer deserializer) { + public ParquetDaggerSource(StreamConfig streamConfig, Configuration configuration, DaggerDeserializer deserializer, SerializedStatsDReporterSupplier statsDReporterSupplier) { this.streamConfig = streamConfig; this.configuration = configuration; this.deserializer = deserializer; + this.statsDReporterSupplier = statsDReporterSupplier; + this.statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier); } @Override @@ -71,6 +77,7 @@ FileSource buildFileSource() { .setFileRecordFormat(parquetFileRecordFormat) .setSourceType(SUPPORTED_SOURCE_TYPE) .setFileSplitAssigner(splitAssignerProvider) + .setStatsDReporterSupplier(statsDReporterSupplier) .build(); return parquetFileSource.buildFileSource(); } @@ -89,22 +96,26 @@ private FileSplitAssigner.Provider buildParquetFileSplitAssignerProvider() { ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder chronologyOrderedSplitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder() .addTimeRanges(streamConfig.getParquetFileDateRange()) + .addStatsDReporterSupplier(statsDReporterSupplier) .addPathParser(new HourDatePathParser()); return chronologyOrderedSplitAssignerBuilder::build; case EARLIEST_INDEX_FIRST: default: - throw new DaggerConfigurationException("Error: file split assignment strategy not configured or not supported yet."); + DaggerConfigurationException daggerConfigurationException = new DaggerConfigurationException("Error: file split assignment strategy not configured or not supported yet."); + statsDErrorReporter.reportFatalException(daggerConfigurationException); + throw daggerConfigurationException; } } private ParquetFileRecordFormat buildParquetFileRecordFormat() { SimpleGroupDeserializer simpleGroupDeserializer = (SimpleGroupDeserializer) deserializer; - ReaderProvider parquetFileReaderProvider = new ParquetReader.ParquetReaderProvider(simpleGroupDeserializer); + 
ReaderProvider parquetFileReaderProvider = new ParquetReader.ParquetReaderProvider(simpleGroupDeserializer, statsDReporterSupplier); ParquetFileRecordFormat.Builder parquetFileRecordFormatBuilder = ParquetFileRecordFormat.Builder.getInstance(); Supplier> typeInformationProvider = (Supplier> & Serializable) simpleGroupDeserializer::getProducedType; return parquetFileRecordFormatBuilder .setParquetFileReaderProvider(parquetFileReaderProvider) .setTypeInformationProvider(typeInformationProvider) + .setStatsDReporterSupplier(statsDReporterSupplier) .build(); } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormat.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormat.java index 9828dfabb..e93b9eb81 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormat.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormat.java @@ -2,6 +2,8 @@ import static com.google.api.client.util.Preconditions.checkArgument; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter; import io.odpf.dagger.core.source.parquet.reader.ReaderProvider; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; @@ -9,15 +11,23 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.types.Row; +import java.io.Serializable; import java.util.function.Supplier; public class ParquetFileRecordFormat implements FileRecordFormat { + /* FileRecordFormat object and all its fields need to be serializable in order to construct the Flink job graph. Even though + StatsDErrorReporter is serializable, it contains StatsDReporter, which in turn contains more fields which may not be + serializable. Hence, in order to mitigate job graph creation failures, we wrap the error reporter inside a serializable lambda.
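+ The lambda captures only the serializable statsDReporterSupplier; the StatsDErrorReporter itself is created lazily, when the supplier is invoked on the task manager after the job graph has been deserialized.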
+ This is a common idiom to make un-serializable fields serializable in Java 8: https://stackoverflow.com/a/22808112 */ + private final ReaderProvider parquetFileReaderProvider; private final Supplier> typeInformationProvider; + private final Supplier statsDErrorReporterSupplier; - private ParquetFileRecordFormat(ReaderProvider parquetFileReaderProvider, Supplier> typeInformationProvider) { + private ParquetFileRecordFormat(ReaderProvider parquetFileReaderProvider, Supplier> typeInformationProvider, SerializedStatsDReporterSupplier statsDReporterSupplier) { this.parquetFileReaderProvider = parquetFileReaderProvider; this.typeInformationProvider = typeInformationProvider; + this.statsDErrorReporterSupplier = (Supplier & Serializable) () -> new StatsDErrorReporter(statsDReporterSupplier); } @Override @@ -27,8 +37,10 @@ public Reader createReader(Configuration config, Path filePath, long splitO @Override public Reader restoreReader(Configuration config, Path filePath, long restoredOffset, long splitOffset, long splitLength) { - throw new UnsupportedOperationException("Error: ParquetReader do not have offsets and hence cannot be restored " + UnsupportedOperationException ex = new UnsupportedOperationException("Error: ParquetReader does not have offsets and hence cannot be restored " + "via this method."); + statsDErrorReporterSupplier.get().reportFatalException(ex); + throw ex; } @Override @@ -44,6 +56,7 @@ public TypeInformation getProducedType() { public static class Builder { private ReaderProvider parquetFileReaderProvider; private Supplier> typeInformationProvider; + private SerializedStatsDReporterSupplier statsDReporterSupplier; public static Builder getInstance() { return new Builder(); @@ -52,6 +65,7 @@ public static Builder getInstance() { private Builder() { this.parquetFileReaderProvider = null; this.typeInformationProvider = null; + this.statsDReporterSupplier = null; } public Builder setParquetFileReaderProvider(ReaderProvider parquetFileReaderProvider) { @@ -64,10 +78,23 @@ public Builder setTypeInformationProvider(Supplier> typeInf return this; } + public Builder setStatsDReporterSupplier(SerializedStatsDReporterSupplier statsDReporterSupplier) { + this.statsDReporterSupplier = statsDReporterSupplier; + return this; + } + public ParquetFileRecordFormat build() { - checkArgument(parquetFileReaderProvider != null, "ReaderProvider is required but is set as null"); - checkArgument(typeInformationProvider != null, "TypeInformationProvider is required but is set as null"); - return new ParquetFileRecordFormat(parquetFileReaderProvider, typeInformationProvider); + try { + checkArgument(parquetFileReaderProvider != null, "ReaderProvider is required but is set as null"); + checkArgument(typeInformationProvider != null, "TypeInformationProvider is required but is set as null"); + checkArgument(statsDReporterSupplier != null, "SerializedStatsDReporterSupplier is required but is set as null"); + return new ParquetFileRecordFormat(parquetFileReaderProvider, typeInformationProvider, statsDReporterSupplier); + } catch (IllegalArgumentException ex) { + if (statsDReporterSupplier != null) { + new StatsDErrorReporter(statsDReporterSupplier).reportFatalException(ex); + } + throw ex; + } } } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileSource.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileSource.java index 56ceebe81..f97fb14c1 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileSource.java +++
b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/ParquetFileSource.java @@ -1,6 +1,8 @@ package io.odpf.dagger.core.source.parquet; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter; import io.odpf.dagger.core.source.config.models.SourceType; import lombok.Getter; import org.apache.flink.connector.file.src.FileSource; @@ -51,6 +53,7 @@ public static class Builder { private FileRecordFormat fileRecordFormat; private Configuration configuration; private FileSplitAssigner.Provider fileSplitAssigner; + private SerializedStatsDReporterSupplier statsDReporterSupplier; public static Builder getInstance() { return new Builder(); @@ -89,12 +92,25 @@ public Builder setConfiguration(Configuration configuration) { return this; } + public Builder setStatsDReporterSupplier(SerializedStatsDReporterSupplier statsDReporterSupplier) { + this.statsDReporterSupplier = statsDReporterSupplier; + return this; + } + /* other validations if required before creating the file source can be put here */ /* for example, checking that all the file paths conform to just one partitioning strategy */ private void sanityCheck() { - checkArgument(fileRecordFormat != null, "FileRecordFormat is required but is set as null"); - checkArgument(filePaths.length != 0, "At least one file path is required but none are provided"); - checkArgument(sourceType == BOUNDED, "Running Parquet FileSource in UNBOUNDED mode is not supported yet"); + try { + checkArgument(statsDReporterSupplier != null, "SerializedStatsDReporterSupplier is required but is set as null"); + checkArgument(fileRecordFormat != null, "FileRecordFormat is required but is set as null"); + checkArgument(filePaths.length != 0, "At least one file path is required but none are provided"); + checkArgument(sourceType == BOUNDED, "Running Parquet FileSource in UNBOUNDED mode is not supported yet"); + } catch (IllegalArgumentException exception) { + if (statsDReporterSupplier != null) { + new StatsDErrorReporter(statsDReporterSupplier).reportFatalException(exception); + } + throw exception; + } } public ParquetFileSource build() { diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/reader/ParquetReader.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/reader/ParquetReader.java index 573c1b859..017aa92ba 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/reader/ParquetReader.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/reader/ParquetReader.java @@ -1,7 +1,14 @@ package io.odpf.dagger.core.source.parquet.reader; +import io.odpf.dagger.common.exceptions.serde.DaggerDeserializationException; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter; +import io.odpf.dagger.core.metrics.reporters.statsd.manager.DaggerCounterManager; +import io.odpf.dagger.core.metrics.reporters.statsd.manager.DaggerHistogramManager; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; import io.odpf.dagger.common.serde.parquet.deserialization.SimpleGroupDeserializer; import io.odpf.dagger.core.exception.ParquetFileSourceReaderInitializationException; +import io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects; import org.apache.flink.connector.file.src.reader.FileRecordFormat; import 
org.apache.flink.connector.file.src.util.CheckpointedPosition; import org.apache.flink.types.Row; @@ -22,6 +29,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Instant; + +import static io.odpf.dagger.core.metrics.reporters.statsd.tags.ComponentTags.getParquetReaderTags; public class ParquetReader implements FileRecordFormat.Reader { private final Path hadoopFilePath; @@ -33,15 +43,30 @@ public class ParquetReader implements FileRecordFormat.Reader { private RecordReader recordReader; private final MessageType schema; private long totalEmittedRowCount; + private DaggerCounterManager daggerCounterManager; + private DaggerHistogramManager daggerHistogramManager; + private final StatsDErrorReporter statsDErrorReporter; private static final Logger LOGGER = LoggerFactory.getLogger(ParquetReader.class.getName()); - private ParquetReader(Path hadoopFilePath, SimpleGroupDeserializer simpleGroupDeserializer, ParquetFileReader parquetFileReader) throws IOException { + private ParquetReader(Path hadoopFilePath, SimpleGroupDeserializer simpleGroupDeserializer, ParquetFileReader + parquetFileReader, SerializedStatsDReporterSupplier statsDReporterSupplier) throws IOException { this.hadoopFilePath = hadoopFilePath; this.simpleGroupDeserializer = simpleGroupDeserializer; this.parquetFileReader = parquetFileReader; this.schema = this.parquetFileReader.getFileMetaData().getSchema(); this.isRecordReaderInitialized = false; this.totalEmittedRowCount = 0L; + this.registerTagsWithMeasurementManagers(statsDReporterSupplier); + this.statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier); + daggerCounterManager.increment(ParquetReaderAspects.READER_CREATED); + } + + private void registerTagsWithMeasurementManagers(SerializedStatsDReporterSupplier statsDReporterSupplier) { + StatsDTag[] parquetReaderTags = getParquetReaderTags(); + this.daggerCounterManager = new DaggerCounterManager(statsDReporterSupplier); + this.daggerCounterManager.register(parquetReaderTags); + this.daggerHistogramManager = new DaggerHistogramManager(statsDReporterSupplier); + this.daggerHistogramManager.register(parquetReaderTags); } private boolean checkIfNullPage(PageReadStore page) { @@ -69,6 +94,8 @@ private void initializeRecordReader() throws IOException { } private Row readRecords() throws IOException { + long startReadTime = Instant.now().toEpochMilli(); + if (currentRecordIndex >= rowCount) { PageReadStore nextPage = parquetFileReader.readNextRowGroup(); if (checkIfNullPage(nextPage)) { @@ -76,7 +103,29 @@ private Row readRecords() throws IOException { } changeReaderPosition(nextPage); } - return readAndDeserialize(); + SimpleGroup simpleGroup = (SimpleGroup) recordReader.read(); + long endReadTime = Instant.now().toEpochMilli(); + + currentRecordIndex++; + long startDeserializationTime = Instant.now().toEpochMilli(); + + Row row = deserialize(simpleGroup); + + long endDeserializationTime = Instant.now().toEpochMilli(); + totalEmittedRowCount++; + + daggerHistogramManager.recordValue(ParquetReaderAspects.READER_ROW_READ_TIME, endReadTime - startReadTime); + daggerHistogramManager.recordValue(ParquetReaderAspects.READER_ROW_DESERIALIZATION_TIME, endDeserializationTime - startDeserializationTime); + return row; + } + + private Row deserialize(SimpleGroup simpleGroup) { + try { + return simpleGroupDeserializer.deserialize(simpleGroup); + } catch (DaggerDeserializationException exception) { + statsDErrorReporter.reportFatalException(exception); + throw exception; + } } @Nullable @@ 
-85,14 +134,8 @@ public Row read() throws IOException { if (!isRecordReaderInitialized) { initializeRecordReader(); } - return readRecords(); - } - - private Row readAndDeserialize() { - SimpleGroup simpleGroup = (SimpleGroup) recordReader.read(); - currentRecordIndex++; - Row row = simpleGroupDeserializer.deserialize(simpleGroup); - totalEmittedRowCount++; + Row row = readRecords(); + daggerCounterManager.increment(ParquetReaderAspects.READER_ROWS_EMITTED); return row; } @@ -102,6 +145,7 @@ public void close() throws IOException { closeRecordReader(); String logMessage = String.format("Closed the ParquetFileReader and de-referenced the RecordReader for file %s", hadoopFilePath.getName()); LOGGER.info(logMessage); + daggerCounterManager.increment(ParquetReaderAspects.READER_CLOSED); } private void closeRecordReader() { @@ -118,9 +162,11 @@ public CheckpointedPosition getCheckpointedPosition() { public static class ParquetReaderProvider implements ReaderProvider { private final SimpleGroupDeserializer simpleGroupDeserializer; + private final SerializedStatsDReporterSupplier statsDReporterSupplier; - public ParquetReaderProvider(SimpleGroupDeserializer simpleGroupDeserializer) { + public ParquetReaderProvider(SimpleGroupDeserializer simpleGroupDeserializer, SerializedStatsDReporterSupplier statsDReporterSupplier) { this.simpleGroupDeserializer = simpleGroupDeserializer; + this.statsDReporterSupplier = statsDReporterSupplier; } @Override @@ -129,9 +175,11 @@ public ParquetReader getReader(String filePath) { Configuration conf = new Configuration(); Path hadoopFilePath = new Path(filePath); ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(hadoopFilePath, conf)); - return new ParquetReader(hadoopFilePath, simpleGroupDeserializer, parquetFileReader); + return new ParquetReader(hadoopFilePath, simpleGroupDeserializer, parquetFileReader, statsDReporterSupplier); } catch (IOException | RuntimeException ex) { - throw new ParquetFileSourceReaderInitializationException(ex); + ParquetFileSourceReaderInitializationException exception = new ParquetFileSourceReaderInitializationException(ex); + new StatsDErrorReporter(statsDReporterSupplier).reportFatalException(exception); + throw exception; } } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssigner.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssigner.java index 9c0fef179..b219ccc88 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssigner.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssigner.java @@ -1,5 +1,9 @@ package io.odpf.dagger.core.source.parquet.splitassigner; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter; +import io.odpf.dagger.core.metrics.reporters.statsd.manager.DaggerGaugeManager; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; import io.odpf.dagger.core.exception.PathParserNotProvidedException; import io.odpf.dagger.core.source.config.models.TimeRangePool; import io.odpf.dagger.core.source.parquet.path.PathParser; @@ -16,19 +20,39 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.stream.Collectors; +import static com.google.api.client.util.Preconditions.checkArgument; +import static 
io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.SPLITS_AWAITING_ASSIGNMENT; +import static io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.TOTAL_SPLITS_DISCOVERED; +import static io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.TOTAL_SPLITS_RECORDED; +import static io.odpf.dagger.core.metrics.reporters.statsd.tags.ComponentTags.getSplitAssignerTags; + public class ChronologyOrderedSplitAssigner implements FileSplitAssigner { private final PriorityBlockingQueue unassignedSplits; private static final int INITIAL_DEFAULT_CAPACITY = 11; private PathParser pathParser; private TimeRangePool timeRangePool; + private DaggerGaugeManager daggerGaugeManager; + private final StatsDErrorReporter statsDErrorReporter; - private ChronologyOrderedSplitAssigner(Collection fileSourceSplits, PathParser pathParser, TimeRangePool timeRangePool) { + private ChronologyOrderedSplitAssigner(Collection fileSourceSplits, PathParser pathParser, + TimeRangePool timeRangePool, SerializedStatsDReporterSupplier statsDReporterSupplier) { + this.registerTagsWithMeasurementManagers(statsDReporterSupplier); + daggerGaugeManager.markValue(TOTAL_SPLITS_DISCOVERED, fileSourceSplits.size()); this.pathParser = pathParser; this.timeRangePool = timeRangePool; + statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier); this.unassignedSplits = new PriorityBlockingQueue<>(INITIAL_DEFAULT_CAPACITY, getFileSourceSplitComparator()); for (FileSourceSplit split : fileSourceSplits) { validateAndAddSplits(split); } + daggerGaugeManager.markValue(TOTAL_SPLITS_RECORDED, unassignedSplits.size()); + daggerGaugeManager.markValue(SPLITS_AWAITING_ASSIGNMENT, unassignedSplits.size()); + } + + private void registerTagsWithMeasurementManagers(SerializedStatsDReporterSupplier statsDReporterSupplier) { + StatsDTag[] splitAssignerTags = getSplitAssignerTags(); + daggerGaugeManager = new DaggerGaugeManager(statsDReporterSupplier); + daggerGaugeManager.register(splitAssignerTags); } @Override @@ -37,6 +61,7 @@ public Optional getNext(@Nullable String hostname) { if (instantEnrichedSplit == null) { return Optional.empty(); } + daggerGaugeManager.markValue(SPLITS_AWAITING_ASSIGNMENT, unassignedSplits.size()); return Optional.of(instantEnrichedSplit.getFileSourceSplit()); } @@ -62,7 +87,9 @@ private void validateAndAddSplits(FileSourceSplit split) { this.unassignedSplits.add(new InstantEnrichedSplit(split, instant)); } } catch (ParseException ex) { - throw new IllegalArgumentException(ex); + IllegalArgumentException exception = new IllegalArgumentException(ex); + statsDErrorReporter.reportFatalException(exception); + throw exception; } } @@ -83,6 +110,7 @@ private Comparator getFileSourceSplitComparator() { public static class ChronologyOrderedSplitAssignerBuilder implements Serializable { private PathParser pathParser; private TimeRangePool parquetFileDateRange; + private SerializedStatsDReporterSupplier statsDReporterSupplier; public ChronologyOrderedSplitAssignerBuilder addPathParser(PathParser parser) { this.pathParser = parser; @@ -94,11 +122,19 @@ public ChronologyOrderedSplitAssignerBuilder addTimeRanges(TimeRangePool timeRan return this; } + public ChronologyOrderedSplitAssignerBuilder addStatsDReporterSupplier(SerializedStatsDReporterSupplier supplier) { + this.statsDReporterSupplier = supplier; + return this; + } + public ChronologyOrderedSplitAssigner build(Collection fileSourceSplits) { + checkArgument(statsDReporterSupplier != null, 
"SerializedStatsDReporterSupplier is required but is set as null"); if (pathParser == null) { - throw new PathParserNotProvidedException("Path parser is null"); + PathParserNotProvidedException exception = new PathParserNotProvidedException("Path parser is null"); + new StatsDErrorReporter(statsDReporterSupplier).reportFatalException(exception); + throw exception; } - return new ChronologyOrderedSplitAssigner(fileSourceSplits, pathParser, parquetFileDateRange); + return new ChronologyOrderedSplitAssigner(fileSourceSplits, pathParser, parquetFileDateRange, statsDReporterSupplier); } } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java index ded8de2a1..0b7144f88 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java @@ -8,10 +8,12 @@ import io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer; import io.odpf.dagger.consumer.TestBookingLogMessage; import io.odpf.dagger.core.exception.DaggerConfigurationException; +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; import io.odpf.dagger.core.source.config.models.SourceDetails; import io.odpf.dagger.core.source.config.models.SourceName; import io.odpf.dagger.core.source.config.models.SourceType; import io.odpf.dagger.core.source.config.StreamConfig; +import io.odpf.depot.metrics.StatsDReporter; import io.odpf.stencil.client.StencilClient; import org.apache.flink.types.Row; import org.junit.Before; @@ -19,6 +21,7 @@ import org.mockito.Mock; import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -36,6 +39,8 @@ public class DaggerDeserializerFactoryTest { @Mock private StencilClient stencilClient; + private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> mock(StatsDReporter.class); + @Before public void setUp() throws Exception { initMocks(this); @@ -47,7 +52,7 @@ public void shouldReturnJsonDeserializerWhenConfigured() { when(streamConfig.getDataType()).thenReturn("JSON"); when(streamConfig.getJsonSchema()).thenReturn("{ \"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"$id\": \"https://example.com/product.schema.json\", \"title\": \"Product\", \"description\": \"A product from Acme's catalog\", \"type\": \"object\", \"properties\": { \"id\": { \"description\": \"The unique identifier for a product\", \"type\": \"string\" }, \"time\": { \"description\": \"event timestamp of the event\", \"type\": \"string\", \"format\" : \"date-time\" } }, \"required\": [ \"id\", \"time\" ] }"); - DaggerDeserializer daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator); + DaggerDeserializer daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock); assertTrue(daggerDeserializer instanceof JsonDeserializer); } @@ -61,7 +66,7 @@ public void shouldReturnProtoDeserializerWhenConfigured() { when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor()); - DaggerDeserializer daggerDeserializer = 
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java
index ded8de2a1..0b7144f88 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/deserializer/DaggerDeserializerFactoryTest.java
@@ -8,10 +8,12 @@
 import io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer;
 import io.odpf.dagger.consumer.TestBookingLogMessage;
 import io.odpf.dagger.core.exception.DaggerConfigurationException;
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.core.source.config.models.SourceDetails;
 import io.odpf.dagger.core.source.config.models.SourceName;
 import io.odpf.dagger.core.source.config.models.SourceType;
 import io.odpf.dagger.core.source.config.StreamConfig;
+import io.odpf.depot.metrics.StatsDReporter;
 import io.odpf.stencil.client.StencilClient;
 import org.apache.flink.types.Row;
 import org.junit.Before;
@@ -19,6 +21,7 @@
 import org.mockito.Mock;

 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;

@@ -36,6 +39,8 @@ public class DaggerDeserializerFactoryTest {
     @Mock
     private StencilClient stencilClient;

+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> mock(StatsDReporter.class);
+
     @Before
     public void setUp() throws Exception {
         initMocks(this);
@@ -47,7 +52,7 @@ public void shouldReturnJsonDeserializerWhenConfigured() {
         when(streamConfig.getDataType()).thenReturn("JSON");
         when(streamConfig.getJsonSchema()).thenReturn("{ \"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"$id\": \"https://example.com/product.schema.json\", \"title\": \"Product\", \"description\": \"A product from Acme's catalog\", \"type\": \"object\", \"properties\": { \"id\": { \"description\": \"The unique identifier for a product\", \"type\": \"string\" }, \"time\": { \"description\": \"event timestamp of the event\", \"type\": \"string\", \"format\" : \"date-time\" } }, \"required\": [ \"id\", \"time\" ] }");

-        DaggerDeserializer<Row> daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator);
+        DaggerDeserializer<Row> daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);

         assertTrue(daggerDeserializer instanceof JsonDeserializer);
     }
@@ -61,7 +66,7 @@ public void shouldReturnProtoDeserializerWhenConfigured() {
         when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient);
         when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor());

-        DaggerDeserializer<Row> daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator);
+        DaggerDeserializer<Row> daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);

         assertTrue(daggerDeserializer instanceof ProtoDeserializer);
     }
@@ -75,7 +80,7 @@ public void shouldReturnSimpleGroupDeserializerWhenConfigured() {
         when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient);
         when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor());

-        DaggerDeserializer<Row> daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator);
+        DaggerDeserializer<Row> daggerDeserializer = DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);

         assertTrue(daggerDeserializer instanceof SimpleGroupDeserializer);
     }
@@ -85,6 +90,6 @@ public void shouldThrowRuntimeExceptionIfNoDeserializerCouldBeCreatedFromConfigs
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.BOUNDED)});
         when(streamConfig.getDataType()).thenReturn("JSON");

-        assertThrows(DaggerConfigurationException.class, () -> DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator));
+        assertThrows(DaggerConfigurationException.class, () -> DaggerDeserializerFactory.create(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock));
     }
 }
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerMetricsConfigTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerMetricsConfigTest.java
new file mode 100644
index 000000000..bbc5c5414
--- /dev/null
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerMetricsConfigTest.java
@@ -0,0 +1,65 @@
+package io.odpf.dagger.core.metrics.reporters.statsd;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+
+public class DaggerMetricsConfigTest {
+    @Mock
+    private Configuration flinkConfiguration;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+    }
+
+    @Test
+    public void shouldSetHostNameAsDefinedInFlinkConfiguration() {
+        ConfigOption<String> hostConfigOption = ConfigOptions
+                .key("metrics.reporter.stsd.host")
+                .stringType()
+                .defaultValue("localhost");
+        when(flinkConfiguration.getString(hostConfigOption)).thenReturn("my-host");
+
+        DaggerMetricsConfig daggerMetricsConfig = new DaggerMetricsConfig(flinkConfiguration);
+        assertEquals("my-host", daggerMetricsConfig.getMetricStatsDHost());
+    }
+
+    @Test
+    public void shouldUseDefaultHostNameWhenNotDefinedInFlinkConfiguration() {
+        DaggerMetricsConfig daggerMetricsConfig = new DaggerMetricsConfig(new Configuration());
+        assertEquals("localhost", daggerMetricsConfig.getMetricStatsDHost());
+    }
+
+    @Test
+    public void shouldSetPortNumberAsDefinedInFlinkConfiguration() {
+        ConfigOption<Integer> portConfigOption = ConfigOptions
+                .key("metrics.reporter.stsd.port")
+                .intType()
+                .defaultValue(8125);
+        when(flinkConfiguration.getInteger(portConfigOption)).thenReturn(9010);
+
+        DaggerMetricsConfig daggerMetricsConfig = new DaggerMetricsConfig(flinkConfiguration);
+        assertEquals(9010, daggerMetricsConfig.getMetricStatsDPort().intValue());
+    }
+
+    @Test
+    public void shouldUseDefaultPortWhenNotDefinedInFlinkConfiguration() {
+        DaggerMetricsConfig daggerMetricsConfig = new DaggerMetricsConfig(new Configuration());
+        assertEquals(8125, daggerMetricsConfig.getMetricStatsDPort().intValue());
+    }
+
+    @Test
+    public void shouldReturnEmptyStringAsTags() {
+        DaggerMetricsConfig daggerMetricsConfig = new DaggerMetricsConfig(new Configuration());
+        assertEquals("", daggerMetricsConfig.getMetricStatsDTags());
+    }
+}
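These tests pin down the lookup the config class performs: read the Flink StatsD reporter options, falling back to the documented defaults. A sketch of that resolution, reusing the same ConfigOptions the tests build:

    // How the host/port lookups behave, per the tests above.
    ConfigOption<String> host = ConfigOptions.key("metrics.reporter.stsd.host")
            .stringType().defaultValue("localhost");
    ConfigOption<Integer> port = ConfigOptions.key("metrics.reporter.stsd.port")
            .intType().defaultValue(8125);
    String statsDHost = flinkConfiguration.getString(host);    // "localhost" unless overridden
    Integer statsDPort = flinkConfiguration.getInteger(port);  // 8125 unless overridden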
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerStatsDReporterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerStatsDReporterTest.java
new file mode 100644
index 000000000..ac8f2b3f0
--- /dev/null
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/DaggerStatsDReporterTest.java
@@ -0,0 +1,55 @@
+package io.odpf.dagger.core.metrics.reporters.statsd;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class DaggerStatsDReporterTest {
+
+    @Mock
+    private Configuration flinkConfiguration;
+
+    @Mock
+    private io.odpf.dagger.common.configuration.Configuration daggerConfiguration;
+
+    @Before
+    public void setup() throws IOException {
+        initMocks(this);
+        DaggerStatsDReporter.close();
+        when(daggerConfiguration.getString(anyString(), anyString())).thenReturn("some-tag");
+    }
+
+    @Test
+    public void shouldBuildStatsDReporterWithGlobalTags() {
+        DaggerStatsDReporter.Provider
+                .provide(flinkConfiguration, daggerConfiguration)
+                .buildStatsDReporter();
+
+        ArgumentCaptor<String> jobIdCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> defaultJobIdCaptor = ArgumentCaptor.forClass(String.class);
+
+        verify(daggerConfiguration, times(1)).getString(jobIdCaptor.capture(), defaultJobIdCaptor.capture());
+
+        assertEquals("FLINK_JOB_ID", jobIdCaptor.getValue());
+        assertEquals("SQL Flink job", defaultJobIdCaptor.getValue());
+    }
+
+    @Test
+    public void shouldBeAbleToBuildAndMaintainSingletonCopyOfStatsDReporter() {
+        DaggerStatsDReporter daggerStatsDReporter = DaggerStatsDReporter.Provider
+                .provide(flinkConfiguration, daggerConfiguration);
+
+        assertEquals(daggerStatsDReporter.buildStatsDReporter(), daggerStatsDReporter.buildStatsDReporter());
+    }
+}
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/StatsDErrorReporterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/StatsDErrorReporterTest.java
new file mode 100644
index 000000000..e8def56b7
--- /dev/null
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/StatsDErrorReporterTest.java
@@ -0,0 +1,56 @@
+package io.odpf.dagger.core.metrics.reporters.statsd;
+
+import io.odpf.depot.metrics.StatsDReporter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class StatsDErrorReporterTest {
+    @Mock
+    private StatsDReporter statsDReporter;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+    }
+
+    private final SerializedStatsDReporterSupplier statsDReporterSupplier = () -> statsDReporter;
+
+    @Test
+    public void shouldReportFatalExceptionWithExceptionClassNameAsTagValue() {
+        StatsDErrorReporter statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier);
+
+        statsDErrorReporter.reportFatalException(new NullPointerException("This is a fatal exception"));
+
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L,
+                        "fatal_exception_type=" + NullPointerException.class.getName());
+    }
+
+    @Test
+    public void shouldReportNonFatalExceptionWithExceptionClassNameAsTagValue() {
+        StatsDErrorReporter statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier);
+
+        statsDErrorReporter.reportNonFatalException(new Exception("This is a non-fatal exception"));
+
+        verify(statsDReporter, times(1))
+                .captureCount("non.fatal.exception", 1L,
+                        "non_fatal_exception_type=" + Exception.class.getName());
+    }
+
+    @Test
+    public void shouldThrowUnsupportedExceptionForAddExceptionToCounter() {
+        StatsDErrorReporter statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier);
+
+        UnsupportedOperationException ex = assertThrows(UnsupportedOperationException.class,
+                () -> statsDErrorReporter.addExceptionToCounter(null, null, null));
+
+        assertEquals("This operation is not supported on StatsDErrorReporter", ex.getMessage());
+    }
+}
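Together these two suites document the error-reporting contract the rest of the change set leans on: construct the exception, report it as fatal (tagged with the exception class name), then throw. A condensed sketch of the pattern, with doSomethingThatCanFail() standing in for whichever operation is guarded:

    StatsDErrorReporter errorReporter = new StatsDErrorReporter(statsDReporterSupplier);
    try {
        doSomethingThatCanFail(); // placeholder for the guarded operation
    } catch (ParseException cause) {
        IllegalArgumentException exception = new IllegalArgumentException(cause);
        // emits captureCount("fatal.exception", 1L, "fatal_exception_type=java.lang.IllegalArgumentException")
        errorReporter.reportFatalException(exception);
        throw exception;
    }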
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerCounterManagerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerCounterManagerTest.java
new file mode 100644
index 000000000..49e84bf81
--- /dev/null
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerCounterManagerTest.java
@@ -0,0 +1,81 @@
+package io.odpf.dagger.core.metrics.reporters.statsd.manager;
+
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
+import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag;
+import io.odpf.depot.metrics.StatsDReporter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_CLOSED;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class DaggerCounterManagerTest {
+    @Mock
+    private StatsDReporter statsDReporter;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+    }
+
+    private final SerializedStatsDReporterSupplier statsDReporterSupplier = () -> statsDReporter;
+
+    @Test
+    public void shouldIncrementCounterMeasurement() {
+        DaggerCounterManager daggerCounterManager = new DaggerCounterManager(statsDReporterSupplier);
+
+        daggerCounterManager.increment(READER_CLOSED);
+
+        verify(statsDReporter, times(1)).captureCount(READER_CLOSED.getValue(), 1L, (String[]) null);
+    }
+
+    @Test
+    public void shouldIncrementCounterMeasurementWithDelta() {
+        DaggerCounterManager daggerCounterManager = new DaggerCounterManager(statsDReporterSupplier);
+
+        daggerCounterManager.increment(READER_CLOSED, 5L);
+
+        verify(statsDReporter, times(1)).captureCount(READER_CLOSED.getValue(), 5L, (String[]) null);
+    }
+
+    @Test
+    public void shouldIncrementCounterMeasurementWithRegisteredTags() {
+        DaggerCounterManager daggerCounterManager = new DaggerCounterManager(statsDReporterSupplier);
+        daggerCounterManager.register(new StatsDTag[]{new StatsDTag("tag1", "value1"), new StatsDTag("tag2", "value2")});
+
+        daggerCounterManager.increment(READER_CLOSED);
+
+        verify(statsDReporter, times(1)).captureCount(READER_CLOSED.getValue(), 1L, "tag1=value1", "tag2=value2");
+    }
+
+    @Test
+    public void shouldDecrementCounterMeasurement() {
+        DaggerCounterManager daggerCounterManager = new DaggerCounterManager(statsDReporterSupplier);
+
+        daggerCounterManager.decrement(READER_CLOSED);
+
+        verify(statsDReporter, times(1)).captureCount(READER_CLOSED.getValue(), -1L, (String[]) null);
+    }
+
+    @Test
+    public void shouldDecrementCounterMeasurementWithDelta() {
+        DaggerCounterManager daggerCounterManager = new DaggerCounterManager(statsDReporterSupplier);
+
+        daggerCounterManager.decrement(READER_CLOSED, -5L);
+
+        verify(statsDReporter, times(1)).captureCount(READER_CLOSED.getValue(), -5L, (String[]) null);
+    }
+
+    @Test
+    public void shouldDecrementCounterMeasurementWithRegisteredTags() {
+        DaggerCounterManager daggerCounterManager = new DaggerCounterManager(statsDReporterSupplier);
+        daggerCounterManager.register(new StatsDTag[]{new StatsDTag("tag1", "value1"), new StatsDTag("tag2", "value2")});
+
+        daggerCounterManager.decrement(READER_CLOSED);
+
+        verify(statsDReporter, times(1)).captureCount(READER_CLOSED.getValue(), -1L, "tag1=value1", "tag2=value2");
+    }
+}
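The counter manager is a thin, tag-aware wrapper over the depot reporter. A usage sketch matching the behaviour verified above; the aspects come from ParquetReaderAspects and the tag is the one the Parquet reader components register:

    DaggerCounterManager counters = new DaggerCounterManager(statsDReporterSupplier);
    counters.register(new StatsDTag[]{new StatsDTag("component", "parquet_reader")});
    counters.increment(READER_CREATED);          // captureCount("reader_created", 1L, "component=parquet_reader")
    counters.increment(READER_ROWS_EMITTED, 5L); // captureCount("reader_rows_emitted", 5L, "component=parquet_reader")
    counters.decrement(READER_CLOSED);           // captureCount("reader_closed", -1L, "component=parquet_reader")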
"tag2=value2"); + } +} diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerHistogramManagerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerHistogramManagerTest.java new file mode 100644 index 000000000..571c6be6e --- /dev/null +++ b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/manager/DaggerHistogramManagerTest.java @@ -0,0 +1,44 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.manager; + +import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier; +import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag; +import io.odpf.depot.metrics.StatsDReporter; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_ROW_READ_TIME; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +public class DaggerHistogramManagerTest { + @Mock + private StatsDReporter statsDReporter; + + @Before + public void setup() { + initMocks(this); + } + + private final SerializedStatsDReporterSupplier statsDReporterSupplier = () -> statsDReporter; + + @Test + public void shouldRecordHistogramValue() { + DaggerHistogramManager daggerHistogramManager = new DaggerHistogramManager(statsDReporterSupplier); + + daggerHistogramManager.recordValue(READER_ROW_READ_TIME, 5L); + + verify(statsDReporter, times(1)).captureHistogram(READER_ROW_READ_TIME.getValue(), 5L, (String[]) null); + } + + @Test + public void shouldRecordHistogramValueWithRegisteredTags() { + DaggerHistogramManager daggerHistogramManager = new DaggerHistogramManager(statsDReporterSupplier); + daggerHistogramManager.register(new StatsDTag[]{new StatsDTag("tag1", "value1"), new StatsDTag("tag2", "value2")}); + + daggerHistogramManager.recordValue(READER_ROW_READ_TIME, 6L); + + verify(statsDReporter, times(1)).captureHistogram(READER_ROW_READ_TIME.getValue(), 6L, "tag1=value1", "tag2=value2"); + } +} diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTagTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTagTest.java new file mode 100644 index 000000000..1db97bd8d --- /dev/null +++ b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTagTest.java @@ -0,0 +1,51 @@ +package io.odpf.dagger.core.metrics.reporters.statsd.tags; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class StatsDTagTest { + + @Test + public void shouldReturnFormattedTagWithKeyValueSeparatedByDelimiter() { + StatsDTag statsDTag = new StatsDTag("tag-key", "tag-value"); + + assertEquals("tag-key=tag-value", statsDTag.getFormattedTag()); + } + + @Test + public void shouldReturnFormattedTagWithOnlyTagKey() { + StatsDTag statsDTag = new StatsDTag("my-tag"); + + assertEquals("my-tag", statsDTag.getFormattedTag()); + } + + @Test + public void shouldReturnFormattedTagWithOnlyTagKeyWhenTagValueIsNull() { + StatsDTag statsDTag = new StatsDTag("my-tag", null); + + assertEquals("my-tag", statsDTag.getFormattedTag()); + } + + @Test + public void shouldReturnFormattedTagWithOnlyTagKeyWhenTagValueIsEmpty() { + StatsDTag statsDTag = new StatsDTag("my-tag", ""); + + assertEquals("my-tag", statsDTag.getFormattedTag()); + } + + @Test + public void 
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTagTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTagTest.java
new file mode 100644
index 000000000..1db97bd8d
--- /dev/null
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/metrics/reporters/statsd/tags/StatsDTagTest.java
@@ -0,0 +1,51 @@
+package io.odpf.dagger.core.metrics.reporters.statsd.tags;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class StatsDTagTest {
+
+    @Test
+    public void shouldReturnFormattedTagWithKeyValueSeparatedByDelimiter() {
+        StatsDTag statsDTag = new StatsDTag("tag-key", "tag-value");
+
+        assertEquals("tag-key=tag-value", statsDTag.getFormattedTag());
+    }
+
+    @Test
+    public void shouldReturnFormattedTagWithOnlyTagKey() {
+        StatsDTag statsDTag = new StatsDTag("my-tag");
+
+        assertEquals("my-tag", statsDTag.getFormattedTag());
+    }
+
+    @Test
+    public void shouldReturnFormattedTagWithOnlyTagKeyWhenTagValueIsNull() {
+        StatsDTag statsDTag = new StatsDTag("my-tag", null);
+
+        assertEquals("my-tag", statsDTag.getFormattedTag());
+    }
+
+    @Test
+    public void shouldReturnFormattedTagWithOnlyTagKeyWhenTagValueIsEmpty() {
+        StatsDTag statsDTag = new StatsDTag("my-tag", "");
+
+        assertEquals("my-tag", statsDTag.getFormattedTag());
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionWhenTagKeyIsNull() {
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> new StatsDTag(null, "tag-value"));
+
+        assertEquals("Tag key cannot be null or empty", ex.getMessage());
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionWhenTagKeyIsEmpty() {
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> new StatsDTag("", "tag-value"));
+
+        assertEquals("Tag key cannot be null or empty", ex.getMessage());
+    }
+}
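For quick reference, the formatting rules the suite above fixes in place:

    new StatsDTag("component", "split_assigner").getFormattedTag(); // "component=split_assigner"
    new StatsDTag("component").getFormattedTag();                   // "component"
    new StatsDTag("component", null).getFormattedTag();             // "component"
    new StatsDTag(null, "value");                                   // throws IllegalArgumentException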
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/DaggerSourceFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/DaggerSourceFactoryTest.java
index b94b8d8c3..e95d1e3d7 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/DaggerSourceFactoryTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/DaggerSourceFactoryTest.java
@@ -1,6 +1,7 @@
 package io.odpf.dagger.core.source;

 import io.odpf.dagger.common.configuration.Configuration;
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.common.serde.json.deserialization.JsonDeserializer;
 import io.odpf.dagger.common.serde.parquet.deserialization.SimpleGroupDeserializer;
 import io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer;
@@ -12,6 +13,7 @@
 import io.odpf.dagger.core.source.flinkkafkaconsumer.FlinkKafkaConsumerDaggerSource;
 import io.odpf.dagger.core.source.kafka.KafkaDaggerSource;
 import io.odpf.dagger.core.source.parquet.ParquetDaggerSource;
+import io.odpf.depot.metrics.StatsDReporter;
 import org.apache.flink.types.Row;
 import org.junit.Before;
 import org.junit.Test;
@@ -19,6 +21,8 @@
 import org.mockito.Mockito;

 import static org.junit.Assert.*;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;

@@ -29,6 +33,11 @@ public class DaggerSourceFactoryTest {
     @Mock
     private Configuration configuration;

+    @Mock
+    private StatsDReporter statsDReporter;
+
+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> statsDReporter;
+
     @Before
     public void setUp() throws Exception {
         initMocks(this);
@@ -38,7 +47,7 @@ public void shouldReturnKafkaDaggerSourceWhenConfigured() {
         ProtoDeserializer deserializer = Mockito.mock(ProtoDeserializer.class);
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.KAFKA_SOURCE, SourceType.UNBOUNDED)});
-        DaggerSource<Row> daggerSource = DaggerSourceFactory.create(streamConfig, configuration, deserializer);
+        DaggerSource<Row> daggerSource = DaggerSourceFactory.create(streamConfig, configuration, deserializer, statsDReporterSupplierMock);

         assertTrue(daggerSource instanceof KafkaDaggerSource);
     }
@@ -47,7 +56,7 @@ public void shouldReturnFlinkKafkaConsumerDaggerSourceWhenConfigured() {
         JsonDeserializer deserializer = Mockito.mock(JsonDeserializer.class);
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.KAFKA_CONSUMER, SourceType.UNBOUNDED)});
-        DaggerSource<Row> daggerSource = DaggerSourceFactory.create(streamConfig, configuration, deserializer);
+        DaggerSource<Row> daggerSource = DaggerSourceFactory.create(streamConfig, configuration, deserializer, statsDReporterSupplierMock);

         assertTrue(daggerSource instanceof FlinkKafkaConsumerDaggerSource);
     }
@@ -56,16 +65,18 @@ public void shouldReturnFlinkKafkaConsumerDaggerSourceWhenConfigured() {
     public void shouldReturnParquetDaggerSourceWhenConfigured() {
         SimpleGroupDeserializer deserializer = Mockito.mock(SimpleGroupDeserializer.class);
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.BOUNDED)});
-        DaggerSource<Row> daggerSource = DaggerSourceFactory.create(streamConfig, configuration, deserializer);
+        DaggerSource<Row> daggerSource = DaggerSourceFactory.create(streamConfig, configuration, deserializer, statsDReporterSupplierMock);

         assertTrue(daggerSource instanceof ParquetDaggerSource);
     }

     @Test
-    public void shouldThrowRuntimeExceptionIfNoDaggerSourceCouldBeCreatedAsPerConfigs() {
+    public void shouldThrowRuntimeExceptionAndReportErrorIfNoDaggerSourceCouldBeCreatedAsPerConfigs() {
         SimpleGroupDeserializer deserializer = Mockito.mock(SimpleGroupDeserializer.class);
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.UNBOUNDED)});

-        assertThrows(InvalidDaggerSourceException.class, () -> DaggerSourceFactory.create(streamConfig, configuration, deserializer));
+        assertThrows(InvalidDaggerSourceException.class, () -> DaggerSourceFactory.create(streamConfig, configuration, deserializer, statsDReporterSupplierMock));
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + InvalidDaggerSourceException.class.getName());
     }
 }
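Callers now hand the factory a supplier alongside the deserializer. A usage sketch; the register(...) call shape is taken from the source tests in this change set, while WatermarkStrategy.noWatermarks() is only a placeholder strategy:

    DaggerSource<Row> daggerSource =
            DaggerSourceFactory.create(streamConfig, configuration, deserializer, statsDReporterSupplier);
    daggerSource.register(streamExecutionEnvironment, WatermarkStrategy.noWatermarks());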
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamTest.java
index 828129d99..a7366cd11 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamTest.java
@@ -2,6 +2,7 @@
 import io.odpf.dagger.common.configuration.Configuration;
 import io.odpf.dagger.common.core.StencilClientOrchestrator;
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.consumer.TestBookingLogMessage;
 import io.odpf.dagger.core.source.config.StreamConfig;
 import io.odpf.dagger.core.source.config.models.SourceDetails;
@@ -10,6 +11,7 @@
 import io.odpf.dagger.core.source.flinkkafkaconsumer.FlinkKafkaConsumerDaggerSource;
 import io.odpf.dagger.core.source.kafka.KafkaDaggerSource;
 import io.odpf.dagger.core.source.parquet.ParquetDaggerSource;
+import io.odpf.depot.metrics.StatsDReporter;
 import io.odpf.stencil.client.StencilClient;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -25,6 +27,7 @@
 import static org.junit.Assert.*;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -53,6 +56,8 @@ public class StreamTest {
     @Mock
     private DaggerSource mockDaggerSource;

+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> mock(StatsDReporter.class);
+
     @Before
     public void setup() {
         initMocks(this);
@@ -68,7 +73,7 @@ public void shouldBeAbleToBuildAStreamWithKafkaDaggerSourceAndProtoSchema() {
         when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient);
         when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor());

-        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator);
+        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);
         Stream stream = builder.build();

         assertTrue(stream.getDaggerSource() instanceof KafkaDaggerSource);
@@ -93,7 +98,7 @@ public void shouldBeAbleToBuildAStreamWithFlinkKafkaConsumerDaggerSourceAndProto
         when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient);
         when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor());

-        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator);
+        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);
         Stream stream = builder.build();

         assertTrue(stream.getDaggerSource() instanceof FlinkKafkaConsumerDaggerSource);
@@ -106,7 +111,7 @@ public void shouldBeAbleToBuildAStreamWithKafkaDaggerSourceAndJsonSchema() {
         when(streamConfig.getSchemaTable()).thenReturn("data_stream");
         when(streamConfig.getJsonSchema()).thenReturn("{ \"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"$id\": \"https://example.com/product.schema.json\", \"title\": \"Product\", \"description\": \"A product from Acme's catalog\", \"type\": \"object\", \"properties\": { \"id\": { \"description\": \"The unique identifier for a product\", \"type\": \"string\" }, \"time\": { \"description\": \"event timestamp of the event\", \"type\": \"string\", \"format\" : \"date-time\" } }, \"required\": [ \"id\", \"time\" ] }");

-        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator);
+        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);
         Stream stream = builder.build();

         assertTrue(stream.getDaggerSource() instanceof KafkaDaggerSource);
@@ -119,7 +124,7 @@ public void shouldBeAbleToBuildAStreamWithFlinkKafkaConsumerDaggerSourceAndJsonS
         when(streamConfig.getSchemaTable()).thenReturn("data_stream");
         when(streamConfig.getJsonSchema()).thenReturn("{ \"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"$id\": \"https://example.com/product.schema.json\", \"title\": \"Product\", \"description\": \"A product from Acme's catalog\", \"type\": \"object\", \"properties\": { \"id\": { \"description\": \"The unique identifier for a product\", \"type\": \"string\" }, \"time\": { \"description\": \"event timestamp of the event\", \"type\": \"string\", \"format\" : \"date-time\" } }, \"required\": [ \"id\", \"time\" ] }");

-        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator);
+        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);
         Stream stream = builder.build();

         assertTrue(stream.getDaggerSource() instanceof FlinkKafkaConsumerDaggerSource);
@@ -134,7 +139,7 @@ public void shouldBeAbleToBuildAStreamWithParquetDaggerSourceAndProtoSchema() {
         when(streamConfig.getSchemaTable()).thenReturn("data_stream");
         when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient);
         when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor());
-        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator);
+        Stream.Builder builder = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplierMock);
         Stream stream = builder.build();

         assertTrue(stream.getDaggerSource() instanceof ParquetDaggerSource);
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamsFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamsFactoryTest.java
index c5f2de52f..e54d152ab 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamsFactoryTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/StreamsFactoryTest.java
@@ -1,7 +1,9 @@
 package io.odpf.dagger.core.source;

+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.core.source.flinkkafkaconsumer.FlinkKafkaConsumerDaggerSource;
 import io.odpf.dagger.core.source.kafka.KafkaDaggerSource;
+import io.odpf.depot.metrics.StatsDReporter;
 import io.odpf.stencil.client.StencilClient;
 import com.google.gson.JsonSyntaxException;
 import io.odpf.dagger.common.configuration.Configuration;
@@ -18,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;

@@ -31,6 +34,8 @@ public class StreamsFactoryTest {
     @Mock
     private Configuration configuration;

+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> mock(StatsDReporter.class);
+
     @Before
     public void setup() {
         initMocks(this);
@@ -65,7 +70,7 @@ public void shouldReturnListOfStreamsCreatedFromConfiguration() {
         when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient);
         when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor());

-        List<Stream> streams = StreamsFactory.getStreams(configuration, stencilClientOrchestrator);
+        List<Stream> streams = StreamsFactory.getStreams(configuration, stencilClientOrchestrator, statsDReporterSupplierMock);

         assertEquals(2, streams.size());
         assertTrue(streams.get(0).getDaggerSource() instanceof FlinkKafkaConsumerDaggerSource);
@@ -88,7 +93,7 @@ public void shouldThrowErrorForInvalidStreamConfig() {
         when(stencilClient.get("com.tests.TestMessage")).thenReturn(TestBookingLogMessage.getDescriptor());

         assertThrows(JsonSyntaxException.class,
-                () -> StreamsFactory.getStreams(configuration, stencilClientOrchestrator));
+                () -> StreamsFactory.getStreams(configuration, stencilClientOrchestrator, statsDReporterSupplierMock));
     }

     @Test
@@ -96,6 +101,6 @@ public void shouldThrowNullPointerIfStreamConfigIsNotGiven() {
         when(configuration.getString(INPUT_STREAMS, "")).thenReturn("");

         assertThrows(NullPointerException.class,
-                () -> StreamsFactory.getStreams(configuration, stencilClientOrchestrator));
+                () -> StreamsFactory.getStreams(configuration, stencilClientOrchestrator, statsDReporterSupplierMock));
     }
 }
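End to end, both suites exercise the same assembly path: a stream is built from its config plus the supplier, and exposes the chosen source. A sketch, with all collaborators assumed in scope:

    Stream stream = new Stream.Builder(streamConfig, configuration, stencilClientOrchestrator, statsDReporterSupplier)
            .build();
    DaggerSource<Row> daggerSource = stream.getDaggerSource(); // KafkaDaggerSource, FlinkKafkaConsumerDaggerSource, or ParquetDaggerSource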
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSourceTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSourceTest.java
index 744134124..22a0f5f33 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSourceTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetDaggerSourceTest.java
@@ -1,6 +1,7 @@
 package io.odpf.dagger.core.source.parquet;

 import io.odpf.dagger.common.configuration.Configuration;
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.common.serde.DaggerDeserializer;
 import io.odpf.dagger.common.serde.parquet.deserialization.SimpleGroupDeserializer;
 import io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer;
@@ -9,6 +10,7 @@
 import io.odpf.dagger.core.source.config.models.SourceDetails;
 import io.odpf.dagger.core.source.config.models.SourceName;
 import io.odpf.dagger.core.source.config.models.SourceType;
+import io.odpf.depot.metrics.StatsDReporter;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -46,6 +48,11 @@ public class ParquetDaggerSourceTest {
     @Mock
     private StreamExecutionEnvironment streamExecutionEnvironment;

+    @Mock
+    private StatsDReporter statsDReporter;
+
+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> statsDReporter;
+
     private FileSource fileSource;

     @Before
@@ -59,7 +66,7 @@ public void setup() {
     @Test
     public void shouldBeAbleToBuildSourceIfSourceDetailsIsBoundedParquetAndDaggerDeserializerIsSimpleGroupDeserializer() {
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.BOUNDED)});
-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer, statsDReporterSupplierMock);

         assertTrue(daggerSource.canBuild());
     }
@@ -68,7 +75,7 @@ public void shouldBeAbleToBuildSourceIfSourceDetailsIsBoundedParquetAndDaggerDes
     public void shouldNotBeAbleToBuildSourceIfSourceDetailsContainsMultipleBackToBackSources() {
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.BOUNDED),
                 new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.BOUNDED)});
-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer, statsDReporterSupplierMock);

         assertFalse(daggerSource.canBuild());
     }
@@ -76,7 +83,7 @@
     @Test
     public void shouldNotBeAbleToBuildSourceIfSourceNameIsUnsupported() {
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.KAFKA_CONSUMER, SourceType.BOUNDED)});
-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer, statsDReporterSupplierMock);

         assertFalse(daggerSource.canBuild());
     }
@@ -84,7 +91,7 @@
     @Test
     public void shouldNotBeAbleToBuildSourceIfSourceTypeIsUnsupported() {
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.UNBOUNDED)});
-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer, statsDReporterSupplierMock);

         assertFalse(daggerSource.canBuild());
     }
@@ -93,14 +100,14 @@
     public void shouldNotBeAbleToBuildSourceIfDeserializerTypeIsUnsupported() {
         DaggerDeserializer<Row> unsupportedDeserializer = Mockito.mock(ProtoDeserializer.class);
         when(streamConfig.getSourceDetails()).thenReturn(new SourceDetails[]{new SourceDetails(SourceName.PARQUET_SOURCE, SourceType.BOUNDED)});
-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, unsupportedDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, unsupportedDeserializer, statsDReporterSupplierMock);

         assertFalse(daggerSource.canBuild());
     }

     @Test
     public void shouldBeAbleToRegisterSourceWithExecutionEnvironmentForCorrectConfiguration() {
-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer, statsDReporterSupplierMock);
         ParquetDaggerSource daggerSourceSpy = Mockito.spy(daggerSource);
         doReturn(fileSource).when(daggerSourceSpy).buildFileSource();
         when(streamConfig.getSchemaTable()).thenReturn("data_stream_0");
@@ -117,18 +124,20 @@ public void shouldUseStreamConfigurationToBuildTheFileSource() {
         when(streamConfig.getParquetFilesReadOrderStrategy()).thenReturn(EARLIEST_TIME_URL_FIRST);
         when(streamConfig.getParquetFilePaths()).thenReturn(new String[]{"gs://sshsh", "gs://shadd"});

-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer, statsDReporterSupplierMock);

         daggerSource.register(streamExecutionEnvironment, strategy);
     }

     @Test
-    public void shouldThrowRuntimeExceptionIfReadOrderStrategyIsNotSupported() {
+    public void shouldThrowRuntimeExceptionAndReportErrorIfReadOrderStrategyIsNotSupported() {
         when(streamConfig.getParquetFilesReadOrderStrategy()).thenReturn(EARLIEST_INDEX_FIRST);
         when(streamConfig.getParquetFilePaths()).thenReturn(new String[]{"gs://sshsh", "gs://shadd"});

-        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer);
+        ParquetDaggerSource daggerSource = new ParquetDaggerSource(streamConfig, configuration, daggerDeserializer, statsDReporterSupplierMock);

         assertThrows(DaggerConfigurationException.class, () -> daggerSource.register(streamExecutionEnvironment, strategy));
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + DaggerConfigurationException.class.getName());
     }
 }
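A sketch of the guard these tests imply: construction always succeeds, while canBuild() gates registration against unsupported source/deserializer combinations (simpleGroupDeserializer and watermarkStrategy are assumed in scope):

    ParquetDaggerSource daggerSource =
            new ParquetDaggerSource(streamConfig, configuration, simpleGroupDeserializer, statsDReporterSupplier);
    if (daggerSource.canBuild()) {
        daggerSource.register(streamExecutionEnvironment, watermarkStrategy);
    }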
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormatTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormatTest.java
index f87c87a28..59a5ad7f1 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormatTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileRecordFormatTest.java
@@ -1,8 +1,10 @@
 package io.odpf.dagger.core.source.parquet;

+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.core.source.parquet.reader.ParquetReader;
 import io.odpf.dagger.core.source.parquet.reader.ReaderProvider;
+import io.odpf.depot.metrics.StatsDReporter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.FileRecordFormat;
@@ -15,6 +17,8 @@
 import java.util.function.Supplier;

 import static org.junit.Assert.*;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.MockitoAnnotations.initMocks;

 public class ParquetFileRecordFormatTest {
@@ -28,8 +32,12 @@ public class ParquetFileRecordFormatTest {
     @Mock
     private Configuration configuration;

+    @Mock
+    private StatsDReporter statsDReporter;
+
     private final ReaderProvider readerProviderMock = (filePath) -> parquetReader;
     private final Supplier<TypeInformation<Row>> typeInformationProviderMock = () -> typeInformation;
+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> statsDReporter;

     @Before
     public void setup() {
@@ -41,6 +49,7 @@ public void shouldBuildAFileRecordFormatAsPerConfiguredParameters() {
         ParquetFileRecordFormat.Builder builder = ParquetFileRecordFormat.Builder.getInstance();
         ParquetFileRecordFormat parquetFileRecordFormat = builder.setParquetFileReaderProvider(readerProviderMock)
                 .setTypeInformationProvider(typeInformationProviderMock)
+                .setStatsDReporterSupplier(statsDReporterSupplierMock)
                 .build();

         FileRecordFormat.Reader expectedReader = parquetFileRecordFormat.createReader(configuration, new Path("gs://file-path"), 0, 1024);
@@ -51,26 +60,44 @@
     }

     @Test
-    public void shouldThrowIllegalArgumentExceptionWhenReaderProviderIsNotConfigured() {
+    public void shouldThrowIllegalArgumentExceptionAndReportErrorWhenReaderProviderIsNotConfigured() {
         ParquetFileRecordFormat.Builder builder = ParquetFileRecordFormat.Builder.getInstance();
         IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
                 () -> builder
                         .setTypeInformationProvider(typeInformationProviderMock)
+                        .setStatsDReporterSupplier(statsDReporterSupplierMock)
                         .build());

         assertEquals("ReaderProvider is required but is set as null", ex.getMessage());
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + IllegalArgumentException.class.getName());
     }

     @Test
-    public void shouldThrowIllegalArgumentExceptionWhenTypeInformationProviderIsNotConfigured() {
+    public void shouldThrowIllegalArgumentExceptionAndReportErrorWhenTypeInformationProviderIsNotConfigured() {
         ParquetFileRecordFormat.Builder builder = ParquetFileRecordFormat.Builder.getInstance();
         IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
                 () -> builder
                         .setParquetFileReaderProvider(readerProviderMock)
+                        .setStatsDReporterSupplier(statsDReporterSupplierMock)
                         .build());

         assertEquals("TypeInformationProvider is required but is set as null", ex.getMessage());
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + IllegalArgumentException.class.getName());
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionWhenStatsDReporterSupplierIsNotConfigured() {
+        ParquetFileRecordFormat.Builder builder = ParquetFileRecordFormat.Builder.getInstance();
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+                () -> builder
+                        .setParquetFileReaderProvider(readerProviderMock)
+                        .setTypeInformationProvider(typeInformationProviderMock)
+                        .build());
+
+        assertEquals("SerializedStatsDReporterSupplier is required but is set as null", ex.getMessage());
     }

     @Test
@@ -78,21 +105,25 @@ public void shouldReturnFalseWhenIsSplittableIsCalled() {
         ParquetFileRecordFormat.Builder builder = ParquetFileRecordFormat.Builder.getInstance();
         ParquetFileRecordFormat parquetFileRecordFormat = builder.setParquetFileReaderProvider(readerProviderMock)
                 .setTypeInformationProvider(typeInformationProviderMock)
+                .setStatsDReporterSupplier(statsDReporterSupplierMock)
                 .build();

         assertFalse(parquetFileRecordFormat.isSplittable());
     }

     @Test
-    public void shouldThrowUnsupportedOperationExceptionWhenRestoreReaderIsCalled() {
+    public void shouldThrowUnsupportedOperationExceptionAndReportErrorWhenRestoreReaderIsCalled() {
         ParquetFileRecordFormat.Builder builder = ParquetFileRecordFormat.Builder.getInstance();
         ParquetFileRecordFormat parquetFileRecordFormat = builder.setTypeInformationProvider(typeInformationProviderMock)
                 .setParquetFileReaderProvider(readerProviderMock)
+                .setStatsDReporterSupplier(statsDReporterSupplierMock)
                 .build();

         UnsupportedOperationException ex = assertThrows(UnsupportedOperationException.class,
                 () -> parquetFileRecordFormat.restoreReader(configuration, new Path("gs://some-path"), 12, 0, 1024));

         assertEquals("Error: ParquetReader do not have offsets and hence cannot be restored via this method.", ex.getMessage());
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + UnsupportedOperationException.class.getName());
     }
 }
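The format builder now has three required collaborators. The happy path, as the first test above constructs it; parquetReader and typeInformation are mocks there, and lambdas satisfy the provider interfaces:

    ParquetFileRecordFormat parquetFileRecordFormat = ParquetFileRecordFormat.Builder.getInstance()
            .setParquetFileReaderProvider(filePath -> parquetReader) // ReaderProvider
            .setTypeInformationProvider(() -> typeInformation)       // Supplier<TypeInformation<Row>>
            .setStatsDReporterSupplier(statsDReporterSupplier)
            .build();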
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileSourceTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileSourceTest.java
index c38482478..2cda4ba4c 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileSourceTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/ParquetFileSourceTest.java
@@ -1,10 +1,12 @@
 package io.odpf.dagger.core.source.parquet;

 import io.odpf.dagger.common.configuration.Configuration;
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.core.source.config.models.SourceType;
 import io.odpf.dagger.core.source.parquet.ParquetFileSource.Builder;
 import io.odpf.dagger.core.source.parquet.path.HourDatePathParser;
 import io.odpf.dagger.core.source.parquet.splitassigner.ChronologyOrderedSplitAssigner;
+import io.odpf.depot.metrics.StatsDReporter;
 import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
 import org.apache.flink.connector.file.src.reader.FileRecordFormat;
 import org.apache.flink.core.fs.Path;
@@ -15,6 +17,8 @@
 import static java.util.Collections.emptyList;

 import static org.junit.Assert.*;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.MockitoAnnotations.initMocks;

 public class ParquetFileSourceTest {
@@ -24,6 +28,11 @@ public class ParquetFileSourceTest {
     @Mock
     private FileRecordFormat fileRecordFormat;

+    @Mock
+    private StatsDReporter statsDReporter;
+
+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> statsDReporter;
+
     @Before
     public void setup() {
         initMocks(this);
@@ -34,11 +43,14 @@ public void shouldBuildParquetFileSourceAsPerArguments() {
         Builder builder = Builder.getInstance();
         Path[] filePaths = new Path[]{new Path("gs://aadadc"), new Path("gs://sjsjhd")};
         ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder();
-        splitAssignerBuilder.addPathParser(new HourDatePathParser());
+        splitAssignerBuilder
+                .addStatsDReporterSupplier(statsDReporterSupplierMock)
+                .addPathParser(new HourDatePathParser());
         ParquetFileSource parquetFileSource = builder.setConfiguration(configuration)
                 .setFileRecordFormat(fileRecordFormat)
                 .setSourceType(SourceType.BOUNDED)
                 .setFileSplitAssigner(splitAssignerBuilder::build)
+                .setStatsDReporterSupplier(statsDReporterSupplierMock)
                 .setFilePaths(filePaths)
                 .build();

@@ -50,7 +62,7 @@ public void shouldBuildParquetFileSourceAsPerArguments() {
     }

     @Test
-    public void shouldThrowExceptionIfSourceTypeConfiguredAsUnbounded() {
+    public void shouldThrowExceptionAndReportErrorIfSourceTypeConfiguredAsUnbounded() {
         Builder builder = Builder.getInstance();
         Path[] filePaths = new Path[]{new Path("gs://aadadc"), new Path("gs://sjsjhd")};

@@ -58,40 +70,65 @@ public void shouldThrowExceptionIfSourceTypeConfiguredAsUnbounded() {
                 () -> builder.setConfiguration(configuration)
                         .setFileRecordFormat(fileRecordFormat)
                         .setSourceType(SourceType.UNBOUNDED)
+                        .setStatsDReporterSupplier(statsDReporterSupplierMock)
                         .setFileSplitAssigner(new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder()::build)
                         .setFilePaths(filePaths)
                         .build());

         assertEquals("Running Parquet FileSource in UNBOUNDED mode is not supported yet", ex.getMessage());
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + IllegalArgumentException.class.getName());
     }

     @Test
-    public void shouldThrowExceptionIfFileRecordFormatIsNotSet() {
+    public void shouldThrowExceptionAndReportErrorIfFileRecordFormatIsNotSet() {
         Builder builder = Builder.getInstance();
         Path[] filePaths = new Path[]{new Path("gs://aadadc"), new Path("gs://sjsjhd")};

         IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
                 () -> builder.setConfiguration(configuration)
                         .setSourceType(SourceType.UNBOUNDED)
+                        .setStatsDReporterSupplier(statsDReporterSupplierMock)
                         .setFileSplitAssigner(new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder()::build)
                         .setFilePaths(filePaths)
                         .build());

         assertEquals("FileRecordFormat is required but is set as null", ex.getMessage());
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + IllegalArgumentException.class.getName());
     }

     @Test
-    public void shouldThrowExceptionIfNoFilePathsSet() {
+    public void shouldThrowExceptionAndReportErrorIfNoFilePathsSet() {
         Builder builder = Builder.getInstance();

         IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
                 () -> builder.setConfiguration(configuration)
                         .setFileRecordFormat(fileRecordFormat)
                         .setSourceType(SourceType.BOUNDED)
+                        .setStatsDReporterSupplier(statsDReporterSupplierMock)
                         .setFileSplitAssigner(new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder()::build)
                         .build());

         assertEquals("At least one file path is required but none are provided", ex.getMessage());
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + IllegalArgumentException.class.getName());
+    }
+
+    @Test
+    public void shouldThrowExceptionIfStatsDReporterSupplierIsNotSet() {
+        Builder builder = Builder.getInstance();
+        Path[] filePaths = new Path[]{new Path("gs://aadadc"), new Path("gs://sjsjhd")};
+
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+                () -> builder.setConfiguration(configuration)
+                        .setFileRecordFormat(fileRecordFormat)
+                        .setSourceType(SourceType.BOUNDED)
+                        .setFileSplitAssigner(new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder()::build)
+                        .setFilePaths(filePaths)
+                        .build());
+
+        assertEquals("SerializedStatsDReporterSupplier is required but is set as null", ex.getMessage());
     }

     @Test
@@ -100,6 +137,7 @@ public void shouldUseDefaultValueForSomeFieldsWhichAreNotConfiguredExplicitly()
         Path[] filePaths = new Path[]{new Path("gs://aadadc"), new Path("gs://sjsjhd")};
         ParquetFileSource parquetFileSource = builder.setConfiguration(configuration)
                 .setFileRecordFormat(fileRecordFormat)
+                .setStatsDReporterSupplier(statsDReporterSupplierMock)
                 .setFilePaths(filePaths)
                 .build();

@@ -118,6 +156,7 @@ public void shouldReturnAFileSourceMadeFromParquetFileSource() {
                 .setFileRecordFormat(fileRecordFormat)
                 .setSourceType(SourceType.BOUNDED)
                 .setFileSplitAssigner(new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder()::build)
+                .setStatsDReporterSupplier(statsDReporterSupplierMock)
                 .setFilePaths(filePaths)
                 .build();
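For reference, the fully-specified builder chain these tests cover, with an illustrative GCS path substituted for the test fixtures:

    ParquetFileSource parquetFileSource = ParquetFileSource.Builder.getInstance()
            .setConfiguration(configuration)
            .setFileRecordFormat(fileRecordFormat)
            .setSourceType(SourceType.BOUNDED)
            .setFileSplitAssigner(splitAssignerBuilder::build)
            .setStatsDReporterSupplier(statsDReporterSupplier)
            .setFilePaths(new Path[]{new Path("gs://bucket/dt=2022-01-01/")})
            .build();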
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/reader/ParquetReaderTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/reader/ParquetReaderTest.java
index 51a9a65c3..66413cab2 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/reader/ParquetReaderTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/reader/ParquetReaderTest.java
@@ -1,8 +1,11 @@
 package io.odpf.dagger.core.source.parquet.reader;

+import io.odpf.dagger.common.exceptions.serde.DaggerDeserializationException;
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.common.serde.parquet.deserialization.SimpleGroupDeserializer;
 import io.odpf.dagger.core.exception.ParquetFileSourceReaderInitializationException;
+import io.odpf.depot.metrics.StatsDReporter;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.connector.file.src.util.CheckpointedPosition;
 import org.apache.flink.types.Row;
@@ -15,11 +18,17 @@
 import org.junit.rules.TemporaryFolder;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;

 import java.io.File;
 import java.io.IOException;
 import java.util.List;

+import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_CLOSED;
+import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_CREATED;
+import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_ROWS_EMITTED;
+import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_ROW_DESERIALIZATION_TIME;
+import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_ROW_READ_TIME;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.apache.parquet.schema.Types.*;
@@ -37,15 +46,79 @@ public class ParquetReaderTest {
     @Rule
     public TemporaryFolder tempFolder = TemporaryFolder.builder().assureDeletion().build();

+    @Mock
+    private StatsDReporter statsDReporter;
+
+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> statsDReporter;
+
     @Before
     public void setup() {
         initMocks(this);
     }

+    @Test
+    public void shouldRaiseMetricsWhenInitialized() {
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
+        ClassLoader classLoader = getClass().getClassLoader();
+        String filePath = classLoader.getResource("test_file.parquet").getPath();
+
+        provider.getReader(filePath);
+
+        verify(statsDReporter, Mockito.times(1)).captureCount(READER_CREATED.getValue(), 1L, "component=parquet_reader");
+    }
+
+    @Test
+    public void shouldRaiseMetricsWhenReadingFileAndDeserializingToRows() throws IOException {
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
+        ClassLoader classLoader = getClass().getClassLoader();
+        String filePath = classLoader.getResource("test_file.parquet").getPath();
+
+        ParquetReader reader = provider.getReader(filePath);
+        reader.read();
+
+        verify(statsDReporter, Mockito.times(1)).captureCount(READER_ROWS_EMITTED.getValue(), 1L, "component=parquet_reader");
+
+        ArgumentCaptor<String> measurementNameCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Long> executionTimeCaptor = ArgumentCaptor.forClass(Long.class);
+        ArgumentCaptor<String> tagCaptor = ArgumentCaptor.forClass(String.class);
+
+        verify(statsDReporter, Mockito.times(2)).captureHistogram(measurementNameCaptor.capture(), executionTimeCaptor.capture(), tagCaptor.capture());
+
+        assertEquals(READER_ROW_READ_TIME.getValue(), measurementNameCaptor.getAllValues().get(0));
+        assertEquals(READER_ROW_DESERIALIZATION_TIME.getValue(), measurementNameCaptor.getAllValues().get(1));
+        assertEquals("component=parquet_reader", tagCaptor.getAllValues().get(0));
+        assertEquals("component=parquet_reader", tagCaptor.getAllValues().get(1));
+    }
+
+    @Test
+    public void shouldRaiseMetricsWhenClosingTheReader() throws IOException {
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
+        ClassLoader classLoader = getClass().getClassLoader();
+        String filePath = classLoader.getResource("test_file.parquet").getPath();
+
+        provider.getReader(filePath).close();
+
+        verify(statsDReporter, Mockito.times(1)).captureCount(READER_CLOSED.getValue(), 1L, "component=parquet_reader");
+    }
+
+    @Test
+    public void shouldReportErrorAndRethrowWhenDaggerDeserializerThrowsException() {
+        when(deserializer.deserialize(any(SimpleGroup.class))).thenThrow(DaggerDeserializationException.class);
+
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
+        ClassLoader classLoader = getClass().getClassLoader();
+        String filePath = classLoader.getResource("test_file.parquet").getPath();
+
+        assertThrows(DaggerDeserializationException.class, () -> provider.getReader(filePath).read());
+
+        verify(statsDReporter, Mockito.times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + DaggerDeserializationException.class.getName());
+    }
+
     @Test
     public void shouldCreateReadersConfiguredWithTheSameDeserializerButForDifferentFilePaths() throws IOException {
         when(deserializer.deserialize(any(SimpleGroup.class))).thenReturn(Row.of("same", "deserializer"));
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();

         String filePath1 = classLoader.getResource("test_file.parquet").getPath();
@@ -59,7 +132,7 @@ public void shouldCreateReadersConfiguredWithTheSameDeserializerButForDifferentF

     @Test
     public void shouldReadFileAndCallDeserializerWithSimpleGroupWhenReadIsCalled() throws IOException {
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();
         ParquetReader reader = provider.getReader(classLoader.getResource("test_file.parquet").getPath());

@@ -84,7 +157,7 @@ public void shouldReadFileAndCallDeserializerWithSimpleGroupWhenReadIsCalled() t

     @Test
     public void shouldBeAbleToReadParquetFileContainingMultipleRowGroups() throws IOException {
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();
         ParquetReader reader = provider.getReader(classLoader.getResource("multiple_row_groups_test_file.parquet").getPath());

@@ -112,7 +185,7 @@ public void shouldBeAbleToReadParquetFileContainingMultipleRowGroups() throws IO

     @Test
     public void shouldReturnDeserializedValueWhenRecordsPresentAndNullWhenNoMoreDataLeftToRead() throws IOException {
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();
         ParquetReader reader = provider.getReader(classLoader.getResource("test_file.parquet").getPath());
         when(deserializer.deserialize(any(SimpleGroup.class))).thenReturn(Row.of("some value"));
@@ -125,7 +198,7 @@ public void shouldReturnDeserializedValueWhenRecordsPresentAndNullWhenNoMoreData

     @Test
     public void shouldThrowIOExceptionIfReadIsCalledAfterCallingClose() throws IOException {
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();
         ParquetReader reader = provider.getReader(classLoader.getResource("test_file.parquet").getPath());

@@ -135,16 +208,18 @@ public void shouldThrowIOExceptionIfReadIsCalledAfterCallingClose() throws IOExc
     }

     @Test
-    public void shouldThrowParquetFileSourceReaderInitializationExceptionIfCannotConstructReaderForTheFile() throws IOException {
+    public void shouldThrowParquetFileSourceReaderInitializationExceptionAndReportErrorIfCannotConstructReaderForTheFile() throws IOException {
         final File tempFile = tempFolder.newFile("test_file.parquet");
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);

         assertThrows(ParquetFileSourceReaderInitializationException.class,
                 () -> provider.getReader(tempFile.getPath()));
+        verify(statsDReporter, times(1))
+                .captureCount("fatal.exception", 1L, "fatal_exception_type=" + ParquetFileSourceReaderInitializationException.class.getName());
     }

     @Test
     public void shouldReturnCheckPointedPositionWithNoOffsetAndZeroRecordsAfterOffsetWhenReadHasNotBeenCalledYet() {
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();
         ParquetReader reader = provider.getReader(classLoader.getResource("test_file.parquet").getPath());

@@ -155,7 +230,7 @@ public void shouldReturnCheckPointedPositionWithNoOffsetAndZeroRecordsAfterOffse

     @Test
     public void shouldUpdateCheckPointedPositionWithNoOffsetAndCountOfTotalRecordsReadYet() throws IOException {
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();
         ParquetReader reader = provider.getReader(classLoader.getResource("test_file.parquet").getPath());

@@ -167,8 +242,8 @@ public void shouldUpdateCheckPointedPositionWithNoOffsetAndCountOfTotalRecordsRe
     }

     @Test
-    public void shouldNotUpdateCheckpointedPositionWhenNoMoreRecordsToRead() throws IOException {
-        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer);
+    public void shouldNotUpdateCheckPointedPositionWhenNoMoreRecordsToRead() throws IOException {
+        ParquetReader.ParquetReaderProvider provider = new ParquetReader.ParquetReaderProvider(deserializer, statsDReporterSupplierMock);
         ClassLoader classLoader = getClass().getClassLoader();
         ParquetReader reader = provider.getReader(classLoader.getResource("test_file.parquet").getPath());
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssignerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssignerTest.java
index c6252c23d..34ab3bade 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssignerTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/parquet/splitassigner/ChronologyOrderedSplitAssignerTest.java
@@ -2,13 +2,17 @@

 import io.odpf.dagger.core.exception.PathParserNotProvidedException;
+import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
 import io.odpf.dagger.core.source.config.models.TimeRange;
 import io.odpf.dagger.core.source.config.models.TimeRangePool;
 import io.odpf.dagger.core.source.parquet.path.HourDatePathParser;
+import io.odpf.depot.metrics.StatsDReporter;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.core.fs.Path;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;

 import java.text.ParseException;
@@ -18,12 +22,26 @@
 import java.util.List;
 import java.util.Optional;

+import static io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.SPLITS_AWAITING_ASSIGNMENT;
+import static io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.TOTAL_SPLITS_DISCOVERED;
+import static io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.TOTAL_SPLITS_RECORDED;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;

 public class ChronologyOrderedSplitAssignerTest {

+    @Mock
+    private StatsDReporter statsDReporter;
+
+    private final SerializedStatsDReporterSupplier statsDReporterSupplierMock = () -> statsDReporter;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+    }
+
     @Test
     public void shouldReturnFileSplitsHavingOldestDateFilePathsFirstWhenFilePathURLHasOnlyDate() {
         FileSourceSplit firstSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2019-10-12/asdghsdhasd"), 0, 1024);
@@ -35,7 +53,9 @@ public void shouldReturnFileSplitsHavingOldestDateFilePathsFirstWhenFilePathURLH
ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()).build(inputSplits); + ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(inputSplits); for (int i = 0; i < 4; i++) { Optional<FileSourceSplit> split = splitAssigner.getNext(null); @@ -55,7 +75,9 @@ public void shouldReturnFileSplitsHavingOldestTimeFilePathsFirstWhenFilePathURLH ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()).build(inputSplits); + ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(inputSplits); for (int i = 0; i < 4; i++) { Optional<FileSourceSplit> split = splitAssigner.getNext(null); @@ -71,7 +93,10 @@ public void shouldReturnEmptyOptionalWhenNoMoreSplitsToReturn() { ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()).build(inputSplits); + ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(inputSplits); splitAssigner.getNext(null); Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(null); @@ -86,7 +111,10 @@ public void shouldThrowIllegalArgumentExceptionDuringConstructionItselfWhenFileP IllegalArgumentException ex = Assert.assertThrows(IllegalArgumentException.class, () -> { ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - splitAssignerBuilder.addPathParser(new HourDatePathParser()).build(Collections.singleton(split)); + splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(Collections.singleton(split)); }); assertEquals("java.text.ParseException: Cannot extract timestamp from filepath for deciding order of processing.\n" @@ -101,7 +129,10 @@ public void shouldAddNewFileSourceSplitsWithOldestDateFilePathsReturnedFirstWhen FileSourceSplit fourthSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2020-12-31/hagga6a36dg"), 0, 1024); ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()).build(Collections.singleton(secondSplit)); + ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(Collections.singleton(secondSplit)); List<FileSourceSplit> remainingSplitsToAdd = Arrays.asList(fourthSplit, firstSplit, thirdSplit); splitAssigner.addSplits(remainingSplitsToAdd); @@ -121,7 +152,10 @@ public void
shouldAddNewFileSourceSplitsWithOldestTimeFilePathsReturnedFirstWhen FileSourceSplit fourthSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2020-12-31/hr=23/ahaha4a5dg"), 0, 1024); ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()).build(Collections.singleton(secondSplit)); + ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(Collections.singleton(secondSplit)); List<FileSourceSplit> remainingSplitsToAdd = Arrays.asList(fourthSplit, firstSplit, thirdSplit); splitAssigner.addSplits(remainingSplitsToAdd); @@ -143,7 +177,10 @@ public void shouldReturnRemainingSplitsWhichAreNotAssignedYetInAscendingOrderOfF ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()).build(inputSplits); + ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(inputSplits); splitAssigner.getNext(null); List<FileSourceSplit> remainingSplits = (List<FileSourceSplit>) splitAssigner.remainingSplits(); @@ -160,7 +197,10 @@ public void shouldCallPathParserToParseFilePathURL() throws ParseException { ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); - splitAssignerBuilder.addPathParser(hourDatePathParser).build(Collections.singleton(split)); + splitAssignerBuilder + .addPathParser(hourDatePathParser) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(Collections.singleton(split)); verify(hourDatePathParser, times(1)).instantFromFilePath(split.path()); } @@ -180,6 +220,7 @@ public void shouldReturnFileSplitsFallingInGivenTimeRangeWhenFilePathURLHasBothD ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder .addPathParser(new HourDatePathParser()) .addTimeRanges(timeRangePool) + .addStatsDReporterSupplier(statsDReporterSupplierMock) .build(inputSplits); assertEquals(firstSplit, splitAssigner.getNext(null).get()); @@ -204,6 +245,7 @@ public void shouldReturnFileSplitsFallingInMultipleTimeRangesWhenFilePathURLHasB ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder .addPathParser(new HourDatePathParser()) .addTimeRanges(timeRangePool) + .addStatsDReporterSupplier(statsDReporterSupplierMock) .build(inputSplits); assertEquals(firstSplit, splitAssigner.getNext(null).get()); @@ -224,8 +266,74 @@ public void shouldThrowExceptionIfPathParserNotProvided() { TimeRangePool timeRangePool = new TimeRangePool(); timeRangePool.add(new TimeRange(Instant.parse("2019-10-12T00:00:00Z"), Instant.parse("2019-10-12T04:00:00Z"))); timeRangePool.add(new TimeRange(Instant.parse("2020-11-29T00:00:00Z"), Instant.parse("2020-11-30T10:00:00Z"))); - PathParserNotProvidedException pathParserNotProvidedException = assertThrows(PathParserNotProvidedException.class, () -> splitAssignerBuilder.addTimeRanges(timeRangePool).build(inputSplits)); + PathParserNotProvidedException pathParserNotProvidedException =
assertThrows(PathParserNotProvidedException.class, + () -> splitAssignerBuilder + .addTimeRanges(timeRangePool) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(inputSplits)); assertEquals("Path parser is null", pathParserNotProvidedException.getMessage()); } + @Test + public void shouldRaiseMetricsForDiscoveredSplitsAndRecordedSplitsWithTagsWhenInitialized() { + FileSourceSplit firstSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2019-10-12/hr=00/hd6a7gad"), 0, 1024); + FileSourceSplit secondSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2019-10-12/hr=08/sa6advgad7"), 0, 1024); + FileSourceSplit thirdSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2020-11-30/hr=09/aga6adgad"), 0, 1024); + FileSourceSplit fourthSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2020-12-31/hr=23/ahaha4a5dg"), 0, 1024); + List<FileSourceSplit> inputSplits = Arrays.asList(secondSplit, fourthSplit, firstSplit, thirdSplit); + + TimeRangePool timeRangePool = new TimeRangePool(); + timeRangePool.add(new TimeRange(Instant.parse("2019-10-12T00:00:00Z"), Instant.parse("2020-11-30T10:00:00Z"))); + + ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); + + splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .addTimeRanges(timeRangePool) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(inputSplits); + + verify(statsDReporter, times(1)).gauge(TOTAL_SPLITS_DISCOVERED.getValue(), 4, "component=split_assigner"); + verify(statsDReporter, times(1)).gauge(TOTAL_SPLITS_RECORDED.getValue(), 3, "component=split_assigner"); + } + + @Test + public void shouldRaiseMetricsWithTagsAfterAssigningSplits() { + FileSourceSplit firstSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2019-10-12/asdghsdhasd"), 0, 1024); + FileSourceSplit secondSplit = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2020-02-29/ga6agad6ad"), 0, 1024); + List<FileSourceSplit> inputSplits = Arrays.asList(firstSplit, secondSplit); + + ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); + ChronologyOrderedSplitAssigner splitAssigner = splitAssignerBuilder.addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(inputSplits); + splitAssigner.getNext(null); + + verify(statsDReporter, times(1)).gauge(SPLITS_AWAITING_ASSIGNMENT.getValue(), 1, "component=split_assigner"); + } + + @Test + public void shouldRaiseErrorMetricsWhenFilePathValidationFailed() { + FileSourceSplit split = new FileSourceSplit("1", new Path("gs://my-bucket/bid-log/dt=2019-130-12/shs6s5sdg"), 0, 1024); + ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); + assertThrows(IllegalArgumentException.class, () -> splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .addStatsDReporterSupplier(statsDReporterSupplierMock) + .build(Collections.singleton(split))); + + verify(statsDReporter, times(1)) + .captureCount("fatal.exception", 1L, "fatal_exception_type=" + IllegalArgumentException.class.getName()); + } + + @Test + public void shouldThrowExceptionIfStatsDSupplierNotProvided() { + ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder splitAssignerBuilder = new
ChronologyOrderedSplitAssigner.ChronologyOrderedSplitAssignerBuilder(); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> splitAssignerBuilder + .addPathParser(new HourDatePathParser()) + .build(Collections.emptyList())); + + assertEquals("SerializedStatsDReporterSupplier is required but is set as null", exception.getMessage()); + } } diff --git a/docs/docs/reference/metrics.md b/docs/docs/reference/metrics.md index 25a84e6db..498ce0c28 100644 --- a/docs/docs/reference/metrics.md +++ b/docs/docs/reference/metrics.md @@ -16,6 +16,7 @@ Service-level Indicators \(SLIs\) are the measurements used to calculate the per - [Processors](metrics.md#processors) - [Longbow](metrics.md#longbow) - [Checkpointing](metrics.md#checkpointing) +- [Parquet Source](metrics.md#parquet-source) ## Overview @@ -435,6 +436,49 @@ This shows the details about checkpointing of the dagger job. Checkpointing is F - The stream where the input topic is read from +## Parquet Source + +This shows the metrics related to the Parquet Source in Dagger. The metrics included are: + +### Row Deserialization Time Percentiles + +- This shows the percentile distribution of the time taken to deserialize a single parquet record from a file +into a row. + +### Rows Emitted by Source Per Second + +- This shows the number of rows emitted per second by each source reader, that is, by each task manager. It +acts as a measure of the throughput of the Parquet Source. + +### Parquet Record Read Time Percentiles + +- This shows the percentile distribution of the time taken to read a single parquet record from a file. + +### Total File Splits Discovered + +- This shows the total number of files discovered by Dagger after recursively searching all the GCS URLs specified in the +config `SOURCE_PARQUET_FILE_PATHS`. + +### Total File Splits To Be Processed + +- This shows the total number of files that this dagger job will process. It equals the count shown by the metric +[`Total File Splits Discovered`](metrics.md#total-file-splits-discovered) minus the files skipped by the time +range filter specified in the config [`SOURCE_PARQUET_FILE_DATE_RANGE`](configuration.md#source_parquet_file_date_range). + +### File Splits Remaining For Processing + +- This shows the number of files still pending processing by Dagger. It is a live metric: as the job progresses, +this value decreases as files are processed, finally reaching 0 when the job completes. + +### Readers Created Per Minute + +- This shows the number of parquet file readers created by Dagger per minute to process the files. One +reader is created for each new file; readers are not re-used. + +### Readers Closed Per Minute + +- This shows the number of parquet file readers closed and cleaned up by Dagger per minute. + ## Overview Some of the most important metrics related to firehose that gives you an overview of the current state of it.
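For contributors wiring new aspects, the sketch below gathers in one place the reporter calls that the tests in this change verify. The class `ParquetSourceMetricsSketch` and its method names are hypothetical and not part of this diff; only the aspect enums, the `component=...` tag format, and the shapes of the `captureCount`, `captureHistogram`, and `gauge` calls are taken from the test expectations above.

```java
import io.odpf.depot.metrics.StatsDReporter;

import static io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.TOTAL_SPLITS_DISCOVERED;
import static io.odpf.dagger.core.metrics.aspects.ChronologyOrderedSplitAssignerAspects.TOTAL_SPLITS_RECORDED;
import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_CLOSED;
import static io.odpf.dagger.core.metrics.aspects.ParquetReaderAspects.READER_ROW_READ_TIME;

// Hypothetical helper, not part of this change: collects the reporter calls
// that the tests above verify, one per metric family.
public class ParquetSourceMetricsSketch {
    private final StatsDReporter statsDReporter;

    public ParquetSourceMetricsSketch(StatsDReporter statsDReporter) {
        // In this change the reporter is obtained through a
        // SerializedStatsDReporterSupplier (a lambda in the tests), presumably
        // so a serializable supplier can be shipped to Flink operators and
        // resolved on the task managers.
        this.statsDReporter = statsDReporter;
    }

    // Counter aspect: backs the "Readers Closed Per Minute" panel.
    public void onReaderClosed() {
        statsDReporter.captureCount(READER_CLOSED.getValue(), 1L, "component=parquet_reader");
    }

    // Histogram aspect: backs the "Parquet Record Read Time Percentiles" panel.
    public void onRowRead(long readTimeMillis) {
        statsDReporter.captureHistogram(READER_ROW_READ_TIME.getValue(), readTimeMillis, "component=parquet_reader");
    }

    // Gauge aspects: back the "Total File Splits Discovered" and "Total File
    // Splits To Be Processed" panels. The recorded count equals the discovered
    // count minus the splits dropped by the time range filter.
    public void onSplitsEnumerated(int discovered, int recorded) {
        statsDReporter.gauge(TOTAL_SPLITS_DISCOVERED.getValue(), discovered, "component=split_assigner");
        statsDReporter.gauge(TOTAL_SPLITS_RECORDED.getValue(), recorded, "component=split_assigner");
    }
}
```

The `component` tag is what lets each dashboard panel filter a metric down to its emitting component: `component=parquet_reader` for the reader metrics and `component=split_assigner` for the split assigner metrics, exactly as asserted in the tests.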