Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
188 commits
Select commit Hold shift + click to select a range
b0a93f3
feat: add parquet source specific configs
Meghajit Feb 16, 2022
d27ce82
feat: extract enums into their own file
Meghajit Feb 17, 2022
099287c
feat: create a KafkaSource factory
Meghajit Feb 17, 2022
3419983
feat: create a common interface for all serializers
Meghajit Feb 17, 2022
2cba26a
feat: create a ParquetFileSource and its builder
Meghajit Feb 17, 2022
480603c
feat: add ParquetFileSourceFactory
Meghajit Feb 17, 2022
0e6a3bc
feat: add SourceFactory to create sources
Meghajit Feb 17, 2022
02867bd
feat: create skeleton for a basic parquet file reader
Meghajit Feb 18, 2022
c264de7
feat: create skeleton for two split assigners
Meghajit Feb 18, 2022
e9d0d4c
feat: create skeleton for ParquetFileRecordFormat
Meghajit Feb 18, 2022
56315b1
feat: add more methods in ParquetFileSourceFactory
Meghajit Feb 18, 2022
c82d0be
feat: add runtime dependency for parquet-column into dagger-common
Meghajit Feb 21, 2022
0ed5f41
feat: add an interface for parquet data type parser
Meghajit Feb 21, 2022
1a6f12d
feat: implement parquet parsers for some primitive data types
Meghajit Feb 21, 2022
d9ae93d
feat: handle null args for ParquetDataTypeParser
Meghajit Feb 22, 2022
7690b89
feat: implement parquet parser for float
Meghajit Feb 22, 2022
189fb7a
feat: implement parquet parsers for binary string
Meghajit Feb 22, 2022
b83aba3
feat: implement parquet parsers for double primitive type
Meghajit Feb 22, 2022
1bd9598
feat: implement parquet parsers for enum
Meghajit Feb 22, 2022
e73d736
feat: implement parquet parser for timestamp
Meghajit Feb 24, 2022
ab33372
feat: add check for missing logical type annotation
Meghajit Feb 24, 2022
d99e1ea
feat: add validation for missing logical type annotation
Meghajit Feb 24, 2022
b2fd657
feat: add a validation factory for parquet schema checks
Meghajit Feb 24, 2022
a3223a2
feat: add more checks to SimpleGroupValidation
Meghajit Feb 25, 2022
dc45553
feat: apply validations to ParquetBoolean parser
Meghajit Feb 28, 2022
2f96b4c
feat: apply validations in ParquetTimestampParser
Meghajit Feb 28, 2022
374f98c
feat: apply validations in ParquetInt64Parser
Meghajit Feb 28, 2022
c7d4ea7
feat: apply validations in ParquetInt32Parser
Meghajit Feb 28, 2022
cd2a143
feat: apply validations in ParquetFloatParser
Meghajit Feb 28, 2022
5b52214
feat: apply validations in ParquetDoubleParser
Meghajit Feb 28, 2022
3cc0828
feat: apply validations in ParquetEnumParser
Meghajit Feb 28, 2022
359cbef
feat: apply validations in ParquetStringParser
Meghajit Feb 28, 2022
bfde8a4
feat: implement parquet data type identifier
Meghajit Feb 28, 2022
45f9789
feat: add canHandle as an abstract method in ParquetDataTypeParser
Meghajit Feb 28, 2022
bc829d3
feat: change signature of ParquetDataTypeParser.getValueOrDefault()
Meghajit Feb 28, 2022
fde86ac
bugfix: use Objects.equals instead of ==`
Meghajit Feb 28, 2022
55ce9fc
refactor: opt to use safer Objects.equals
Meghajit Feb 28, 2022
1a243a5
feat: mark constructor as public for ParquetDataTypeID
Meghajit Mar 3, 2022
2e073bc
feat: add factory method for producing ParquetDataTypeParser
Meghajit Mar 3, 2022
8ccb516
feat: add method to deserialize from simple group in BooleanPrimitive…
Meghajit Mar 4, 2022
e82bea0
feat: add method to deserialize from simple group in DoublePrimitiveT…
Meghajit Mar 4, 2022
f70ee38
feat: add method to deserialize from simple group in FloatPrimitiveTy…
Meghajit Mar 4, 2022
a490dbd
feat: add method to deserialize from simple group in IntegerPrimitive…
Meghajit Mar 4, 2022
0c43843
feat: add method to deserialize from simple group in LongPrimitiveTyp…
Meghajit Mar 4, 2022
9b89593
feat: add method to deserialize from simple group in StringPrimitiveT…
Meghajit Mar 4, 2022
a2a8c2c
feat: add method to deserialize from simple group in ByteStringPrimit…
Meghajit Mar 4, 2022
a85b07e
feat: rename interface method and its usages
Meghajit Mar 6, 2022
17a7284
feat: add support for transforming simple group in EnumProtoHandler
Meghajit Mar 6, 2022
1f32b54
feat: add support for transforming simple group in TimestampProtoHandler
Meghajit Mar 6, 2022
d01fb95
feat: undo adding factory method to create ParquetDataTypeParser
Meghajit Mar 6, 2022
e0705be
feat: add factory method in SimpleGroupValidation
Meghajit Mar 6, 2022
960886c
feat: add a test proto for testing parquet source
Meghajit Mar 6, 2022
071122f
feat: support transforming of simple group in PrimitiveProtoHandler
Meghajit Mar 7, 2022
4c4dbe2
feat: add factory method to convert simple group to row
Meghajit Mar 7, 2022
5200fbd
feat: add a simple group deserializer
Meghajit Mar 8, 2022
9ba9c2c
feat: rename interface method to transformFromKafka
Meghajit Mar 8, 2022
255814e
feat: add and implement transformFromParquet
Meghajit Mar 8, 2022
4b375c6
feat: move SimpleGroupValidation to another package
Meghajit Mar 8, 2022
bec88e9
feat: delete io.odpf.dagger.common.serde.parquet.parser package
Meghajit Mar 8, 2022
20890f6
feat: call transformFromParquet instead in RowFactory method
Meghajit Mar 10, 2022
9a30c92
feat: throw exception for bounded kafka source
Meghajit Mar 11, 2022
c4a7651
feat: stop passing stencil to ParquetFileSource and its downstream co…
Meghajit Mar 11, 2022
7104a05
feat: throw exception for unbounded parquet source
Meghajit Mar 11, 2022
c847670
feat: add constructor to SourceDetails
Meghajit Mar 11, 2022
8b3bd8d
feat: make both JsonDeserializer and ProtoDeserializer implement Dagg…
Meghajit Mar 11, 2022
a28e7a7
feat: Add a deserializer factory
Meghajit Mar 11, 2022
651583f
feat: add new field and constructor to Stream POJO
Meghajit Mar 11, 2022
f2de48a
feat: add method to create data sources with deserializer in StreamBu…
Meghajit Mar 11, 2022
4cb77f7
feat: add a 2 argument constructor for StreamBuilder
Meghajit Mar 11, 2022
2ef46ec
feat: call StreamBuilder with stencil from ProtoDataStreamBuilder
Meghajit Mar 11, 2022
5360b14
feat: add tests to check cases related to new stream config SourceDet…
Meghajit Mar 11, 2022
91afc8f
feat: enable Mockito plugin for mocking of final methods
Meghajit Mar 11, 2022
6037afc
feat: move DeserializerFactory to another package and add tests
Meghajit Mar 11, 2022
a236b3b
feat: add a class for populating source specific stream metrics
Meghajit Mar 14, 2022
a8d3278
feat: delegate metrics away from StreamBuilder to StreamMetrics
Meghajit Mar 14, 2022
3f7582c
feat: create stream with source details from SourceFactory
Meghajit Mar 14, 2022
07e86c5
feat: migrate to using buildStream in StreamBuilder implementations
Meghajit Mar 14, 2022
09cb1af
feat: rename buildStream to build
Meghajit Mar 14, 2022
8375565
feat: add tests for StreamMetrics
Meghajit Mar 14, 2022
cd83662
feat: refactor StreamsFactory
Meghajit Mar 14, 2022
13cac7e
feat: refactor StreamMetricsTest
Meghajit Mar 14, 2022
69c5cb9
feat: add tests for DeserializerFactory
Meghajit Mar 14, 2022
2d566df
feat: stop sending StreamConfig to ParquetFileRecordFormat
Meghajit Mar 14, 2022
908a7aa
feat: cleanup test and edit exception message
Meghajit Mar 14, 2022
dc7cff9
feat: refactor SourceFactory and add tests
Meghajit Mar 14, 2022
0ff1de6
feat: refactor DeserializerFactory
Meghajit Mar 15, 2022
357a4f5
feat: refactor StreamBuilder
Meghajit Mar 15, 2022
5a03736
feat: add custom exception for simple group parsing failures
Meghajit Mar 15, 2022
56d1dcc
feat: throw SimpleGroupParsingException from SimpleGroupDeserializer
Meghajit Mar 15, 2022
0a64f11
feat: change exception thrown during simple group parsing in Timestam…
Meghajit Mar 15, 2022
e3321d8
feat: build FileSource from ParquetFileSource in SourceFactory
Meghajit Mar 15, 2022
95e334c
feat: add a Builder nested class inside ParquetFileRecordFormat
Meghajit Mar 15, 2022
437ecc1
feat: delete factories for kafka and parquet source
Meghajit Mar 15, 2022
0aaf743
feat: use Google precondition checks instead of flink ones
Meghajit Mar 15, 2022
dc1286c
feat: Create StreamType and its implementations
Meghajit Mar 29, 2022
057f46f
feat: make abstract methods public in StreamType and its implementations
Meghajit Mar 29, 2022
66727f8
feat: add tests for KafkaSourceJsonType
Meghajit Mar 29, 2022
c9b773a
feat: return StreamType from build method
Meghajit Mar 29, 2022
945d21c
feat: refactor tests
Meghajit Mar 29, 2022
8be0e78
feat: add tests for ParquetSourceProtoType
Meghajit Mar 29, 2022
cfa4990
feat: add tests for KafkaSourceProtoType
Meghajit Mar 29, 2022
0972a1f
feat: add a POJO for holding parquet file split with its instant
Meghajit Mar 30, 2022
cba471b
feat: implement chronology ordered split assigner and add tests
Meghajit Mar 30, 2022
9606844
feat: rename files and tests for streamtype
Meghajit Mar 30, 2022
b51fbaa
feat: move the builder class as a nested class inside ParquetFileSource
Meghajit Mar 30, 2022
c5ff670
feat: allow chronology ordered split assigner to be initialized with …
Meghajit Mar 30, 2022
ab992c3
feat: add tests for ParquetFileSource and add more validations
Meghajit Mar 30, 2022
e732769
feat: make provider class as nested class inside PrimitiveReader
Meghajit Mar 30, 2022
e372b31
feat: add tests for ParquetFileRecordFormat
Meghajit Mar 30, 2022
4830b1a
feat: add few tests for PrimitiveReader
Meghajit Mar 30, 2022
35736bb
feat: implement PrimitiveReader and add tests
Meghajit Mar 31, 2022
fadee0b
feat: throw runtime exception if error in creating parquet reader
Meghajit Mar 31, 2022
daebbfe
Merge remote-tracking branch 'origin/main' into feat/add-parquet-data…
Meghajit Apr 1, 2022
f2abffb
feat: make the type information provider and reader provider function…
Meghajit Apr 1, 2022
19612ed
feat: fix checkstyle formatting issues
Meghajit Apr 4, 2022
ae6bf6b
feat: fix checkstyle issues in split assigner
Meghajit Apr 4, 2022
d61f6e3
feat: fix checkstyle formatting issues in stream type impls
Meghajit Apr 4, 2022
6cad4ee
feat: make ParquetFileSource serializable
Meghajit Apr 4, 2022
a0cb47d
feat: remove serialization from PrimitiveReader
Meghajit Apr 4, 2022
c518d0f
feat: make SourceDetails and StreamType serializable
Meghajit Apr 4, 2022
bde26e2
feat: fix checkstyle formatting issues
Meghajit Apr 4, 2022
05763f9
feat: fix checkstyle formatting errors
Meghajit Apr 4, 2022
6d58b98
feat: return UNBOUNDED/KAFKA as default SourceDetails when not specified
Meghajit Apr 4, 2022
6523779
feat: return StreamType instead of Stream from StreamsFactory
Meghajit Apr 5, 2022
404f476
feat: delete Stream, StreamBuilder and its implementations
Meghajit Apr 5, 2022
5eb3b55
feat: delete SourceFactory,DeserializerFactory and StreamMetrics
Meghajit Apr 5, 2022
556ccec
feat: delete Mockito extension
Meghajit Apr 5, 2022
642a0ea
feat: rename KafkaSourceJsonSchema to KafkaSourceJsonSchemaStreamType
Meghajit Apr 5, 2022
0a44559
feat: rename KafkaSourceProtoSchema to KafkaSourceProtoSchemaStreamType
Meghajit Apr 5, 2022
514793d
feat: rename ParquetSourceProtoSchema to ParquetSourceProtoSchemaStre…
Meghajit Apr 5, 2022
e298aed
feat: throw runtime exception for unsupported split assigner
Meghajit Apr 5, 2022
5cae0f4
feat: return default timestamp when not set or present in simple group
Meghajit Apr 5, 2022
66dcf95
feat: delete unused methods from SimpleGroupValidation
Meghajit Apr 5, 2022
539dbd0
feat: inject file path regex into chronology split assigner from stre…
Meghajit Apr 5, 2022
f6cf116
feat: edit the exception message for regex no match in chronology spl…
Meghajit Apr 5, 2022
7b51bb6
Revert "feat: edit the exception message for regex no match in chrono…
Meghajit Apr 5, 2022
a2c0c5c
Revert "feat: inject file path regex into chronology split assigner f…
Meghajit Apr 5, 2022
1850a61
feat: rename functions of ProtoHandler as per review
Meghajit Apr 5, 2022
83489dd
feat: create row from millis inline instead of calling RowFactory
Meghajit Apr 5, 2022
4fa0986
feat: remove unused imports
Meghajit Apr 7, 2022
3877689
feat: add support for deprecated FlinkKafkaConsumer
Meghajit Apr 13, 2022
1714c4d
test: add tests for StreamsFactory
Meghajit Apr 18, 2022
18d2752
test: add tests for StreamType
Meghajit Apr 18, 2022
4034cba
test: add tests for DaggerDeserializerFactory
Meghajit Apr 19, 2022
0cef54c
test: add tests for DaggerSourceFactory
Meghajit Apr 19, 2022
ad796c5
test: add tests for JsonDeserializerProvider
Meghajit Apr 19, 2022
4a2fb73
test: add tests for ProtoDeserializerProvider
Meghajit Apr 19, 2022
711e95b
test: add tests for SimpleGroupDeserializerProvider
Meghajit Apr 19, 2022
9564169
test: add tests for KafkaDaggerSource
Meghajit Apr 19, 2022
8514ef0
test: add tests for FlinkKafkaConsumerDaggerSource
Meghajit Apr 19, 2022
a698cbb
test: add tests for ParquetDaggerSource
Meghajit Apr 19, 2022
93615e7
test: add tests for FlinkKafkaConsumerCustom
Meghajit Apr 19, 2022
bfed30b
feat: update constants and usages
Meghajit Apr 19, 2022
3809d3a
feat: return UNBOUNDED/KAFKA_CONSUMER as default SourceDetails when m…
Meghajit Apr 19, 2022
bd5006d
test: add and update tests for StreamManager
Meghajit Apr 19, 2022
4e2452c
Merge remote-tracking branch 'upstream/main' into feat/add-parquet-da…
Meghajit Apr 19, 2022
7d90975
feat: revert changes to local.properties
Meghajit Apr 19, 2022
05a0d7a
feat: delete unused files and tests
Meghajit Apr 19, 2022
2beddc0
feat: fix checkstyle formatting errors
Meghajit Apr 19, 2022
cb17f2c
feat: delete duplicate files and unused configs
Meghajit Apr 19, 2022
10f20ee
feat: add new interface method to parse from SimpleGroup
Meghajit Apr 20, 2022
2dfb01b
feat: remove redundant hadoop file path
Meghajit Apr 20, 2022
18ee55d
feat: refactor constructor of PrimitiveReader
Meghajit Apr 20, 2022
814e4dd
feat: remove unused initMocks
Meghajit Apr 20, 2022
f106599
test: use normal values instead of limits
Meghajit Apr 20, 2022
cc97d43
test: use normal values instead of limits
Meghajit Apr 20, 2022
26961c1
test: use normal values instead of limits
Meghajit Apr 20, 2022
26b77f6
test: use normal values instead of limits
Meghajit Apr 20, 2022
01bacb8
feat: remove unused constructor from StreamType.Builder
Meghajit Apr 20, 2022
4a82762
feat: rename StreamType to Stream
Meghajit Apr 21, 2022
fcac8d4
feat: rename KAFKA & PARQUET to KAFKA_SOURCE and PARQUET_SOURCE respe…
Meghajit Apr 21, 2022
a867b9e
feat: rename PrimitiveReader to ParquetReader
Meghajit Apr 21, 2022
3d29d9a
feat: change access to package-private for Stream constructor
Meghajit Apr 21, 2022
a939b4c
refactor: change package names
Meghajit Apr 21, 2022
c37bb83
refactor: move typehandler outside proto package
Meghajit Apr 21, 2022
7637420
refactor: rename handler classes under repeated package
Meghajit Apr 21, 2022
7d35c53
refactor: rename files
Meghajit Apr 21, 2022
0dbff61
refactor: rename files in primitive package
Meghajit Apr 21, 2022
cdcf558
refactor: fix checkstyle formatting errors
Meghajit Apr 21, 2022
21331f5
feat: make properties private
Meghajit Apr 21, 2022
b843098
Merge branch 'feat/add-parquet-data-source' into feat/rename-serde-fi…
Meghajit Apr 21, 2022
0c25068
feat: move initialization of RecordReader out of class constructor
Meghajit Apr 25, 2022
9f02b87
feat: refactor ParquetReader
Meghajit Apr 25, 2022
15da9d8
Merge branch 'feat/add-parquet-data-source' into feat/rename-serde-fi…
Meghajit Apr 25, 2022
49c555d
Merge remote-tracking branch 'upstream/dagger-parquet-file-processing…
Meghajit Apr 25, 2022
47cc2a6
Merge branch 'feat/add-parquet-data-source' into feat/rename-serde-fi…
Meghajit Apr 25, 2022
369b7bf
feat: remove keyword `type` from different handlers
Meghajit May 4, 2022
30146a5
feat: rename methods in TypeHandlerFactory
Meghajit May 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.odpf.dagger.common.serde.DaggerDeserializer;
import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation;
import io.odpf.dagger.common.serde.proto.deserialization.ProtoType;
import io.odpf.dagger.common.serde.proto.protohandler.RowFactory;
import io.odpf.dagger.common.serde.typehandler.RowFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.parquet.example.data.simple.SimpleGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import io.odpf.dagger.common.exceptions.serde.DaggerDeserializationException;
import io.odpf.dagger.common.serde.proto.protohandler.RowFactory;
import io.odpf.dagger.common.serde.typehandler.RowFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import io.odpf.dagger.common.serde.DaggerInternalTypeInformation;
import io.odpf.dagger.common.serde.proto.protohandler.TypeInformationFactory;
import io.odpf.dagger.common.serde.typehandler.TypeInformationFactory;

import java.io.Serializable;

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException;
import io.odpf.dagger.common.exceptions.serde.InvalidColumnMappingException;
import io.odpf.dagger.common.serde.proto.protohandler.ProtoHandler;
import io.odpf.dagger.common.serde.proto.protohandler.ProtoHandlerFactory;
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -137,10 +137,10 @@ private DynamicMessage.Builder populateBuilder(DynamicMessage.Builder builder, D
if (fieldDescriptor == null) {
return builder;
}
ProtoHandler protoHandler = ProtoHandlerFactory.getProtoHandler(fieldDescriptor);
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
if (data != null) {
try {
builder = protoHandler.transformToProtoBuilder(builder, data);
builder = typeHandler.transformToProtoBuilder(builder, data);
} catch (IllegalArgumentException e) {
String protoType = fieldDescriptor.getType().toString();
if (fieldDescriptor.isRepeated()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package io.odpf.dagger.common.serde.proto.protohandler;
package io.odpf.dagger.common.serde.typehandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import io.odpf.dagger.common.exceptions.serde.InvalidDataTypeException;
import io.odpf.dagger.common.serde.proto.protohandler.typehandler.PrimitiveTypeHandlerFactory;
import io.odpf.dagger.common.serde.proto.protohandler.typehandler.PrimitiveTypeHandler;
import io.odpf.dagger.common.serde.typehandler.primitive.PrimitiveHandlerFactory;
import io.odpf.dagger.common.serde.typehandler.primitive.PrimitiveHandler;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.parquet.example.data.simple.SimpleGroup;

/**
* The type Primitive type handler.
*/
public class PrimitiveProtoHandler implements ProtoHandler {
public class PrimitiveTypeHandler implements TypeHandler {
private Descriptors.FieldDescriptor fieldDescriptor;

/**
* Instantiates a new Primitive type handler.
*
* @param fieldDescriptor the field descriptor
*/
public PrimitiveProtoHandler(Descriptors.FieldDescriptor fieldDescriptor) {
public PrimitiveTypeHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}

Expand All @@ -40,9 +40,9 @@ public Object transformFromPostProcessor(Object field) {
}

private Object transform(Object field) {
PrimitiveTypeHandler primitiveTypeHandler = PrimitiveTypeHandlerFactory.getTypeHandler(fieldDescriptor);
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
try {
return primitiveTypeHandler.parseObject(field);
return primitiveHandler.parseObject(field);
} catch (NumberFormatException e) {
String errMessage = String.format("type mismatch of field: %s, expecting %s type, actual type %s", fieldDescriptor.getName(), fieldDescriptor.getType(), field.getClass());
throw new InvalidDataTypeException(errMessage);
Expand All @@ -56,8 +56,8 @@ public Object transformFromProto(Object field) {

@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
PrimitiveTypeHandler primitiveTypeHandler = PrimitiveTypeHandlerFactory.getTypeHandler(fieldDescriptor);
return primitiveTypeHandler.parseSimpleGroup(simpleGroup);
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
return primitiveHandler.parseSimpleGroup(simpleGroup);
}

@Override
Expand All @@ -67,8 +67,8 @@ public Object transformToJson(Object field) {

@Override
public TypeInformation getTypeInformation() {
PrimitiveTypeHandler primitiveTypeHandler = PrimitiveTypeHandlerFactory.getTypeHandler(fieldDescriptor);
return primitiveTypeHandler.getTypeInformation();
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
return primitiveHandler.getTypeInformation();
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.common.serde.proto.protohandler;
package io.odpf.dagger.common.serde.typehandler;

import org.apache.flink.types.Row;

Expand Down Expand Up @@ -28,9 +28,9 @@ public static Row createRow(Map<String, Object> inputMap, Descriptors.Descriptor
return row;
}
for (FieldDescriptor fieldDescriptor : descriptorFields) {
ProtoHandler protoHandler = ProtoHandlerFactory.getProtoHandler(fieldDescriptor);
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
if (inputMap.get(fieldDescriptor.getName()) != null) {
row.setField(fieldDescriptor.getIndex(), protoHandler.transformFromPostProcessor(inputMap.get(fieldDescriptor.getName())));
row.setField(fieldDescriptor.getIndex(), typeHandler.transformFromPostProcessor(inputMap.get(fieldDescriptor.getName())));
}
}
return row;
Expand All @@ -47,8 +47,8 @@ public static Row createRow(DynamicMessage proto, int extraColumns) {
List<FieldDescriptor> descriptorFields = proto.getDescriptorForType().getFields();
Row row = new Row(descriptorFields.size() + extraColumns);
for (FieldDescriptor fieldDescriptor : descriptorFields) {
ProtoHandler protoHandler = ProtoHandlerFactory.getProtoHandler(fieldDescriptor);
row.setField(fieldDescriptor.getIndex(), protoHandler.transformFromProto(proto.getField(fieldDescriptor)));
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
row.setField(fieldDescriptor.getIndex(), typeHandler.transformFromProto(proto.getField(fieldDescriptor)));
}
return row;
}
Expand All @@ -57,8 +57,8 @@ public static Row createRow(Descriptors.Descriptor descriptor, SimpleGroup simpl
List<FieldDescriptor> descriptorFields = descriptor.getFields();
Row row = new Row(descriptorFields.size() + extraColumns);
for (FieldDescriptor fieldDescriptor : descriptorFields) {
ProtoHandler protoHandler = ProtoHandlerFactory.getProtoHandler(fieldDescriptor);
row.setField(fieldDescriptor.getIndex(), protoHandler.transformFromParquet(simpleGroup));
TypeHandler typeHandler = TypeHandlerFactory.getTypeHandler(fieldDescriptor);
row.setField(fieldDescriptor.getIndex(), typeHandler.transformFromParquet(simpleGroup));
}
return row;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.common.serde.proto.protohandler;
package io.odpf.dagger.common.serde.typehandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;

Expand All @@ -8,7 +8,7 @@
/**
* The interface Type handler.
*/
public interface ProtoHandler {
public interface TypeHandler {
/**
* Can handle boolean.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.odpf.dagger.common.serde.typehandler;

import com.google.protobuf.Descriptors;
import io.odpf.dagger.common.serde.typehandler.complex.EnumHandler;
import io.odpf.dagger.common.serde.typehandler.complex.MapHandler;
import io.odpf.dagger.common.serde.typehandler.complex.MessageHandler;
import io.odpf.dagger.common.serde.typehandler.complex.StructMessageHandler;
import io.odpf.dagger.common.serde.typehandler.complex.TimestampHandler;
import io.odpf.dagger.common.serde.typehandler.repeated.RepeatedEnumHandler;
import io.odpf.dagger.common.serde.typehandler.repeated.RepeatedMessageHandler;
import io.odpf.dagger.common.serde.typehandler.repeated.RepeatedPrimitiveHandler;
import io.odpf.dagger.common.serde.typehandler.repeated.RepeatedStructMessageHandler;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The factory class for Type handler.
 *
 * <p>Resolves the {@link TypeHandler} responsible for a protobuf field and caches the result in
 * a static map keyed by the field's full name, so repeated lookups for the same field name
 * return the handler that was resolved first.
 */
public class TypeHandlerFactory {
    /** Cache of resolved handlers, keyed by {@code FieldDescriptor#getFullName()}. */
    private static final Map<String, TypeHandler> TYPE_HANDLER_MAP = new ConcurrentHashMap<>();

    /**
     * Gets type handler.
     *
     * <p>Returns the cached handler for the descriptor's full name if one exists; otherwise
     * resolves a handler, stores it in the cache and returns it.
     *
     * @param fieldDescriptor the field descriptor
     * @return the type handler
     */
    public static TypeHandler getTypeHandler(final Descriptors.FieldDescriptor fieldDescriptor) {
        // NOTE(review): the cache key is the full name only, so a later descriptor with the same
        // full name gets the handler built for the earlier one - confirm this is intended.
        return TYPE_HANDLER_MAP.computeIfAbsent(fieldDescriptor.getFullName(),
                fullName -> resolveHandler(fieldDescriptor));
    }

    /**
     * Clear type handler map.
     */
    protected static void clearTypeHandlerMap() {
        TYPE_HANDLER_MAP.clear();
    }

    /**
     * Picks the first specific handler that reports it can handle the field, falling back to a
     * {@link PrimitiveTypeHandler} when none of them does.
     *
     * @param fieldDescriptor the field descriptor
     * @return the matching handler, or a new primitive handler if no specific handler matches
     */
    private static TypeHandler resolveHandler(Descriptors.FieldDescriptor fieldDescriptor) {
        return getSpecificHandlers(fieldDescriptor)
                .stream()
                .filter(TypeHandler::canHandle)
                .findFirst()
                .orElseGet(() -> new PrimitiveTypeHandler(fieldDescriptor));
    }

    /**
     * Builds the candidate handlers for the field.
     *
     * <p>The list order is significant: the caller takes the first candidate whose
     * {@code canHandle()} returns true.
     *
     * @param fieldDescriptor the field descriptor
     * @return the candidate handlers, in lookup priority order
     */
    private static List<TypeHandler> getSpecificHandlers(Descriptors.FieldDescriptor fieldDescriptor) {
        return Arrays.asList(
                new MapHandler(fieldDescriptor),
                new TimestampHandler(fieldDescriptor),
                new EnumHandler(fieldDescriptor),
                new StructMessageHandler(fieldDescriptor),
                new RepeatedStructMessageHandler(fieldDescriptor),
                new RepeatedPrimitiveHandler(fieldDescriptor),
                new RepeatedMessageHandler(fieldDescriptor),
                new RepeatedEnumHandler(fieldDescriptor),
                new MessageHandler(fieldDescriptor)
        );
    }
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package io.odpf.dagger.common.serde.proto.protohandler;
package io.odpf.dagger.common.serde.typehandler;

import com.google.protobuf.Descriptors;
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import com.google.protobuf.Descriptors;
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;

/**
* The factory class for Type information.
*/
Expand All @@ -29,7 +28,7 @@ public static TypeInformation<Row> getRowType(Descriptors.Descriptor descriptor)
TypeInformation[] fieldTypes = descriptor
.getFields()
.stream()
.map(fieldDescriptor -> ProtoHandlerFactory.getProtoHandler(fieldDescriptor).getTypeInformation())
.map(fieldDescriptor -> TypeHandlerFactory.getTypeHandler(fieldDescriptor).getTypeInformation())
.toArray(TypeInformation[]::new);
return Types.ROW_NAMED(fieldNames, fieldTypes);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.odpf.dagger.common.serde.proto.protohandler;
package io.odpf.dagger.common.serde.typehandler.complex;

import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation;
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

Expand All @@ -12,15 +13,15 @@
/**
* The type Enum handler.
*/
public class EnumProtoHandler implements ProtoHandler {
public class EnumHandler implements TypeHandler {
private Descriptors.FieldDescriptor fieldDescriptor;

/**
* Instantiates a new Enum handler.
*
* @param fieldDescriptor the field descriptor
*/
public EnumProtoHandler(Descriptors.FieldDescriptor fieldDescriptor) {
public EnumHandler(Descriptors.FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
}

Expand Down
Loading