Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
5eccb23
feat: initial hit and trial for publishing reader metrics
Meghajit Jun 21, 2022
7f26745
Revert "feat: initial hit and trial for publishing reader metrics"
Meghajit Jun 21, 2022
9d0c8e8
feat: check if open is called
Meghajit Jun 21, 2022
d196fb0
Revert "feat: check if open is called"
Meghajit Jun 21, 2022
acceed9
feat: refactor package names
Meghajit Jun 24, 2022
579139e
feat: exclude dogstatsd from stencil and use custom version
Meghajit Jun 27, 2022
ffb60ec
feat: implement metrics via dogstatsd
Meghajit Jun 27, 2022
f1a5572
feat: cache the global tags for perf reasons
Meghajit Jun 28, 2022
a6cee08
feat: use Depot and remove dogstatsd
Meghajit Jun 29, 2022
b2ebf92
feat: add depot to minimalJar
Meghajit Jul 5, 2022
1a7590a
feat: revert package name
Meghajit Jul 5, 2022
af18a80
feat: remove dogstatsd from dagger-common
Meghajit Jul 5, 2022
13ace76
feat: fix indentation
Meghajit Jul 5, 2022
5f08109
feat: create inner class for provider
Meghajit Jul 5, 2022
92b36bb
feat: use statsd.increment method instead of captureCount
Meghajit Jul 6, 2022
53f7cc3
feat: use statsdReporter directly instead of client
Meghajit Jul 6, 2022
72da9a5
feat: use = as tag separator
Meghajit Jul 6, 2022
0f32e00
feat: use = as tag separator
Meghajit Jul 6, 2022
ec6bd63
feat: delete dagger source tag
Meghajit Jul 7, 2022
20b607b
feat: delete unused register method from interface
Meghajit Jul 7, 2022
cfd01d6
feat: add StatsDErrorReporter
Meghajit Jul 7, 2022
94383b8
feat: add exception message to measurement as well
Meghajit Jul 7, 2022
2178281
feat: remove exception message measurement
Meghajit Jul 7, 2022
4ff6048
feat: report errors with exception class as tag value
Meghajit Jul 7, 2022
4aaafb2
feat: capture and report fatal exceptions
Meghajit Jul 7, 2022
a724c9d
feat: fix broken tests and checkstyle issues
Meghajit Jul 7, 2022
36f237c
feat: rename interface method
Meghajit Jul 8, 2022
e8f9602
feat: add tests for DaggerStatsDReporter
Meghajit Jul 8, 2022
365c533
feat: add tests for DaggerMetricsConfig
Meghajit Jul 8, 2022
cea2b4b
feat: add tests for DaggerCounterManager
Meghajit Jul 8, 2022
9fd9004
feat: add a close method to DaggerStatsDReporter for cleanup
Meghajit Jul 8, 2022
e13ed52
feat: delete meter measurement
Meghajit Jul 8, 2022
cf1d76d
feat: add tests for DaggerGaugeManager
Meghajit Jul 8, 2022
1529f0d
feat: add tests for DaggerHistogramManager
Meghajit Jul 8, 2022
fa9f19d
feat: make instance variable final in tests
Meghajit Jul 8, 2022
c329488
feat: change exception message
Meghajit Jul 8, 2022
482c833
feat: add tests for StatsDErrorReporter
Meghajit Jul 8, 2022
1308032
feat: add tests for ChronologyOrderedSplitAssigner
Meghajit Jul 8, 2022
5085eb4
feat: add tests for StatsDTag
Meghajit Jul 8, 2022
230c3f8
feat: add error reporter tests for DaggerSourceFactory
Meghajit Jul 8, 2022
5e76f05
feat: add error reporter tests for ParquetDaggerSource
Meghajit Jul 8, 2022
5704371
feat: add error reporter tests for ParquetFileRecordFormat
Meghajit Jul 8, 2022
6acf9db
feat: add error reporter tests for ParquetFileSource
Meghajit Jul 8, 2022
9e98b97
feat: add error reporter tests for ParquetReader
Meghajit Jul 8, 2022
b854022
feat: delete unused measurements
Meghajit Jul 8, 2022
a4cd687
feat: use lambda to provide error reporter in ParquetFileRecordFormat
Meghajit Jul 11, 2022
2aafa62
doc: add documentation for metrics
Meghajit Jul 11, 2022
9ea2d29
doc: add parquet source metrics dashboard
Meghajit Jul 12, 2022
87ed606
feat: refactor methods
Meghajit Jul 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dagger-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ configurations {
dependencies {
minimalJar project(path: ':dagger-common', configuration: 'minimalCommonJar')
minimalJar project(path: ':dagger-functions', configuration: 'minimalFunctionsJar')
minimalJar 'io.odpf:depot:0.1.5'

compileOnly 'org.projectlombok:lombok:1.18.8'
annotationProcessor 'org.projectlombok:lombok:1.18.8'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.odpf.dagger.core;

import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.source.Stream;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -220,6 +221,8 @@ private void addSink(StreamInfo streamInfo) {
}

List<Stream> getStreams() {
return StreamsFactory.getStreams(configuration, stencilClientOrchestrator);
org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration();
DaggerStatsDReporter daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration);
return StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.serde.DaggerDeserializer;
import io.odpf.dagger.core.exception.DaggerConfigurationException;
import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
import io.odpf.dagger.core.metrics.reporters.statsd.StatsDErrorReporter;
import io.odpf.dagger.core.source.config.StreamConfig;
import org.apache.flink.types.Row;

Expand All @@ -12,12 +14,17 @@
import java.util.stream.Stream;

public class DaggerDeserializerFactory {
public static DaggerDeserializer<Row> create(StreamConfig streamConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator) {
public static DaggerDeserializer<Row> create(StreamConfig streamConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, SerializedStatsDReporterSupplier statsDReporterSupplier) {
return getDaggerDeserializerProviders(streamConfig, configuration, stencilClientOrchestrator)
.stream()
.filter(DaggerDeserializerProvider::canProvide)
.findFirst()
.orElseThrow(() -> new DaggerConfigurationException("No suitable deserializer could be constructed for the given stream configuration."))
.orElseThrow(() -> {
StatsDErrorReporter statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier);
DaggerConfigurationException ex = new DaggerConfigurationException("No suitable deserializer could be constructed for the given stream configuration.");
statsDErrorReporter.reportFatalException(ex);
return ex;
})
.getDaggerDeserializer();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.odpf.dagger.core.metrics.aspects;

import io.odpf.dagger.common.metrics.aspects.AspectType;
import io.odpf.dagger.common.metrics.aspects.Aspects;

public enum ChronologyOrderedSplitAssignerAspects implements Aspects {
TOTAL_SPLITS_DISCOVERED("total_splits_discovered", AspectType.Gauge),
TOTAL_SPLITS_RECORDED("total_splits_recorded", AspectType.Gauge),
SPLITS_AWAITING_ASSIGNMENT("splits_awaiting_assignment", AspectType.Counter);

ChronologyOrderedSplitAssignerAspects(String value, AspectType aspectType) {
this.value = value;
this.aspectType = aspectType;
}

private final String value;
private final AspectType aspectType;

@Override
public String getValue() {
return value;
}

@Override
public AspectType getAspectType() {
return aspectType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.odpf.dagger.core.metrics.aspects;

import io.odpf.dagger.common.metrics.aspects.AspectType;
import io.odpf.dagger.common.metrics.aspects.Aspects;

public enum ParquetReaderAspects implements Aspects {
READER_CREATED("reader_created", AspectType.Counter),
READER_CLOSED("reader_closed", AspectType.Counter),
READER_ROWS_EMITTED("reader_rows_emitted", AspectType.Counter),
READER_ROW_DESERIALIZATION_TIME("reader_row_deserialization_time", AspectType.Histogram),
READER_ROW_READ_TIME("reader_row_read_time", AspectType.Histogram);

private final String value;
private final AspectType aspectType;

ParquetReaderAspects(String value, AspectType aspectType) {
this.value = value;
this.aspectType = aspectType;
}

@Override
public String getValue() {
return value;
}

@Override
public AspectType getAspectType() {
return aspectType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.odpf.dagger.core.metrics.reporters.statsd;

import io.odpf.depot.config.MetricsConfig;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class DaggerMetricsConfig implements MetricsConfig {
private static final String FLINK_STATSD_HOST_CONFIG_KEY = "metrics.reporter.stsd.host";
private static final String DEFAULT_STATSD_HOST_VALUE = "localhost";
private static final String FLINK_STATSD_PORT_CONFIG_KEY = "metrics.reporter.stsd.port";
private static final int DEFAULT_STATSD_PORT_VALUE = 8125;
private final String hostName;
private final int port;

public DaggerMetricsConfig(Configuration flinkConfiguration) {
ConfigOption<String> hostConfigOption = ConfigOptions
.key(FLINK_STATSD_HOST_CONFIG_KEY)
.stringType()
.defaultValue(DEFAULT_STATSD_HOST_VALUE);
ConfigOption<Integer> portConfigOption = ConfigOptions
.key(FLINK_STATSD_PORT_CONFIG_KEY)
.intType()
.defaultValue(DEFAULT_STATSD_PORT_VALUE);
this.hostName = flinkConfiguration.getString(hostConfigOption);
this.port = flinkConfiguration.getInteger(portConfigOption);
}

@Override
public String getMetricStatsDHost() {
return hostName;
}

@Override
public Integer getMetricStatsDPort() {
return port;
}

@Override
public String getMetricStatsDTags() {
return "";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.odpf.dagger.core.metrics.reporters.statsd;

import io.odpf.dagger.core.metrics.reporters.statsd.tags.GlobalTags;
import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag;
import io.odpf.depot.metrics.StatsDReporter;
import io.odpf.depot.metrics.StatsDReporterBuilder;
import org.apache.flink.configuration.Configuration;

import java.io.IOException;
import java.util.Arrays;

import static io.odpf.dagger.core.utils.Constants.FLINK_JOB_ID_DEFAULT;
import static io.odpf.dagger.core.utils.Constants.FLINK_JOB_ID_KEY;

public class DaggerStatsDReporter implements SerializedStatsDReporterSupplier {
private static StatsDReporter statsDReporter;
private final Configuration flinkConfiguration;
private final io.odpf.dagger.common.configuration.Configuration daggerConfiguration;

private DaggerStatsDReporter(Configuration flinkConfiguration, io.odpf.dagger.common.configuration.Configuration daggerConfiguration) {
this.flinkConfiguration = flinkConfiguration;
this.daggerConfiguration = daggerConfiguration;
}

private String[] generateGlobalTags() {
StatsDTag[] globalTags = new StatsDTag[]{
new StatsDTag(GlobalTags.JOB_ID, daggerConfiguration.getString(FLINK_JOB_ID_KEY, FLINK_JOB_ID_DEFAULT))};
return Arrays.stream(globalTags)
.map(StatsDTag::getFormattedTag)
.toArray(String[]::new);
}

@Override
public StatsDReporter buildStatsDReporter() {
if (statsDReporter == null) {
DaggerMetricsConfig daggerMetricsConfig = new DaggerMetricsConfig(flinkConfiguration);
String[] globalTags = generateGlobalTags();
statsDReporter = StatsDReporterBuilder
.builder()
.withMetricConfig(daggerMetricsConfig)
.withExtraTags(globalTags)
.build();
}
return statsDReporter;
}

protected static void close() throws IOException {
if (statsDReporter != null) {
statsDReporter.close();
statsDReporter = null;
}
}

public static class Provider {
public static DaggerStatsDReporter provide(Configuration flinkConfiguration, io.odpf.dagger.common.configuration.Configuration daggerConfiguration) {
return new DaggerStatsDReporter(flinkConfiguration, daggerConfiguration);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.odpf.dagger.core.metrics.reporters.statsd;

import io.odpf.depot.metrics.StatsDReporter;

import java.io.Serializable;

/* Flink requires that all objects which are needed to prepare the Job Graph should be serializable along with their
properties/fields. StatsDReporter and its fields are not serializable. Hence, in order to mitigate job graph creation
failure, we create a serializable interface around the reporter as below. This is a common idiom to make un-serializable
fields serializable in Java 8: https://stackoverflow.com/a/22808112 */

@FunctionalInterface
public interface SerializedStatsDReporterSupplier extends Serializable {
StatsDReporter buildStatsDReporter();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.odpf.dagger.core.metrics.reporters.statsd;

import io.odpf.dagger.core.metrics.reporters.ErrorReporter;
import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag;
import io.odpf.depot.metrics.StatsDReporter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

import java.io.Serializable;

import static io.odpf.dagger.core.utils.Constants.FATAL_EXCEPTION_METRIC_GROUP_KEY;
import static io.odpf.dagger.core.utils.Constants.NONFATAL_EXCEPTION_METRIC_GROUP_KEY;

public class StatsDErrorReporter implements ErrorReporter, Serializable {
private static final String FATAL_EXCEPTION_TAG_KEY = "fatal_exception_type";
private static final String NON_FATAL_EXCEPTION_TAG_KEY = "non_fatal_exception_type";
private final StatsDReporter statsDReporter;

public StatsDErrorReporter(SerializedStatsDReporterSupplier statsDReporterSupplier) {
this.statsDReporter = statsDReporterSupplier.buildStatsDReporter();
}

@Override
public void reportFatalException(Exception exception) {
StatsDTag statsDTag = new StatsDTag(FATAL_EXCEPTION_TAG_KEY, exception.getClass().getName());
statsDReporter.captureCount(FATAL_EXCEPTION_METRIC_GROUP_KEY, 1L, statsDTag.getFormattedTag());
}

@Override
public void reportNonFatalException(Exception exception) {
StatsDTag statsDTag = new StatsDTag(NON_FATAL_EXCEPTION_TAG_KEY, exception.getClass().getName());
statsDReporter.captureCount(NONFATAL_EXCEPTION_METRIC_GROUP_KEY, 1L, statsDTag.getFormattedTag());
}

@Override
public Counter addExceptionToCounter(Exception exception, MetricGroup metricGroup, String metricGroupKey) {
throw new UnsupportedOperationException("This operation is not supported on StatsDErrorReporter");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.odpf.dagger.core.metrics.reporters.statsd.manager;

import io.odpf.dagger.common.metrics.aspects.Aspects;
import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
import io.odpf.dagger.core.metrics.reporters.statsd.measurement.Counter;
import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag;
import io.odpf.depot.metrics.StatsDReporter;

import java.util.ArrayList;

public class DaggerCounterManager implements MeasurementManager, Counter {
private final StatsDReporter statsDReporter;
private String[] formattedTags;

public DaggerCounterManager(SerializedStatsDReporterSupplier statsDReporterSupplier) {
this.statsDReporter = statsDReporterSupplier.buildStatsDReporter();
}

@Override
public void register(StatsDTag[] tags) {
ArrayList<String> tagList = new ArrayList<>();
for (StatsDTag measurementTag : tags) {
tagList.add(measurementTag.getFormattedTag());
}
this.formattedTags = tagList.toArray(new String[0]);
}

@Override
public void increment(Aspects aspect) {
increment(aspect, 1L);
}

@Override
public void increment(Aspects aspect, long positiveCount) {
statsDReporter.captureCount(aspect.getValue(), positiveCount, formattedTags);
}

@Override
public void decrement(Aspects aspect) {
decrement(aspect, -1L);
}

@Override
public void decrement(Aspects aspect, long negativeCount) {
statsDReporter.captureCount(aspect.getValue(), negativeCount, formattedTags);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.odpf.dagger.core.metrics.reporters.statsd.manager;

import io.odpf.dagger.common.metrics.aspects.Aspects;
import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
import io.odpf.dagger.core.metrics.reporters.statsd.measurement.Gauge;
import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag;
import io.odpf.depot.metrics.StatsDReporter;

import java.util.ArrayList;

public class DaggerGaugeManager implements MeasurementManager, Gauge {
private final StatsDReporter statsDReporter;
private String[] formattedTags;

public DaggerGaugeManager(SerializedStatsDReporterSupplier statsDReporterSupplier) {
this.statsDReporter = statsDReporterSupplier.buildStatsDReporter();
}

@Override
public void register(StatsDTag[] tags) {
ArrayList<String> tagList = new ArrayList<>();
for (StatsDTag measurementTag : tags) {
tagList.add(measurementTag.getFormattedTag());
}
this.formattedTags = tagList.toArray(new String[0]);
}

@Override
public void markValue(Aspects aspect, int gaugeValue) {
statsDReporter.gauge(aspect.getValue(), gaugeValue, formattedTags);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.odpf.dagger.core.metrics.reporters.statsd.manager;

import io.odpf.dagger.common.metrics.aspects.Aspects;
import io.odpf.dagger.core.metrics.reporters.statsd.SerializedStatsDReporterSupplier;
import io.odpf.dagger.core.metrics.reporters.statsd.measurement.Histogram;
import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag;
import io.odpf.depot.metrics.StatsDReporter;

import java.util.ArrayList;

public class DaggerHistogramManager implements MeasurementManager, Histogram {
private final StatsDReporter statsDReporter;
private String[] formattedTags;

public DaggerHistogramManager(SerializedStatsDReporterSupplier statsDReporterSupplier) {
this.statsDReporter = statsDReporterSupplier.buildStatsDReporter();
}

@Override
public void register(StatsDTag[] tags) {
ArrayList<String> tagList = new ArrayList<>();
for (StatsDTag measurementTag : tags) {
tagList.add(measurementTag.getFormattedTag());
}
this.formattedTags = tagList.toArray(new String[0]);
}

@Override
public void recordValue(Aspects aspect, long value) {
statsDReporter.captureHistogram(aspect.getValue(), value, formattedTags);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.odpf.dagger.core.metrics.reporters.statsd.manager;

import io.odpf.dagger.core.metrics.reporters.statsd.tags.StatsDTag;

import java.io.Serializable;

public interface MeasurementManager extends Serializable {
Copy link
Member

@lavkesh lavkesh Jul 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we have discussed the design of these interfaces..

this is not used anywhere. then IMO, it's not needed.

Copy link
Member Author

@Meghajit Meghajit Jul 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface serves as an abstraction for registering and memoizing statsd tags of a component ( SplitAssigner, Parquet Reader, etc) for all measurements which get emitted from that component. Since, StatsDReporter expects tags to be sent along with each measurement call ( gauge, counter, etc), which is unnecessary when all the measurements that belong to a class have the same tags. (Not to be confused with global tags which we register during the creation call of statsdreporter itself)

This interface is not unused. It's implemented by all the 3 measurement managers: DaggerGaugeManager, DaggerHistogramManager and DaggerCounterManager.

void register(StatsDTag[] tags);
}
Loading