Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
216 commits
Select commit Hold shift + click to select a range
b0a93f3
feat: add parquet source specific configs
Meghajit Feb 16, 2022
d27ce82
feat: extract enums into their own file
Meghajit Feb 17, 2022
099287c
feat: create a KafkaSource factory
Meghajit Feb 17, 2022
3419983
feat: create a common interface for all serializers
Meghajit Feb 17, 2022
2cba26a
feat: create a ParquetFileSource and its builder
Meghajit Feb 17, 2022
480603c
feat: add ParquetFileSourceFactory
Meghajit Feb 17, 2022
0e6a3bc
feat: add SourceFactory to create sources
Meghajit Feb 17, 2022
02867bd
feat: create skeleton for a basic parquet file reader
Meghajit Feb 18, 2022
c264de7
feat: create skeleton for two split assigners
Meghajit Feb 18, 2022
e9d0d4c
feat: create skeleton for ParquetFileRecordFormat
Meghajit Feb 18, 2022
56315b1
feat: add more methods in ParquetFileSourceFactory
Meghajit Feb 18, 2022
c82d0be
feat: add runtime dependency for parquet-column into dagger-common
Meghajit Feb 21, 2022
0ed5f41
feat: add an interface for parquet data type parser
Meghajit Feb 21, 2022
1a6f12d
feat: implement parquet parsers for some primitive data types
Meghajit Feb 21, 2022
d9ae93d
feat: handle null args for ParquetDataTypeParser
Meghajit Feb 22, 2022
7690b89
feat: implement parquet parser for float
Meghajit Feb 22, 2022
189fb7a
feat: implement parquet parsers for binary string
Meghajit Feb 22, 2022
b83aba3
feat: implement parquet parsers for double primitive type
Meghajit Feb 22, 2022
1bd9598
feat: implement parquet parsers for enum
Meghajit Feb 22, 2022
e73d736
feat: implement parquet parser for timestamp
Meghajit Feb 24, 2022
ab33372
feat: add check for missing logical type annotation
Meghajit Feb 24, 2022
d99e1ea
feat: add validation for missing logical type annotation
Meghajit Feb 24, 2022
b2fd657
feat: add a validation factory for parquet schema checks
Meghajit Feb 24, 2022
a3223a2
feat: add more checks to SimpleGroupValidation
Meghajit Feb 25, 2022
dc45553
feat: apply validations to ParquetBoolean parser
Meghajit Feb 28, 2022
2f96b4c
feat: apply validations in ParquetTimestampParser
Meghajit Feb 28, 2022
374f98c
feat: apply validations in ParquetInt64Parser
Meghajit Feb 28, 2022
c7d4ea7
feat: apply validations in ParquetInt32Parser
Meghajit Feb 28, 2022
cd2a143
feat: apply validations in ParquetFloatParser
Meghajit Feb 28, 2022
5b52214
feat: apply validations in ParquetDoubleParser
Meghajit Feb 28, 2022
3cc0828
feat: apply validations in ParquetEnumParser
Meghajit Feb 28, 2022
359cbef
feat: apply validations in ParquetStringParser
Meghajit Feb 28, 2022
bfde8a4
feat: implement parquet data type identifier
Meghajit Feb 28, 2022
45f9789
feat: add canHandle as an abstract method in ParquetDataTypeParser
Meghajit Feb 28, 2022
bc829d3
feat: change signature of ParquetDataTypeParser.getValueOrDefault()
Meghajit Feb 28, 2022
fde86ac
bugfix: use Objects.equals instead of ==`
Meghajit Feb 28, 2022
55ce9fc
refactor: opt to use safer Objects.equals
Meghajit Feb 28, 2022
1a243a5
feat: mark constructor as public for ParquetDataTypeID
Meghajit Mar 3, 2022
2e073bc
feat: add factory method for producing ParquetDataTypeParser
Meghajit Mar 3, 2022
8ccb516
feat: add method to deserialize from simple group in BooleanPrimitive…
Meghajit Mar 4, 2022
e82bea0
feat: add method to deserialize from simple group in DoublePrimitiveT…
Meghajit Mar 4, 2022
f70ee38
feat: add method to deserialize from simple group in FloatPrimitiveTy…
Meghajit Mar 4, 2022
a490dbd
feat: add method to deserialize from simple group in IntegerPrimitive…
Meghajit Mar 4, 2022
0c43843
feat: add method to deserialize from simple group in LongPrimitiveTyp…
Meghajit Mar 4, 2022
9b89593
feat: add method to deserialize from simple group in StringPrimitiveT…
Meghajit Mar 4, 2022
a2a8c2c
feat: add method to deserialize from simple group in ByteStringPrimit…
Meghajit Mar 4, 2022
a85b07e
feat: rename interface method and its usages
Meghajit Mar 6, 2022
17a7284
feat: add support for transforming simple group in EnumProtoHandler
Meghajit Mar 6, 2022
1f32b54
feat: add support for transforming simple group in TimestampProtoHandler
Meghajit Mar 6, 2022
d01fb95
feat: undo adding factory method to create ParquetDataTypeParser
Meghajit Mar 6, 2022
e0705be
feat: add factory method in SimpleGroupValidation
Meghajit Mar 6, 2022
960886c
feat: add a test proto for testing parquet source
Meghajit Mar 6, 2022
071122f
feat: support transforming of simple group in PrimitiveProtoHandler
Meghajit Mar 7, 2022
4c4dbe2
feat: add factory method to convert simple group to row
Meghajit Mar 7, 2022
5200fbd
feat: add a simple group deserializer
Meghajit Mar 8, 2022
9ba9c2c
feat: rename interface method to transformFromKafka
Meghajit Mar 8, 2022
255814e
feat: add and implement transformFromParquet
Meghajit Mar 8, 2022
4b375c6
feat: move SimpleGroupValidation to another package
Meghajit Mar 8, 2022
bec88e9
feat: delete io.odpf.dagger.common.serde.parquet.parser package
Meghajit Mar 8, 2022
20890f6
feat: call transformFromParquet instead in RowFactory method
Meghajit Mar 10, 2022
9a30c92
feat: throw exception for bounded kafka source
Meghajit Mar 11, 2022
c4a7651
feat: stop passing stencil to ParquetFileSource and its downstream co…
Meghajit Mar 11, 2022
7104a05
feat: throw exception for unbounded parquet source
Meghajit Mar 11, 2022
c847670
feat: add constructor to SourceDetails
Meghajit Mar 11, 2022
8b3bd8d
feat: make both JsonDeserializer and ProtoDeserializer implement Dagg…
Meghajit Mar 11, 2022
a28e7a7
feat: Add a deserializer factory
Meghajit Mar 11, 2022
651583f
feat: add new field and constructor to Stream POJO
Meghajit Mar 11, 2022
f2de48a
feat: add method to create data sources with deserializer in StreamBu…
Meghajit Mar 11, 2022
4cb77f7
feat: add a 2 argument constructor for StreamBuilder
Meghajit Mar 11, 2022
2ef46ec
feat: call StreamBuilder with stencil from ProtoDataStreamBuilder
Meghajit Mar 11, 2022
5360b14
feat: add tests to check cases related to new stream config SourceDet…
Meghajit Mar 11, 2022
91afc8f
feat: enable Mockito plugin for mocking of final methods
Meghajit Mar 11, 2022
6037afc
feat: move DeserializerFactory to another package and add tests
Meghajit Mar 11, 2022
a236b3b
feat: add a class for populating source specific stream metrics
Meghajit Mar 14, 2022
a8d3278
feat: delegate metrics away from StreamBuilder to StreamMetrics
Meghajit Mar 14, 2022
3f7582c
feat: create stream with source details from SourceFactory
Meghajit Mar 14, 2022
07e86c5
feat: migrate to using buildStream in StreamBuilder implementations
Meghajit Mar 14, 2022
09cb1af
feat: rename buildStream to build
Meghajit Mar 14, 2022
8375565
feat: add tests for StreamMetrics
Meghajit Mar 14, 2022
cd83662
feat: refactor StreamsFactory
Meghajit Mar 14, 2022
13cac7e
feat: refactor StreamMetricsTest
Meghajit Mar 14, 2022
69c5cb9
feat: add tests for DeserializerFactory
Meghajit Mar 14, 2022
2d566df
feat: stop sending StreamConfig to ParquetFileRecordFormat
Meghajit Mar 14, 2022
908a7aa
feat: cleanup test and edit exception message
Meghajit Mar 14, 2022
dc7cff9
feat: refactor SourceFactory and add tests
Meghajit Mar 14, 2022
0ff1de6
feat: refactor DeserializerFactory
Meghajit Mar 15, 2022
357a4f5
feat: refactor StreamBuilder
Meghajit Mar 15, 2022
5a03736
feat: add custom exception for simple group parsing failures
Meghajit Mar 15, 2022
56d1dcc
feat: throw SimpleGroupParsingException from SimpleGroupDeserializer
Meghajit Mar 15, 2022
0a64f11
feat: change exception thrown during simple group parsing in Timestam…
Meghajit Mar 15, 2022
e3321d8
feat: build FileSource from ParquetFileSource in SourceFactory
Meghajit Mar 15, 2022
95e334c
feat: add a Builder nested class inside ParquetFileRecordFormat
Meghajit Mar 15, 2022
437ecc1
feat: delete factories for kafka and parquet source
Meghajit Mar 15, 2022
0aaf743
feat: use Google precondition checks instead of flink ones
Meghajit Mar 15, 2022
dc1286c
feat: Create StreamType and its implementations
Meghajit Mar 29, 2022
057f46f
feat: make abstract methods public in StreamType and its implementations
Meghajit Mar 29, 2022
66727f8
feat: add tests for KafkaSourceJsonType
Meghajit Mar 29, 2022
c9b773a
feat: return StreamType from build method
Meghajit Mar 29, 2022
945d21c
feat: refactor tests
Meghajit Mar 29, 2022
8be0e78
feat: add tests for ParquetSourceProtoType
Meghajit Mar 29, 2022
cfa4990
feat: add tests for KafkaSourceProtoType
Meghajit Mar 29, 2022
0972a1f
feat: add a POJO for holding parquet file split with its instant
Meghajit Mar 30, 2022
cba471b
feat: implement chronology ordered split assigner and add tests
Meghajit Mar 30, 2022
9606844
feat: rename files and tests for streamtype
Meghajit Mar 30, 2022
b51fbaa
feat: move the builder class as a nested class inside ParquetFileSource
Meghajit Mar 30, 2022
c5ff670
feat: allow chronology ordered split assigner to be initialized with …
Meghajit Mar 30, 2022
ab992c3
feat: add tests for ParquetFileSource and add more validations
Meghajit Mar 30, 2022
e732769
feat: make provider class as nested class inside PrimitiveReader
Meghajit Mar 30, 2022
e372b31
feat: add tests for ParquetFileRecordFormat
Meghajit Mar 30, 2022
4830b1a
feat: add few tests for PrimitiveReader
Meghajit Mar 30, 2022
35736bb
feat: implement PrimitiveReader and add tests
Meghajit Mar 31, 2022
fadee0b
feat: throw runtime exception if error in creating parquet reader
Meghajit Mar 31, 2022
daebbfe
Merge remote-tracking branch 'origin/main' into feat/add-parquet-data…
Meghajit Apr 1, 2022
f2abffb
feat: make the type information provider and reader provider function…
Meghajit Apr 1, 2022
19612ed
feat: fix checkstyle formatting issues
Meghajit Apr 4, 2022
ae6bf6b
feat: fix checkstyle issues in split assigner
Meghajit Apr 4, 2022
d61f6e3
feat: fix checkstyle formatting issues in stream type impls
Meghajit Apr 4, 2022
6cad4ee
feat: make ParquetFileSource serializable
Meghajit Apr 4, 2022
a0cb47d
feat: remove serialization from PrimitiveReader
Meghajit Apr 4, 2022
c518d0f
feat: make SourceDetails and StreamType serializable
Meghajit Apr 4, 2022
bde26e2
feat: fix checkstyle formatting issues
Meghajit Apr 4, 2022
05763f9
feat: fix checkstyle formatting errors
Meghajit Apr 4, 2022
6d58b98
feat: return UNBOUNDED/KAFKA as default SourceDetails when not specified
Meghajit Apr 4, 2022
6523779
feat: return StreamType instead of Stream from StreamsFactory
Meghajit Apr 5, 2022
404f476
feat: delete Stream, StreamBuilder and its implementations
Meghajit Apr 5, 2022
5eb3b55
feat: delete SourceFactory,DeserializerFactory and StreamMetrics
Meghajit Apr 5, 2022
556ccec
feat: delete Mockito extension
Meghajit Apr 5, 2022
642a0ea
feat: rename KafkaSourceJsonSchema to KafkaSourceJsonSchemaStreamType
Meghajit Apr 5, 2022
0a44559
feat: rename KafkaSourceProtoSchema to KafkaSourceProtoSchemaStreamType
Meghajit Apr 5, 2022
514793d
feat: rename ParquetSourceProtoSchema to ParquetSourceProtoSchemaStre…
Meghajit Apr 5, 2022
e298aed
feat: throw runtime exception for unsupported split assigner
Meghajit Apr 5, 2022
5cae0f4
feat: return default timestamp when not set or present in simple group
Meghajit Apr 5, 2022
66dcf95
feat: delete unused methods from SimpleGroupValidation
Meghajit Apr 5, 2022
539dbd0
feat: inject file path regex into chronology split assigner from stre…
Meghajit Apr 5, 2022
f6cf116
feat: edit the exception message for regex no match in chronology spl…
Meghajit Apr 5, 2022
7b51bb6
Revert "feat: edit the exception message for regex no match in chrono…
Meghajit Apr 5, 2022
a2c0c5c
Revert "feat: inject file path regex into chronology split assigner f…
Meghajit Apr 5, 2022
1850a61
feat: rename functions of ProtoHandler as per review
Meghajit Apr 5, 2022
83489dd
feat: create row from millis inline instead of calling RowFactory
Meghajit Apr 5, 2022
4fa0986
feat: remove unused imports
Meghajit Apr 7, 2022
3877689
feat: add support for deprecated FlinkKafkaConsumer
Meghajit Apr 13, 2022
1714c4d
test: add tests for StreamsFactory
Meghajit Apr 18, 2022
18d2752
test: add tests for StreamType
Meghajit Apr 18, 2022
4034cba
test: add tests for DaggerDeserializerFactory
Meghajit Apr 19, 2022
0cef54c
test: add tests for DaggerSourceFactory
Meghajit Apr 19, 2022
ad796c5
test: add tests for JsonDeserializerProvider
Meghajit Apr 19, 2022
4a2fb73
test: add tests for ProtoDeserializerProvider
Meghajit Apr 19, 2022
711e95b
test: add tests for SimpleGroupDeserializerProvider
Meghajit Apr 19, 2022
9564169
test: add tests for KafkaDaggerSource
Meghajit Apr 19, 2022
8514ef0
test: add tests for FlinkKafkaConsumerDaggerSource
Meghajit Apr 19, 2022
a698cbb
test: add tests for ParquetDaggerSource
Meghajit Apr 19, 2022
93615e7
test: add tests for FlinkKafkaConsumerCustom
Meghajit Apr 19, 2022
bfed30b
feat: update constants and usages
Meghajit Apr 19, 2022
3809d3a
feat: return UNBOUNDED/KAFKA_CONSUMER as default SourceDetails when m…
Meghajit Apr 19, 2022
bd5006d
test: add and update tests for StreamManager
Meghajit Apr 19, 2022
4e2452c
Merge remote-tracking branch 'upstream/main' into feat/add-parquet-da…
Meghajit Apr 19, 2022
7d90975
feat: revert changes to local.properties
Meghajit Apr 19, 2022
05a0d7a
feat: delete unused files and tests
Meghajit Apr 19, 2022
2beddc0
feat: fix checkstyle formatting errors
Meghajit Apr 19, 2022
cb17f2c
feat: delete duplicate files and unused configs
Meghajit Apr 19, 2022
10f20ee
feat: add new interface method to parse from SimpleGroup
Meghajit Apr 20, 2022
2dfb01b
feat: remove redundant hadoop file path
Meghajit Apr 20, 2022
18ee55d
feat: refactor constructor of PrimitiveReader
Meghajit Apr 20, 2022
814e4dd
feat: remove unused initMocks
Meghajit Apr 20, 2022
f106599
test: use normal values instead of limits
Meghajit Apr 20, 2022
cc97d43
test: use normal values instead of limits
Meghajit Apr 20, 2022
26961c1
test: use normal values instead of limits
Meghajit Apr 20, 2022
26b77f6
test: use normal values instead of limits
Meghajit Apr 20, 2022
01bacb8
feat: remove unused constructor from StreamType.Builder
Meghajit Apr 20, 2022
4a82762
feat: rename StreamType to Stream
Meghajit Apr 21, 2022
fcac8d4
feat: rename KAFKA & PARQUET to KAFKA_SOURCE and PARQUET_SOURCE respe…
Meghajit Apr 21, 2022
a867b9e
feat: rename PrimitiveReader to ParquetReader
Meghajit Apr 21, 2022
3d29d9a
feat: change access to package-private for Stream constructor
Meghajit Apr 21, 2022
a939b4c
refactor: change package names
Meghajit Apr 21, 2022
c37bb83
refactor: move typehandler outside proto package
Meghajit Apr 21, 2022
7637420
refactor: rename handler classes under repeated package
Meghajit Apr 21, 2022
7d35c53
refactor: rename files
Meghajit Apr 21, 2022
0dbff61
refactor: rename files in primitive package
Meghajit Apr 21, 2022
cdcf558
refactor: fix checkstyle formatting errors
Meghajit Apr 21, 2022
21331f5
feat: make properties private
Meghajit Apr 21, 2022
b843098
Merge branch 'feat/add-parquet-data-source' into feat/rename-serde-fi…
Meghajit Apr 21, 2022
a273499
feat: add more enum values to TestEnumMessage
Meghajit Apr 22, 2022
ea3d4c0
feat: handle repeated enums for SimpleGroup deserialization
Meghajit Apr 22, 2022
0c25068
feat: move initialization of RecordReader out of class constructor
Meghajit Apr 25, 2022
2a91c1f
Merge branch 'feat/add-parquet-data-source' into feat/issue#100-parqu…
Meghajit Apr 25, 2022
9f02b87
feat: refactor ParquetReader
Meghajit Apr 25, 2022
c55c330
Merge branch 'feat/add-parquet-data-source' into feat/issue#100-parqu…
Meghajit Apr 25, 2022
15da9d8
Merge branch 'feat/add-parquet-data-source' into feat/rename-serde-fi…
Meghajit Apr 25, 2022
49c555d
Merge remote-tracking branch 'upstream/dagger-parquet-file-processing…
Meghajit Apr 25, 2022
47cc2a6
Merge branch 'feat/add-parquet-data-source' into feat/rename-serde-fi…
Meghajit Apr 25, 2022
58db910
Merge branch 'feat/add-parquet-data-source' into feat/issue#100-parqu…
Meghajit Apr 25, 2022
739cb02
feat: handle nested simple group deserialization
Meghajit Apr 25, 2022
e53d0db
feat: handle repeated simple group deserialization
Meghajit Apr 26, 2022
85c6547
refactor: rename getArray to parseObjectArray in PrimitiveHandler
Meghajit Apr 26, 2022
a53914f
refactor: rename parseObjectArray to parseRepeatedObjectField in Prim…
Meghajit Apr 26, 2022
3fa3de7
feat: add method to parse repeated primitive fields inside simple group
Meghajit Apr 26, 2022
f93731a
feat: add method to parse repeated boolean fields inside simple group
Meghajit Apr 26, 2022
98647f7
test: create a test proto for repeated bytes
Meghajit Apr 26, 2022
84e2800
feat: implement method to parse repeated bytes inside simple group
Meghajit Apr 26, 2022
043fcff
feat: implement method to parse repeated double inside simple group
Meghajit Apr 26, 2022
20dfd78
feat: implement method to parse repeated float inside simple group
Meghajit Apr 26, 2022
469cbb4
feat: implement method to parse repeated int inside simple group
Meghajit Apr 27, 2022
4fbae98
refactor: rename test function
Meghajit Apr 27, 2022
50b3096
feat: implement method to parse repeated long inside simple group
Meghajit Apr 27, 2022
f2be348
feat: implement method to parse repeated String inside simple group
Meghajit Apr 27, 2022
ec84d37
feat: implement transformFromParquet method in RepeatedPrimitiveTypeH…
Meghajit Apr 27, 2022
369b7bf
feat: remove keyword `type` from different handlers
Meghajit May 4, 2022
97260b8
feat: return Long[] instead of long[] for repeated int64 deserialization
Meghajit May 4, 2022
559d179
feat: specify type of array to be created for repeated message deseri…
Meghajit May 4, 2022
c35f9b3
Merge branch 'feat/rename-serde-files-and-packages' into feat/issue#1…
Meghajit May 5, 2022
30146a5
feat: rename methods in TypeHandlerFactory
Meghajit May 11, 2022
6908c87
Merge branch 'feat/rename-serde-files-and-packages' into feat/issue#1…
Meghajit May 11, 2022
256abb2
feat: rename test methods
Meghajit May 11, 2022
a906f3f
Merge branch 'dagger-parquet-file-processing' into feat/rename-serde-…
Meghajit May 12, 2022
564f330
Merge branch 'feat/rename-serde-files-and-packages' into feat/issue#1…
Meghajit May 12, 2022
e63086b
feat: rename test method
Meghajit May 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.odpf.dagger.common.serde.typehandler.complex;

import com.google.protobuf.Descriptors;
import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation;
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory;
import io.odpf.dagger.common.serde.typehandler.RowFactory;
Expand All @@ -24,6 +26,8 @@
public class MessageHandler implements TypeHandler {
private FieldDescriptor fieldDescriptor;
private JsonRowSerializationSchema jsonRowSerializationSchema;
private DynamicMessage defaultMessageInstance;
private Descriptors.Descriptor fieldMessageDescriptor;

/**
* Instantiates a new Message proto handler.
Expand All @@ -32,6 +36,10 @@ public class MessageHandler implements TypeHandler {
*/
public MessageHandler(FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
if (canHandle()) {
this.defaultMessageInstance = DynamicMessage.getDefaultInstance(fieldDescriptor.getMessageType());
this.fieldMessageDescriptor = fieldDescriptor.getMessageType();
}
}

@Override
Expand Down Expand Up @@ -74,7 +82,12 @@ public Object transformFromProto(Object field) {

@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
return null;
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
SimpleGroup nestedGroup = (SimpleGroup) simpleGroup.getGroup(fieldName, 0);
return RowFactory.createRow(fieldMessageDescriptor, nestedGroup);
}
return RowFactory.createRow(defaultMessageInstance);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,28 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}

@Override
public Object getArray(Object field) {
public Object parseRepeatedObjectField(Object field) {
boolean[] inputValues = new boolean[0];
if (field != null) {
inputValues = Booleans.toArray((List<Boolean>) field);
}
return inputValues;
}

@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
boolean[] booleanArray = new boolean[repetitionCount];
for (int i = 0; i < repetitionCount; i++) {
booleanArray[i] = simpleGroup.getBoolean(fieldName, i);
}
return booleanArray;
}
return new boolean[0];
}

@Override
public TypeInformation getTypeInformation() {
return Types.BOOLEAN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,28 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}

@Override
public Object getArray(Object field) {
public Object parseRepeatedObjectField(Object field) {
List<ByteString> inputValues = new ArrayList<>();
if (field != null) {
inputValues = (List<ByteString>) field;
}
return inputValues.toArray(new ByteString[]{});
}

@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
ArrayList<ByteString> byteStringList = new ArrayList<>();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
for (int i = 0; i < repetitionCount; i++) {
byte[] byteArray = simpleGroup.getBinary(fieldName, i).getBytes();
byteStringList.add(ByteString.copyFrom(byteArray));
}
}
return byteStringList.toArray(new ByteString[]{});
}

@Override
public TypeInformation getTypeInformation() {
return TypeInformation.of(ByteString.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,28 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}

@Override
public Object getArray(Object field) {
public Object parseRepeatedObjectField(Object field) {
double[] inputValues = new double[0];
if (field != null) {
inputValues = Doubles.toArray((List<Double>) field);
}
return inputValues;
}

@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
double[] doubleArray = new double[repetitionCount];
for (int i = 0; i < repetitionCount; i++) {
doubleArray[i] = simpleGroup.getDouble(fieldName, i);
}
return doubleArray;
}
return new double[0];
}

@Override
public TypeInformation getTypeInformation() {
return Types.DOUBLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}

@Override
public Object getArray(Object field) {
public Object parseRepeatedObjectField(Object field) {

float[] inputValues = new float[0];
if (field != null) {
Expand All @@ -58,6 +58,20 @@ public Object getArray(Object field) {
return inputValues;
}

@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
float[] floatArray = new float[repetitionCount];
for (int i = 0; i < repetitionCount; i++) {
floatArray[i] = simpleGroup.getFloat(fieldName, i);
}
return floatArray;
}
return new float[0];
}

@Override
public TypeInformation getTypeInformation() {
return Types.FLOAT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,28 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}

@Override
public Object getArray(Object field) {
public Object parseRepeatedObjectField(Object field) {
int[] inputValues = new int[0];
if (field != null) {
inputValues = Ints.toArray((List<Integer>) field);
}
return inputValues;
}

@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
int[] intArray = new int[repetitionCount];
for (int i = 0; i < repetitionCount; i++) {
intArray[i] = simpleGroup.getInteger(fieldName, i);
}
return intArray;
}
return new int[0];
}

@Override
public TypeInformation getTypeInformation() {
return Types.INT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,27 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}

@Override
public Object getArray(Object field) {
public Object parseRepeatedObjectField(Object field) {
List<Long> inputValues = new ArrayList<>();
if (field != null) {
inputValues = (List<Long>) field;
}
return inputValues.toArray(new Long[]{});
}

@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
ArrayList<Long> deserializedValues = new ArrayList<>();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
for (int i = 0; i < repetitionCount; i++) {
deserializedValues.add(simpleGroup.getLong(fieldName, i));
}
}
return deserializedValues.toArray(new Long[]{});
}

@Override
public TypeInformation getTypeInformation() {
return Types.LONG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,21 @@ public interface PrimitiveHandler {
Object parseSimpleGroup(SimpleGroup simpleGroup);

/**
* Gets array.
* Parses a repeated object into an array of Java primitives.
*
* @param field the field
* @return the array
* @param field the object array
* @return array of java primitive values as obtained from the repeated object
*/
Object getArray(Object field);
Object parseRepeatedObjectField(Object field);

/**
* Extracts the specific repeated primitive field from a SimpleGroup object and
* transforms it into an array of Java primitives.
*
* @param simpleGroup SimpleGroup object inside which the repeated primitive field resides
* @return array of java primitive values as obtained from the repeated primitive field
*/
Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup);

/**
* Gets type information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,28 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) {
}

@Override
public Object getArray(Object field) {
public Object parseRepeatedObjectField(Object field) {
List<String> inputValues = new ArrayList<>();
if (field != null) {
inputValues = (List<String>) field;
}
return inputValues.toArray(new String[]{});
}

@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
String[] stringArray = new String[repetitionCount];
for (int i = 0; i < repetitionCount; i++) {
stringArray[i] = simpleGroup.getString(fieldName, i);
}
return stringArray;
}
return new String[0];
}

@Override
public TypeInformation getTypeInformation() {
return Types.STRING;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.odpf.dagger.common.serde.typehandler.repeated;

import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation;
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down Expand Up @@ -54,7 +55,19 @@ public Object transformFromProto(Object field) {

@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
return null;
String defaultEnumValue = fieldDescriptor.getEnumType().findValueByNumber(0).getName();
List<String> deserializedEnumArray = new ArrayList<>();
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
for (int positionIndex = 0; positionIndex < repetitionCount; positionIndex++) {
String extractedValue = simpleGroup.getString(fieldName, positionIndex);
Descriptors.EnumValueDescriptor enumValueDescriptor = fieldDescriptor.getEnumType().findValueByName(extractedValue);
String enumValue = enumValueDescriptor == null ? defaultEnumValue : enumValueDescriptor.getName();
deserializedEnumArray.add(enumValue);
}
}
return deserializedEnumArray.toArray(new String[]{});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.odpf.dagger.common.serde.typehandler.repeated;

import com.google.protobuf.Descriptors;
import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation;
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory;
import io.odpf.dagger.common.serde.typehandler.RowFactory;
Expand Down Expand Up @@ -28,6 +30,7 @@
public class RepeatedMessageHandler implements TypeHandler {
private JsonRowSerializationSchema jsonRowSerializationSchema;
private FieldDescriptor fieldDescriptor;
private Descriptors.Descriptor fieldMessageDescriptor;

/**
* Instantiates a new Repeated message proto handler.
Expand All @@ -36,6 +39,9 @@ public class RepeatedMessageHandler implements TypeHandler {
*/
public RepeatedMessageHandler(FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
if (canHandle()) {
this.fieldMessageDescriptor = fieldDescriptor.getMessageType();
}
}

@Override
Expand Down Expand Up @@ -90,7 +96,16 @@ public Object transformFromProto(Object field) {

@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
return null;
String fieldName = fieldDescriptor.getName();
ArrayList<Row> rowList = new ArrayList<>();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
for (int i = 0; i < repetitionCount; i++) {
SimpleGroup nestedGroup = (SimpleGroup) simpleGroup.getGroup(fieldName, i);
rowList.add(RowFactory.createRow(fieldMessageDescriptor, nestedGroup));
}
}
return rowList.toArray(new Row[]{});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ public Object transformFromPostProcessor(Object field) {
@Override
public Object transformFromProto(Object field) {
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
return primitiveHandler.getArray(field);
return primitiveHandler.parseRepeatedObjectField(field);
}

@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
return null;
PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor);
return primitiveHandler.parseRepeatedSimpleGroupField(simpleGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void shouldThrowInvalidDataTypeExceptionInCaseOfTypeMismatchForPostProces
}

@Test
public void shouldReturnSameValueForTransformFromKafka() {
public void shouldReturnSameValueForTransformFromProto() {
Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor();
Descriptors.FieldDescriptor stringFieldDescriptor = descriptor.findFieldByName("order_number");
PrimitiveTypeHandler primitiveTypeHandler = new PrimitiveTypeHandler(stringFieldDescriptor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void shouldReturnTypeInformation() {
}

@Test
public void shouldTransformValueForKafka() {
public void shouldTransformValueFromProto() {
Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor();
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName("status");
EnumHandler enumHandler = new EnumHandler(fieldDescriptor);
Expand Down
Loading