-
Notifications
You must be signed in to change notification settings - Fork 42
feat: add parquet source metrics #174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add parquet source metrics #174
Conversation
This reverts commit 5eccb23.
This reverts commit 9d0c8e8.
- add new package types for flink and statsd measurements [#108]
- Dagger will use its own version to send metrics to telegraf. Keeping multiple versions of the same dependency can be avoided here. [#108]
- remove unused imports [#108]
- use statsDReporter for raising metrics [#108]
- revert package names refactoring [#108]
- return empty string in getMetricStatsDTags [#108]
- global tags fail to be sent if statsdClient is used - remove unused interface methods [#108]
- refactor flow for null tag value [#108]
- FileRecordFormat object and all it's fields need to be serializable in order to construct the Flink job graph. Even though StatsDErrorReporter is made serializable, it contains StatsDErrorReporter, which in turn contains more fields which may not be serializable.Hence, in order to mitigate job graph creation failures, we wrap the error reporter inside a serializable lambda. This is a common idiom to make un-serializable fields serializable in Java 8: https://stackoverflow.com/a/22808112 [#108]
|
|
||
| import java.io.Serializable; | ||
|
|
||
| public interface MeasurementManager extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we have discussed the design of these interfaces..
this is not used anywhere. then IMO, it's not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This interface serves as an abstraction for registering and memoizing statsd tags of a component ( SplitAssigner, Parquet Reader, etc) for all measurements which get emitted from that component. Since, StatsDReporter expects tags to be sent along with each measurement call ( gauge, counter, etc), which is unnecessary when all the measurements that belong to a class have the same tags. (Not to be confused with global tags which we register during the creation call of statsdreporter itself)
This interface is not unused. It's implemented by all the 3 measurement managers: DaggerGaugeManager, DaggerHistogramManager and DaggerCounterManager.
|
|
||
| import java.io.Serializable; | ||
|
|
||
| public interface Counter extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me an over-design.. but debatable. I see only one implementation, i don't think we will have more implementation of this. it seems like we are wrapping statsDReporter method with another method. I am not sure how this is useful.
Did we discuss all these interfaces before implementing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, there is just one implementation. However, this is analogous to the current class design that we have for Flink metrics as well, which currently get emitted via Flink Runtime Context. You can look at GaugeStatsManager.java, MeterStatsManager.java, etc.
The benefit is this makes the usage of metrics consistent across the codebase. This is implemented by DaggerCounterManager which provides the implementation on how to raise a counter metric via StatsD. If you look at the existing code, we similarly use different managers like CounterStatsManager when raising metrics via Flink.
I wanted to implement a single Counter interface which will have all the methods required to raise a counter metric and then individual classes could decide which implementation to use ( Flink or StatsD). Unfortunately, that would mean changing and making the contracts of "how to raise a counter metric" universal across multiple different places in Dagger which will be too much scope addition for this story.
Hence, I took the middle ground of keeping a similar interface for StatsD measurement types in Dagger as the one we have for Flink Reporter at the moment. Given we have scope and bandwidth, we can merge both into a single interface.
| private ChronologyOrderedSplitAssigner(Collection<FileSourceSplit> fileSourceSplits, PathParser pathParser, | ||
| TimeRangePool timeRangePool, SerializedStatsDReporterSupplier statsDReporterSupplier) { | ||
| this.registerTagsWithMeasurementManagers(statsDReporterSupplier); | ||
| daggerGaugeManager.markValue(TOTAL_SPLITS_DISCOVERED, fileSourceSplits.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of times we are using this and some of times we are not. It's better to be consistent. let's remove it from the function calls..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should Move all the assignment code before any function calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed via commit 87ed606
| private ChronologyOrderedSplitAssigner(Collection<FileSourceSplit> fileSourceSplits, PathParser pathParser, TimeRangePool timeRangePool) { | ||
| private ChronologyOrderedSplitAssigner(Collection<FileSourceSplit> fileSourceSplits, PathParser pathParser, | ||
| TimeRangePool timeRangePool, SerializedStatsDReporterSupplier statsDReporterSupplier) { | ||
| this.registerTagsWithMeasurementManagers(statsDReporterSupplier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method is weird, because it's initialising daggerGaugeManager inside, and we are using the object outside.
we can just inline the code here, we don't need this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this out because the constructor was getting biggish with function calls and whatnot. Generally I like to keep constructors small for code readability and leave it to the compiler for inlining whatever it can.
Should I try to keep the initialization parts in the constructor and move the function calls to a separate method ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm. i think we can separate daggerGaugeManager.markValue() calls from the constructor.
private ChronologyOrderedSplitAssigner(Collection<FileSourceSplit> fileSourceSplits, PathParser pathParser,
TimeRangePool timeRangePool, SerializedStatsDReporterSupplier statsDReporterSupplier) {
this.pathParser = pathParser;
this.timeRangePool = timeRangePool;
this.statsDErrorReporter = new StatsDErrorReporter(statsDReporterSupplier);
this.unassignedSplits = new PriorityBlockingQueue<>(INITIAL_DEFAULT_CAPACITY, getFileSourceSplitComparator());
this.daggerGaugeManager = new DaggerGaugeManager(statsDReporterSupplier);
initAndValidate(fileSourceSplits);
}
private void initAndValidate(Collection<FileSourceSplit> fileSourceSplits) {
StatsDTag[] splitAssignerTags = getSplitAssignerTags();
daggerGaugeManager.register(splitAssignerTags);
daggerGaugeManager.markValue(TOTAL_SPLITS_DISCOVERED, fileSourceSplits.size());
for (FileSourceSplit split : fileSourceSplits) {
validateAndAddSplits(split);
}
daggerGaugeManager.markValue(TOTAL_SPLITS_RECORDED, unassignedSplits.size());
daggerGaugeManager.markValue(SPLITS_AWAITING_ASSIGNMENT, unassignedSplits.size());
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will try to implement it 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed via commit 87ed606
- applies review comment #174 (comment) and #174 (comment) [#108]
Closes #108