diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandler.java index f21d1be0f..009ffcae7 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandler.java @@ -1,5 +1,7 @@ package io.odpf.dagger.common.serde.typehandler.complex; +import com.google.protobuf.Descriptors; +import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation; import io.odpf.dagger.common.serde.typehandler.TypeHandler; import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory; import io.odpf.dagger.common.serde.typehandler.RowFactory; @@ -24,6 +26,8 @@ public class MessageHandler implements TypeHandler { private FieldDescriptor fieldDescriptor; private JsonRowSerializationSchema jsonRowSerializationSchema; + private DynamicMessage defaultMessageInstance; + private Descriptors.Descriptor fieldMessageDescriptor; /** * Instantiates a new Message proto handler. @@ -32,6 +36,10 @@ public class MessageHandler implements TypeHandler { */ public MessageHandler(FieldDescriptor fieldDescriptor) { this.fieldDescriptor = fieldDescriptor; + if (canHandle()) { + this.defaultMessageInstance = DynamicMessage.getDefaultInstance(fieldDescriptor.getMessageType()); + this.fieldMessageDescriptor = fieldDescriptor.getMessageType(); + } } @Override @@ -74,7 +82,12 @@ public Object transformFromProto(Object field) { @Override public Object transformFromParquet(SimpleGroup simpleGroup) { - return null; + String fieldName = fieldDescriptor.getName(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + SimpleGroup nestedGroup = (SimpleGroup) simpleGroup.getGroup(fieldName, 0); + return RowFactory.createRow(fieldMessageDescriptor, nestedGroup); + } + return RowFactory.createRow(defaultMessageInstance); } @Override diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandler.java index 3184be536..ed8de1997 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandler.java @@ -49,7 +49,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) { } @Override - public Object getArray(Object field) { + public Object parseRepeatedObjectField(Object field) { boolean[] inputValues = new boolean[0]; if (field != null) { inputValues = Booleans.toArray((List) field); @@ -57,6 +57,20 @@ public Object getArray(Object field) { return inputValues; } + @Override + public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) { + String fieldName = fieldDescriptor.getName(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + boolean[] booleanArray = new boolean[repetitionCount]; + for (int i = 0; i < repetitionCount; i++) { + booleanArray[i] = simpleGroup.getBoolean(fieldName, i); + } + return booleanArray; + } + return new boolean[0]; + } + @Override public TypeInformation getTypeInformation() { return Types.BOOLEAN; diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandler.java index 29413fc1f..dabc2c51e 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandler.java @@ -51,7 +51,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) { } @Override - public Object getArray(Object field) { + public Object parseRepeatedObjectField(Object field) { List inputValues = new ArrayList<>(); if (field != null) { inputValues = (List) field; @@ -59,6 +59,20 @@ public Object getArray(Object field) { return inputValues.toArray(new ByteString[]{}); } + @Override + public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) { + String fieldName = fieldDescriptor.getName(); + ArrayList byteStringList = new ArrayList<>(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + for (int i = 0; i < repetitionCount; i++) { + byte[] byteArray = simpleGroup.getBinary(fieldName, i).getBytes(); + byteStringList.add(ByteString.copyFrom(byteArray)); + } + } + return byteStringList.toArray(new ByteString[]{}); + } + @Override public TypeInformation getTypeInformation() { return TypeInformation.of(ByteString.class); diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandler.java index ef6644e9d..37f77d520 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandler.java @@ -49,7 +49,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) { } @Override - public Object getArray(Object field) { + public Object parseRepeatedObjectField(Object field) { double[] inputValues = new double[0]; if (field != null) { inputValues = Doubles.toArray((List) field); @@ -57,6 +57,20 @@ public Object getArray(Object field) { return inputValues; } + @Override + public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) { + String fieldName = fieldDescriptor.getName(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + double[] doubleArray = new double[repetitionCount]; + for (int i = 0; i < repetitionCount; i++) { + doubleArray[i] = simpleGroup.getDouble(fieldName, i); + } + return doubleArray; + } + return new double[0]; + } + @Override public TypeInformation getTypeInformation() { return Types.DOUBLE; diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandler.java index 0b7f54b04..b3f69dc8a 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandler.java @@ -49,7 +49,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) { } @Override - public Object getArray(Object field) { + public Object parseRepeatedObjectField(Object field) { float[] inputValues = new float[0]; if (field != null) { @@ -58,6 +58,20 @@ public Object getArray(Object field) { return inputValues; } + @Override + public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) { + String fieldName = fieldDescriptor.getName(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + float[] floatArray = new float[repetitionCount]; + for (int i = 0; i < repetitionCount; i++) { + floatArray[i] = simpleGroup.getFloat(fieldName, i); + } + return floatArray; + } + return new float[0]; + } + @Override public TypeInformation getTypeInformation() { return Types.FLOAT; diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandler.java index 6fdf41db8..cdaf675a5 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandler.java @@ -49,7 +49,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) { } @Override - public Object getArray(Object field) { + public Object parseRepeatedObjectField(Object field) { int[] inputValues = new int[0]; if (field != null) { inputValues = Ints.toArray((List) field); @@ -57,6 +57,20 @@ public Object getArray(Object field) { return inputValues; } + @Override + public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) { + String fieldName = fieldDescriptor.getName(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + int[] intArray = new int[repetitionCount]; + for (int i = 0; i < repetitionCount; i++) { + intArray[i] = simpleGroup.getInteger(fieldName, i); + } + return intArray; + } + return new int[0]; + } + @Override public TypeInformation getTypeInformation() { return Types.INT; diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java index 464a99dcb..c6fa879eb 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java @@ -49,7 +49,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) { } @Override - public Object getArray(Object field) { + public Object parseRepeatedObjectField(Object field) { List inputValues = new ArrayList<>(); if (field != null) { inputValues = (List) field; @@ -57,6 +57,19 @@ public Object getArray(Object field) { return inputValues.toArray(new Long[]{}); } + @Override + public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) { + String fieldName = fieldDescriptor.getName(); + ArrayList deserializedValues = new ArrayList<>(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + for (int i = 0; i < repetitionCount; i++) { + deserializedValues.add(simpleGroup.getLong(fieldName, i)); + } + } + return deserializedValues.toArray(new Long[]{}); + } + @Override public TypeInformation getTypeInformation() { return Types.LONG; diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/PrimitiveHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/PrimitiveHandler.java index b9080a208..fb517d702 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/PrimitiveHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/PrimitiveHandler.java @@ -31,12 +31,21 @@ public interface PrimitiveHandler { Object parseSimpleGroup(SimpleGroup simpleGroup); /** - * Gets array. + * Parses a repeated object into an array of Java primitives. * - * @param field the field - * @return the array + * @param field the object array + * @return array of java primitive values as obtained from the repeated object */ - Object getArray(Object field); + Object parseRepeatedObjectField(Object field); + + /** + * Extracts the specific repeated primitive field from a SimpleGroup object and + * transforms it into an array of Java primitives. + * + * @param simpleGroup SimpleGroup object inside which the repeated primitive field resides + * @return array of java primitive values as obtained from the repeated primitive field + */ + Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup); /** * Gets type information. diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandler.java index 6482d3411..37483281e 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandler.java @@ -50,7 +50,7 @@ public Object parseSimpleGroup(SimpleGroup simpleGroup) { } @Override - public Object getArray(Object field) { + public Object parseRepeatedObjectField(Object field) { List inputValues = new ArrayList<>(); if (field != null) { inputValues = (List) field; @@ -58,6 +58,20 @@ public Object getArray(Object field) { return inputValues.toArray(new String[]{}); } + @Override + public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) { + String fieldName = fieldDescriptor.getName(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + String[] stringArray = new String[repetitionCount]; + for (int i = 0; i < repetitionCount; i++) { + stringArray[i] = simpleGroup.getString(fieldName, i); + } + return stringArray; + } + return new String[0]; + } + @Override public TypeInformation getTypeInformation() { return Types.STRING; diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java index 5fecaf5d6..b3617dcde 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java @@ -1,5 +1,6 @@ package io.odpf.dagger.common.serde.typehandler.repeated; +import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation; import io.odpf.dagger.common.serde.typehandler.TypeHandler; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; @@ -54,7 +55,19 @@ public Object transformFromProto(Object field) { @Override public Object transformFromParquet(SimpleGroup simpleGroup) { - return null; + String defaultEnumValue = fieldDescriptor.getEnumType().findValueByNumber(0).getName(); + List deserializedEnumArray = new ArrayList<>(); + String fieldName = fieldDescriptor.getName(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + for (int positionIndex = 0; positionIndex < repetitionCount; positionIndex++) { + String extractedValue = simpleGroup.getString(fieldName, positionIndex); + Descriptors.EnumValueDescriptor enumValueDescriptor = fieldDescriptor.getEnumType().findValueByName(extractedValue); + String enumValue = enumValueDescriptor == null ? defaultEnumValue : enumValueDescriptor.getName(); + deserializedEnumArray.add(enumValue); + } + } + return deserializedEnumArray.toArray(new String[]{}); } @Override diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandler.java index cfb0187b6..cfed9e93b 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandler.java @@ -1,5 +1,7 @@ package io.odpf.dagger.common.serde.typehandler.repeated; +import com.google.protobuf.Descriptors; +import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation; import io.odpf.dagger.common.serde.typehandler.TypeHandler; import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory; import io.odpf.dagger.common.serde.typehandler.RowFactory; @@ -28,6 +30,7 @@ public class RepeatedMessageHandler implements TypeHandler { private JsonRowSerializationSchema jsonRowSerializationSchema; private FieldDescriptor fieldDescriptor; + private Descriptors.Descriptor fieldMessageDescriptor; /** * Instantiates a new Repeated message proto handler. @@ -36,6 +39,9 @@ public class RepeatedMessageHandler implements TypeHandler { */ public RepeatedMessageHandler(FieldDescriptor fieldDescriptor) { this.fieldDescriptor = fieldDescriptor; + if (canHandle()) { + this.fieldMessageDescriptor = fieldDescriptor.getMessageType(); + } } @Override @@ -90,7 +96,16 @@ public Object transformFromProto(Object field) { @Override public Object transformFromParquet(SimpleGroup simpleGroup) { - return null; + String fieldName = fieldDescriptor.getName(); + ArrayList rowList = new ArrayList<>(); + if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) { + int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName); + for (int i = 0; i < repetitionCount; i++) { + SimpleGroup nestedGroup = (SimpleGroup) simpleGroup.getGroup(fieldName, i); + rowList.add(RowFactory.createRow(fieldMessageDescriptor, nestedGroup)); + } + } + return rowList.toArray(new Row[]{}); } @Override diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandler.java index 88944c3db..52b2760ed 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandler.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandler.java @@ -66,12 +66,13 @@ public Object transformFromPostProcessor(Object field) { @Override public Object transformFromProto(Object field) { PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor); - return primitiveHandler.getArray(field); + return primitiveHandler.parseRepeatedObjectField(field); } @Override public Object transformFromParquet(SimpleGroup simpleGroup) { - return null; + PrimitiveHandler primitiveHandler = PrimitiveHandlerFactory.getTypeHandler(fieldDescriptor); + return primitiveHandler.parseRepeatedSimpleGroupField(simpleGroup); } @Override diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/PrimitiveTypeHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/PrimitiveTypeHandlerTest.java index d055e4f2e..6effa7297 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/PrimitiveTypeHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/PrimitiveTypeHandlerTest.java @@ -94,7 +94,7 @@ public void shouldThrowInvalidDataTypeExceptionInCaseOfTypeMismatchForPostProces } @Test - public void shouldReturnSameValueForTransformFromKafka() { + public void shouldReturnSameValueForTransformFromProto() { Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor(); Descriptors.FieldDescriptor stringFieldDescriptor = descriptor.findFieldByName("order_number"); PrimitiveTypeHandler primitiveTypeHandler = new PrimitiveTypeHandler(stringFieldDescriptor); diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/EnumHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/EnumHandlerTest.java index 502622321..750b9d064 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/EnumHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/EnumHandlerTest.java @@ -150,7 +150,7 @@ public void shouldReturnTypeInformation() { } @Test - public void shouldTransformValueForKafka() { + public void shouldTransformValueFromProto() { Descriptors.Descriptor descriptor = TestBookingLogMessage.getDescriptor(); Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName("status"); EnumHandler enumHandler = new EnumHandler(fieldDescriptor); diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandlerTest.java index e66ac404b..48faff3da 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MessageHandlerTest.java @@ -12,10 +12,13 @@ import io.odpf.dagger.consumer.TestPaymentOptionMetadata; import org.apache.parquet.example.data.simple.SimpleGroup; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.PrimitiveType; import org.junit.Test; import java.util.HashMap; +import static org.apache.parquet.schema.Types.buildMessage; +import static org.apache.parquet.schema.Types.requiredGroup; import static org.junit.Assert.*; public class MessageHandlerTest { @@ -23,36 +26,36 @@ public class MessageHandlerTest { @Test public void shouldReturnTrueIfMessageFieldDescriptorIsPassed() { Descriptors.FieldDescriptor messageFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("payment_option_metadata"); - MessageHandler messsageProtoHandler = new MessageHandler(messageFieldDescriptor); + MessageHandler messageHandler = new MessageHandler(messageFieldDescriptor); - assertTrue(messsageProtoHandler.canHandle()); + assertTrue(messageHandler.canHandle()); } @Test public void shouldReturnFalseIfFieldDescriptorOtherThanMessageTypeIsPassed() { Descriptors.FieldDescriptor otherFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); - MessageHandler messsageProtoHandler = new MessageHandler(otherFieldDescriptor); + MessageHandler messageHandler = new MessageHandler(otherFieldDescriptor); - assertFalse(messsageProtoHandler.canHandle()); + assertFalse(messageHandler.canHandle()); } @Test public void shouldReturnTheSameBuilderWithoutSettingFieldIfCanNotHandle() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); - MessageHandler messsageProtoHandler = new MessageHandler(fieldDescriptor); + MessageHandler messageHandler = new MessageHandler(fieldDescriptor); DynamicMessage.Builder builder = DynamicMessage.newBuilder(fieldDescriptor.getContainingType()); - assertEquals(builder, messsageProtoHandler.transformToProtoBuilder(builder, 123)); - assertEquals("", messsageProtoHandler.transformToProtoBuilder(builder, 123).getField(fieldDescriptor)); + assertEquals(builder, messageHandler.transformToProtoBuilder(builder, 123)); + assertEquals("", messageHandler.transformToProtoBuilder(builder, 123).getField(fieldDescriptor)); } @Test public void shouldReturnTheSameBuilderWithoutSettingFieldIfNullPassed() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); - MessageHandler messsageProtoHandler = new MessageHandler(fieldDescriptor); + MessageHandler messageHandler = new MessageHandler(fieldDescriptor); DynamicMessage.Builder builder = DynamicMessage.newBuilder(fieldDescriptor.getContainingType()); - DynamicMessage.Builder outputBuilder = messsageProtoHandler.transformToProtoBuilder(builder, null); + DynamicMessage.Builder outputBuilder = messageHandler.transformToProtoBuilder(builder, null); assertEquals(builder, outputBuilder); assertEquals("", outputBuilder.getField(fieldDescriptor)); } @@ -137,7 +140,7 @@ public void shouldReturnEmptyRowIfNullPassedForTransformForPostProcessor() { } @Test - public void shouldReturnRowGivenAMapForFieldDescriptorOfTypeMessageIfAllValueArePassedForTransformForKafka() throws InvalidProtocolBufferException { + public void shouldReturnRowGivenAMapForFieldDescriptorOfTypeMessageIfAllValueArePassedForTransformFromProto() throws InvalidProtocolBufferException { TestBookingLogMessage bookingLogMessage = TestBookingLogMessage .newBuilder() .setPaymentOptionMetadata(TestPaymentOptionMetadata.newBuilder().setMaskedCard("test1").setNetwork("test2").build()) @@ -155,7 +158,7 @@ public void shouldReturnRowGivenAMapForFieldDescriptorOfTypeMessageIfAllValueAre } @Test - public void shouldReturnRowGivenAMapForFieldDescriptorOfTypeMessageIfAllValueAreNotPassedForTransformForKafka() throws InvalidProtocolBufferException { + public void shouldReturnRowGivenAMapForFieldDescriptorOfTypeMessageIfAllValueAreNotPassedForTransformFromProto() throws InvalidProtocolBufferException { TestBookingLogMessage bookingLogMessage = TestBookingLogMessage .newBuilder() .setPaymentOptionMetadata(TestPaymentOptionMetadata.newBuilder().setMaskedCard("test1").build()) @@ -196,13 +199,73 @@ public void shouldConvertComplexRowDataToJsonString() { } @Test - public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() { + public void shouldReturnRowContainingAllFieldsWhenTransformFromParquetIsCalledWithANestedSimpleGroup() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("payment_option_metadata"); - MessageHandler protoHandler = new MessageHandler(fieldDescriptor); - GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup() - .named("TestGroupType"); - SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + MessageHandler messageHandler = new MessageHandler(fieldDescriptor); + + GroupType nestedGroupSchema = requiredGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("masked_card") + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("network") + .named("payment_option_metadata"); + SimpleGroup nestedGroup = new SimpleGroup(nestedGroupSchema); + nestedGroup.add("masked_card", "4567XXXX1234"); + nestedGroup.add("network", "4G"); + + GroupType mainMessageSchema = buildMessage().addField(nestedGroupSchema).named("MainMessage"); + SimpleGroup mainMessageGroup = new SimpleGroup(mainMessageSchema); + mainMessageGroup.add("payment_option_metadata", nestedGroup); + + Row row = (Row) messageHandler.transformFromParquet(mainMessageGroup); + + assertEquals(2, row.getArity()); + assertEquals("4567XXXX1234", row.getField(0)); + assertEquals("4G", row.getField(1)); + } + + @Test + public void shouldReturnRowContainingDefaultValuesForFieldsWhenTransformFromParquetIsCalledWithUninitializedNestedSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("payment_option_metadata"); + MessageHandler messageHandler = new MessageHandler(fieldDescriptor); + + GroupType nestedGroupSchema = requiredGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("masked_card") + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("network") + .named("payment_option_metadata"); + + GroupType mainMessageSchema = buildMessage().addField(nestedGroupSchema).named("MainMessage"); + SimpleGroup mainMessageGroup = new SimpleGroup(mainMessageSchema); + + Row row = (Row) messageHandler.transformFromParquet(mainMessageGroup); + + assertEquals(2, row.getArity()); + assertEquals("", row.getField(0)); + assertEquals("", row.getField(1)); + } + + @Test + public void shouldReturnRowContainingDefaultValuesForFieldsWhenTransformFromParquetIsCalledWithMissingNestedSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("payment_option_metadata"); + MessageHandler messageHandler = new MessageHandler(fieldDescriptor); + + GroupType mainMessageSchema = buildMessage().named("MainMessage"); + SimpleGroup mainMessageGroup = new SimpleGroup(mainMessageSchema); + + Row row = (Row) messageHandler.transformFromParquet(mainMessageGroup); + + assertEquals(2, row.getArity()); + assertEquals("", row.getField(0)); + assertEquals("", row.getField(1)); + } + + @Test + public void shouldReturnRowContainingDefaultValuesForFieldsWhenTransformFromParquetIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("payment_option_metadata"); + MessageHandler messageHandler = new MessageHandler(fieldDescriptor); + + Row row = (Row) messageHandler.transformFromParquet(null); - assertNull(protoHandler.transformFromParquet(simpleGroup)); + assertEquals(2, row.getArity()); + assertEquals("", row.getField(0)); + assertEquals("", row.getField(1)); } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandlerTest.java index a47cc9598..3a71862a9 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/BooleanHandlerTest.java @@ -12,6 +12,7 @@ import java.util.Arrays; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.Types.buildMessage; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -86,7 +87,7 @@ public void shouldReturnArrayValues() { BooleanHandler booleanHandler = new BooleanHandler(fieldDescriptor); ArrayList inputValues = new ArrayList<>(Arrays.asList(true, false, false)); - Object actualValues = booleanHandler.getArray(inputValues); + Object actualValues = booleanHandler.parseRepeatedObjectField(inputValues); assertArrayEquals(new boolean[]{true, false, false}, (boolean[]) actualValues); } @@ -96,7 +97,7 @@ public void shouldReturnEmptyArrayOnNull() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("customer_dynamic_surge_enabled"); BooleanHandler booleanHandler = new BooleanHandler(fieldDescriptor); - Object actualValues = booleanHandler.getArray(null); + Object actualValues = booleanHandler.parseRepeatedObjectField(null); assertEquals(0, ((boolean[]) actualValues).length); } @@ -144,4 +145,61 @@ public void shouldFetchDefaultValueIfFieldNotInitializedWithAValueInSimpleGroup( assertEquals(false, actualValue); } + + @Test + public void shouldReturnArrayOfJavaBooleanValuesForFieldOfTypeRepeatedBooleanInsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("boolean_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("boolean_array_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + simpleGroup.add("boolean_array_field", true); + simpleGroup.add("boolean_array_field", false); + + BooleanHandler booleanHandler = new BooleanHandler(fieldDescriptor); + boolean[] actualValue = (boolean[]) booleanHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new boolean[]{true, false}, actualValue); + } + + @Test + public void shouldReturnEmptyBooleanArrayWhenParseRepeatedSimpleGroupFieldIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("boolean_array_field"); + + BooleanHandler booleanHandler = new BooleanHandler(fieldDescriptor); + boolean[] actualValue = (boolean[]) booleanHandler.parseRepeatedSimpleGroupField(null); + + assertArrayEquals(new boolean[0], actualValue); + } + + @Test + public void shouldReturnEmptyBooleanArrayWhenRepeatedFieldInsideSimpleGroupIsNotPresent() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("boolean_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("some_other_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + BooleanHandler booleanHandler = new BooleanHandler(fieldDescriptor); + boolean[] actualValue = (boolean[]) booleanHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new boolean[0], actualValue); + } + + @Test + public void shouldReturnEmptyBooleanArrayWhenRepeatedFieldInsideSimpleGroupIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("boolean_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("boolean_array_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + BooleanHandler booleanHandler = new BooleanHandler(fieldDescriptor); + boolean[] actualValue = (boolean[]) booleanHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new boolean[0], actualValue); + } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandlerTest.java index cb5516a5e..88f05fa6a 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/ByteStringHandlerTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.common.serde.typehandler.primitive; +import io.odpf.dagger.consumer.TestRepeatedPrimitiveMessage; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; @@ -15,6 +16,8 @@ import java.util.Arrays; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.Types.buildMessage; import static org.junit.Assert.*; public class ByteStringHandlerTest { @@ -62,7 +65,7 @@ public void shouldReturnArrayValues() { Descriptors.FieldDescriptor fieldDescriptor = TestMessageEnvelope.getDescriptor().findFieldByName("log_key"); ByteStringHandler byteStringHandler = new ByteStringHandler(fieldDescriptor); ArrayList inputValues = new ArrayList<>(Arrays.asList(ByteString.copyFromUtf8("test1"), ByteString.copyFromUtf8("test2"))); - Object actualValues = byteStringHandler.getArray(inputValues); + Object actualValues = byteStringHandler.parseRepeatedObjectField(inputValues); assertArrayEquals(inputValues.toArray(), (ByteString[]) actualValues); } @@ -70,7 +73,7 @@ public void shouldReturnArrayValues() { public void shouldReturnEmptyArrayOnNull() { Descriptors.FieldDescriptor fieldDescriptor = TestMessageEnvelope.getDescriptor().findFieldByName("log_key"); ByteStringHandler byteStringHandler = new ByteStringHandler(fieldDescriptor); - Object actualValues = byteStringHandler.getArray(null); + Object actualValues = byteStringHandler.parseRepeatedObjectField(null); assertEquals(0, ((ByteString[]) actualValues).length); } @@ -119,4 +122,67 @@ public void shouldReturnNullIfFieldNotInitializedWithAValueInSimpleGroup() { assertNull(actualValue); } + + @Test + public void shouldReturnArrayOfByteStringValuesForFieldOfTypeRepeatedBinaryInsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedPrimitiveMessage.getDescriptor().findFieldByName("metadata_bytes"); + + String testString1 = "useful-metadata-string"; + String testString2 = "another-metadata-string"; + ByteString expectedByteString1 = ByteString.copyFrom(testString1.getBytes()); + ByteString expectedByteString2 = ByteString.copyFrom(testString2.getBytes()); + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("metadata_bytes") + .named("TestRepeatedPrimitiveMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + simpleGroup.add("metadata_bytes", Binary.fromConstantByteArray(expectedByteString1.toByteArray())); + simpleGroup.add("metadata_bytes", Binary.fromConstantByteArray(expectedByteString2.toByteArray())); + + ByteStringHandler byteStringHandler = new ByteStringHandler(fieldDescriptor); + ByteString[] actualValue = (ByteString[]) byteStringHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new ByteString[]{expectedByteString1, expectedByteString2}, actualValue); + } + + @Test + public void shouldReturnEmptyByteStringArrayWhenParseRepeatedSimpleGroupFieldIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedPrimitiveMessage.getDescriptor().findFieldByName("metadata_bytes"); + + ByteStringHandler byteStringHandler = new ByteStringHandler(fieldDescriptor); + ByteString[] actualValue = (ByteString[]) byteStringHandler.parseRepeatedSimpleGroupField(null); + + assertArrayEquals(new ByteString[0], actualValue); + } + + @Test + public void shouldReturnEmptyByteStringArrayWhenRepeatedFieldInsideSimpleGroupIsNotPresent() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedPrimitiveMessage.getDescriptor().findFieldByName("metadata_bytes"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("some_other_field") + .named("TestRepeatedPrimitiveMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + ByteStringHandler byteStringHandler = new ByteStringHandler(fieldDescriptor); + ByteString[] actualValue = (ByteString[]) byteStringHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new ByteString[0], actualValue); + } + + @Test + public void shouldReturnEmptyByteStringArrayWhenRepeatedFieldInsideSimpleGroupIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedPrimitiveMessage.getDescriptor().findFieldByName("metadata_bytes"); + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("metadata_bytes") + .named("TestRepeatedPrimitiveMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + ByteStringHandler byteStringHandler = new ByteStringHandler(fieldDescriptor); + ByteString[] actualValue = (ByteString[]) byteStringHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new ByteString[0], actualValue); + } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandlerTest.java index 6892b4327..d6cff55d8 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/DoubleHandlerTest.java @@ -10,7 +10,9 @@ import java.util.ArrayList; import java.util.Arrays; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.Types.buildMessage; import static org.junit.Assert.*; public class DoubleHandlerTest { @@ -79,7 +81,7 @@ public void shouldReturnArrayValues() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("cash_amount"); DoubleHandler doubleHandler = new DoubleHandler(fieldDescriptor); ArrayList inputValues = new ArrayList<>(Arrays.asList(1D, 2D, 3D)); - double[] actualValues = (double[]) doubleHandler.getArray(inputValues); + double[] actualValues = (double[]) doubleHandler.parseRepeatedObjectField(inputValues); assertTrue(Arrays.equals(new double[]{1D, 2D, 3D}, actualValues)); } @@ -88,7 +90,7 @@ public void shouldReturnArrayValues() { public void shouldReturnEmptyArrayOnNull() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("cash_amount"); DoubleHandler doubleHandler = new DoubleHandler(fieldDescriptor); - Object actualValues = doubleHandler.getArray(null); + Object actualValues = doubleHandler.parseRepeatedObjectField(null); assertEquals(0, ((double[]) actualValues).length); } @@ -139,4 +141,61 @@ public void shouldFetchDefaultValueIfFieldNotInitializedWithAValueInSimpleGroup( assertEquals(0.0D, actualValue); } + @Test + public void shouldReturnArrayOfDoubleValuesForFieldOfTypeRepeatedDoubleInsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("double_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(DOUBLE).named("double_array_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + simpleGroup.add("double_array_field", 0.45123D); + simpleGroup.add("double_array_field", 23.0123D); + + DoubleHandler doubleHandler = new DoubleHandler(fieldDescriptor); + double[] actualValue = (double[]) doubleHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new double[]{0.45123D, 23.0123D}, actualValue, 0D); + } + + @Test + public void shouldReturnEmptyDoubleArrayWhenParseRepeatedSimpleGroupFieldIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("double_array_field"); + + DoubleHandler doubleHandler = new DoubleHandler(fieldDescriptor); + double[] actualValue = (double[]) doubleHandler.parseRepeatedSimpleGroupField(null); + + assertArrayEquals(new double[0], actualValue, 0D); + } + + @Test + public void shouldReturnEmptyDoubleArrayWhenRepeatedDoubleFieldInsideSimpleGroupIsNotPresent() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("double_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("some_other_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + DoubleHandler doubleHandler = new DoubleHandler(fieldDescriptor); + double[] actualValue = (double[]) doubleHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new double[0], actualValue, 0D); + } + + @Test + public void shouldReturnEmptyDoubleArrayWhenRepeatedDoubleFieldInsideSimpleGroupIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("double_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(DOUBLE).named("double_array_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + DoubleHandler doubleHandler = new DoubleHandler(fieldDescriptor); + double[] actualValue = (double[]) doubleHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new double[0], actualValue, 0D); + } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandlerTest.java index 6ce746c2c..5b9488a8a 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/FloatHandlerTest.java @@ -11,8 +11,10 @@ import java.util.ArrayList; import java.util.Arrays; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; import static org.apache.parquet.schema.Types.*; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -82,7 +84,7 @@ public void shouldReturnArrayValues() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("amount_paid_by_cash"); FloatHandler floatHandler = new FloatHandler(fieldDescriptor); ArrayList inputValues = new ArrayList<>(Arrays.asList(1F, 2F, 3F)); - Object actualValues = floatHandler.getArray(inputValues); + Object actualValues = floatHandler.parseRepeatedObjectField(inputValues); assertTrue(Arrays.equals(new float[]{1F, 2F, 3F}, (float[]) actualValues)); } @@ -91,7 +93,7 @@ public void shouldReturnArrayValues() { public void shouldReturnEmptyArrayOnNull() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("amount_paid_by_cash"); FloatHandler floatHandler = new FloatHandler(fieldDescriptor); - Object actualValues = floatHandler.getArray(null); + Object actualValues = floatHandler.parseRepeatedObjectField(null); assertEquals(0, ((float[]) actualValues).length); } @@ -141,4 +143,62 @@ public void shouldFetchDefaultValueIfFieldNotInitializedWithAValueInSimpleGroup( assertEquals(0.0F, actualValue); } + @Test + public void shouldReturnArrayOfFloatValuesForFieldOfTypeRepeatedFloatInsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("float_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(FLOAT).named("float_array_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + simpleGroup.add("float_array_field", 0.45123F); + simpleGroup.add("float_array_field", 23.0123F); + + FloatHandler floatHandler = new FloatHandler(fieldDescriptor); + float[] actualValue = (float[]) floatHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new float[]{0.45123F, 23.0123F}, actualValue, 0F); + } + + @Test + public void shouldReturnEmptyFloatArrayWhenParseRepeatedSimpleGroupFieldIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("float_array_field"); + + FloatHandler floatHandler = new FloatHandler(fieldDescriptor); + float[] actualValue = (float[]) floatHandler.parseRepeatedSimpleGroupField(null); + + assertArrayEquals(new float[0], actualValue, 0F); + } + + @Test + public void shouldReturnEmptyFloatArrayWhenRepeatedFloatFieldInsideSimpleGroupIsNotPresent() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("float_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("some_other_array_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + FloatHandler floatHandler = new FloatHandler(fieldDescriptor); + float[] actualValue = (float[]) floatHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new float[0], actualValue, 0F); + } + + @Test + public void shouldReturnEmptyFloatArrayWhenRepeatedFloatFieldInsideSimpleGroupIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("float_array_field"); + + GroupType parquetSchema = buildMessage() + .repeated(FLOAT).named("float_array_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + FloatHandler floatHandler = new FloatHandler(fieldDescriptor); + float[] actualValue = (float[]) floatHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new float[0], actualValue, 0F); + } + } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandlerTest.java index fee725f29..ee07661a3 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/IntegerHandlerTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.common.serde.typehandler.primitive; +import io.odpf.dagger.consumer.TestNestedRepeatedMessage; import org.apache.flink.api.common.typeinfo.Types; import com.google.protobuf.Descriptors; @@ -11,7 +12,9 @@ import java.util.ArrayList; import java.util.Arrays; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Types.buildMessage; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -83,7 +86,7 @@ public void shouldReturnArrayValues() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("cancel_reason_id"); IntegerHandler integerHandler = new IntegerHandler(fieldDescriptor); ArrayList inputValues = new ArrayList<>(Arrays.asList(1, 2, 3)); - Object actualValues = integerHandler.getArray(inputValues); + Object actualValues = integerHandler.parseRepeatedObjectField(inputValues); assertArrayEquals(new int[]{1, 2, 3}, (int[]) actualValues); } @@ -92,7 +95,7 @@ public void shouldReturnArrayValues() { public void shouldReturnEmptyArrayOnNull() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("cancel_reason_id"); IntegerHandler integerHandler = new IntegerHandler(fieldDescriptor); - Object actualValues = integerHandler.getArray(null); + Object actualValues = integerHandler.parseRepeatedObjectField(null); assertEquals(0, ((int[]) actualValues).length); } @@ -143,4 +146,62 @@ public void shouldFetchDefaultValueIfFieldNotInitializedWithAValueInSimpleGroup( assertEquals(0, actualValue); } + + @Test + public void shouldReturnArrayOfIntValuesForFieldOfTypeRepeatedInt32InsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_number_field"); + + GroupType parquetSchema = buildMessage() + .repeated(INT32).named("repeated_number_field") + .named("TestNestedRepeatedMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + simpleGroup.add("repeated_number_field", 2342882); + simpleGroup.add("repeated_number_field", -382922); + + IntegerHandler integerHandler = new IntegerHandler(fieldDescriptor); + int[] actualValue = (int[]) integerHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new int[]{2342882, -382922}, actualValue); + } + + @Test + public void shouldReturnEmptyIntArrayWhenParseRepeatedSimpleGroupFieldIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_number_field"); + + IntegerHandler integerHandler = new IntegerHandler(fieldDescriptor); + int[] actualValue = (int[]) integerHandler.parseRepeatedSimpleGroupField(null); + + assertArrayEquals(new int[0], actualValue); + } + + @Test + public void shouldReturnEmptyIntArrayWhenRepeatedInt32FieldInsideSimpleGroupIsNotPresent() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_number_field"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("some_other_field") + .named("TestNestedRepeatedMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + IntegerHandler integerHandler = new IntegerHandler(fieldDescriptor); + int[] actualValue = (int[]) integerHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new int[0], actualValue); + } + + @Test + public void shouldReturnEmptyIntArrayWhenRepeatedInt32FieldInsideSimpleGroupIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_number_field"); + + GroupType parquetSchema = buildMessage() + .repeated(INT32).named("repeated_number_field") + .named("TestNestedRepeatedMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + IntegerHandler integerHandler = new IntegerHandler(fieldDescriptor); + int[] actualValue = (int[]) integerHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new int[0], actualValue); + } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandlerTest.java index 1df43d99e..7b1f44f2b 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandlerTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.common.serde.typehandler.primitive; +import io.odpf.dagger.consumer.TestNestedRepeatedMessage; import org.apache.flink.api.common.typeinfo.Types; import com.google.protobuf.Descriptors; @@ -11,7 +12,9 @@ import java.util.ArrayList; import java.util.Arrays; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.buildMessage; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -83,7 +86,7 @@ public void shouldReturnArrayValues() { Descriptors.FieldDescriptor fieldDescriptor = TestAggregatedSupplyMessage.getDescriptor().findFieldByName("s2_id"); LongHandler longHandler = new LongHandler(fieldDescriptor); ArrayList inputValues = new ArrayList<>(Arrays.asList(1L, 2L, 3L)); - Object actualValues = longHandler.getArray(inputValues); + Object actualValues = longHandler.parseRepeatedObjectField(inputValues); assertArrayEquals(inputValues.toArray(), (Long[]) actualValues); } @@ -91,7 +94,7 @@ public void shouldReturnArrayValues() { public void shouldReturnEmptyArrayOnNull() { Descriptors.FieldDescriptor fieldDescriptor = TestAggregatedSupplyMessage.getDescriptor().findFieldByName("s2_id"); LongHandler longHandler = new LongHandler(fieldDescriptor); - Object actualValues = longHandler.getArray(null); + Object actualValues = longHandler.parseRepeatedObjectField(null); assertEquals(0, ((Long[]) actualValues).length); } @@ -140,4 +143,62 @@ public void shouldFetchDefaultValueIfFieldNotInitializedWithAValueInSimpleGroup( assertEquals(0L, actualValue); } + + @Test + public void shouldReturnArrayOfLongValuesForFieldOfTypeRepeatedInt64InsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_long_field"); + + GroupType parquetSchema = buildMessage() + .repeated(INT64).named("repeated_long_field") + .named("TestNestedRepeatedMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + simpleGroup.add("repeated_long_field", 6222L); + simpleGroup.add("repeated_long_field", 0L); + + LongHandler longHandler = new LongHandler(fieldDescriptor); + Long[] actualValue = (Long[]) longHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new Long[]{6222L, 0L}, actualValue); + } + + @Test + public void shouldReturnEmptyLongArrayWhenParseRepeatedSimpleGroupFieldIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_long_field"); + + LongHandler longHandler = new LongHandler(fieldDescriptor); + Long[] actualValue = (Long[]) longHandler.parseRepeatedSimpleGroupField(null); + + assertArrayEquals(new Long[0], actualValue); + } + + @Test + public void shouldReturnEmptyLongArrayWhenRepeatedInt64FieldInsideSimpleGroupIsNotPresent() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_long_field"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("some_other_field") + .named("TestNestedRepeatedMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + LongHandler longHandler = new LongHandler(fieldDescriptor); + Long[] actualValue = (Long[]) longHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new Long[0], actualValue); + } + + @Test + public void shouldReturnEmptyLongArrayWhenRepeatedInt64FieldInsideSimpleGroupIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("repeated_long_field"); + + GroupType parquetSchema = buildMessage() + .repeated(INT64).named("repeated_long_field") + .named("TestNestedRepeatedMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + LongHandler longHandler = new LongHandler(fieldDescriptor); + Long[] actualValue = (Long[]) longHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new Long[0], actualValue); + } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandlerTest.java index 2e5f749db..e8cee4aac 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/primitive/StringHandlerTest.java @@ -13,6 +13,8 @@ import java.util.Arrays; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.Types.buildMessage; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -84,7 +86,7 @@ public void shouldReturnArrayValues() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); StringHandler stringHandler = new StringHandler(fieldDescriptor); ArrayList inputValues = new ArrayList<>(Arrays.asList("1", "2", "3")); - Object actualValues = stringHandler.getArray(inputValues); + Object actualValues = stringHandler.parseRepeatedObjectField(inputValues); assertArrayEquals(inputValues.toArray(), (String[]) actualValues); } @@ -92,7 +94,7 @@ public void shouldReturnArrayValues() { public void shouldReturnEmptyArrayOnNull() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); StringHandler stringHandler = new StringHandler(fieldDescriptor); - Object actualValues = stringHandler.getArray(null); + Object actualValues = stringHandler.parseRepeatedObjectField(null); assertEquals(0, ((String[]) actualValues).length); } @@ -139,4 +141,62 @@ public void shouldFetchDefaultValueIfFieldNotInitializedWithAValueInSimpleGroup( assertEquals("", actualValue); } + + @Test + public void shouldReturnArrayOfStringValuesForFieldOfTypeRepeatedBinaryInsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("meta_array"); + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("meta_array") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + simpleGroup.add("meta_array", "Hello World"); + simpleGroup.add("meta_array", "Welcome"); + + StringHandler stringHandler = new StringHandler(fieldDescriptor); + String[] actualValue = (String[]) stringHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new String[]{"Hello World", "Welcome"}, actualValue); + } + + @Test + public void shouldReturnEmptyStringArrayWhenParseRepeatedSimpleGroupFieldIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("meta_array"); + + StringHandler stringHandler = new StringHandler(fieldDescriptor); + String[] actualValue = (String[]) stringHandler.parseRepeatedSimpleGroupField(null); + + assertArrayEquals(new String[0], actualValue); + } + + @Test + public void shouldReturnEmptyStringArrayWhenRepeatedBinaryFieldInsideSimpleGroupIsNotPresent() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("meta_array"); + + GroupType parquetSchema = buildMessage() + .repeated(BOOLEAN).named("some_other_field") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + StringHandler stringHandler = new StringHandler(fieldDescriptor); + String[] actualValue = (String[]) stringHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new String[0], actualValue); + } + + @Test + public void shouldReturnEmptyStringArrayWhenRepeatedBinaryFieldInsideSimpleGroupIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("meta_array"); + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("meta_array") + .named("TestBookingLogMessage"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + StringHandler stringHandler = new StringHandler(fieldDescriptor); + String[] actualValue = (String[]) stringHandler.parseRepeatedSimpleGroupField(simpleGroup); + + assertArrayEquals(new String[0], actualValue); + } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandlerTest.java index 1ff507198..537bf8711 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandlerTest.java @@ -17,7 +17,13 @@ import java.util.ArrayList; import java.util.Collections; -import static org.junit.Assert.*; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.buildMessage; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class RepeatedEnumHandlerTest { @Test @@ -88,7 +94,7 @@ public void shouldTransformValueForPostProcessorAsEmptyStringArrayForNull() { } @Test - public void shouldTransformValueForKafkaAsStringArray() throws InvalidProtocolBufferException { + public void shouldTransformValueFromProtoAsStringArray() throws InvalidProtocolBufferException { TestRepeatedEnumMessage testRepeatedEnumMessage = TestRepeatedEnumMessage.newBuilder().addTestEnums(TestEnumMessage.Enum.UNKNOWN).build(); DynamicMessage dynamicMessage = DynamicMessage.parseFrom(TestRepeatedEnumMessage.getDescriptor(), testRepeatedEnumMessage.toByteArray()); @@ -101,7 +107,7 @@ public void shouldTransformValueForKafkaAsStringArray() throws InvalidProtocolBu } @Test - public void shouldTransformValueForKafkaAsEmptyStringArrayForNull() { + public void shouldTransformValueFromProtoAsEmptyStringArrayForNull() { Descriptors.FieldDescriptor repeatedEnumFieldDescriptor = TestRepeatedEnumMessage.getDescriptor().findFieldByName("test_enums"); RepeatedEnumHandler repeatedEnumHandler = new RepeatedEnumHandler(repeatedEnumFieldDescriptor); @@ -111,13 +117,81 @@ public void shouldTransformValueForKafkaAsEmptyStringArrayForNull() { } @Test - public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() { - Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("test_enums"); - RepeatedEnumHandler protoHandler = new RepeatedEnumHandler(fieldDescriptor); - GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup() + public void shouldTransformValueForParquetAsStringArray() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedEnumMessage.getDescriptor().findFieldByName("test_enums"); + RepeatedEnumHandler repeatedEnumHandler = new RepeatedEnumHandler(fieldDescriptor); + String enum1 = String.valueOf(fieldDescriptor.getEnumType().findValueByName("FIRST_ENUM_VALUE")); + String enum2 = String.valueOf(fieldDescriptor.getEnumType().findValueByName("SECOND_ENUM_VALUE")); + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("test_enums") .named("TestGroupType"); SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + simpleGroup.add("test_enums", enum1); + simpleGroup.add("test_enums", enum2); - assertNull(protoHandler.transformFromParquet(simpleGroup)); + String[] actualEnumArray = (String[]) repeatedEnumHandler.transformFromParquet(simpleGroup); + assertEquals("FIRST_ENUM_VALUE", actualEnumArray[0]); + assertEquals("SECOND_ENUM_VALUE", actualEnumArray[1]); + } + + @Test + public void shouldTransformValueForParquetAsEmptyStringArrayWhenNullIsPassedAsArgument() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedEnumMessage.getDescriptor().findFieldByName("test_enums"); + RepeatedEnumHandler repeatedEnumHandler = new RepeatedEnumHandler(fieldDescriptor); + + String[] expectedEnumArray = (String[]) repeatedEnumHandler.transformFromParquet(null); + + assertArrayEquals(new String[0], expectedEnumArray); + } + + @Test + public void shouldSubstituteDefaultEnumStringWhenAnyValueInsideSimpleGroupRepeatedEnumIsNotPresentInProtoDefinition() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedEnumMessage.getDescriptor().findFieldByName("test_enums"); + RepeatedEnumHandler repeatedEnumHandler = new RepeatedEnumHandler(fieldDescriptor); + String enum1 = String.valueOf(fieldDescriptor.getEnumType().findValueByName("FIRST_ENUM_VALUE")); + String enum2 = "some-junk-value"; + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("test_enums") + .named("TestGroupType"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + simpleGroup.add("test_enums", enum1); + simpleGroup.add("test_enums", enum2); + + String[] actualEnumArray = (String[]) repeatedEnumHandler.transformFromParquet(simpleGroup); + assertEquals("FIRST_ENUM_VALUE", actualEnumArray[0]); + assertEquals("UNKNOWN", actualEnumArray[1]); + } + + @Test + public void shouldTransformValueForParquetAsEmptyStringArrayWhenFieldIsNotPresentInsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedEnumMessage.getDescriptor().findFieldByName("test_enums"); + RepeatedEnumHandler repeatedEnumHandler = new RepeatedEnumHandler(fieldDescriptor); + + GroupType parquetSchema = buildMessage() + .repeated(INT64).named("first_name") + .named("TestGroupType"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + simpleGroup.add("first_name", 34L); + + String[] actualEnumArray = (String[]) repeatedEnumHandler.transformFromParquet(simpleGroup); + + assertArrayEquals(new String[0], actualEnumArray); + } + + @Test + public void shouldTransformValueForParquetAsEmptyStringArrayWhenFieldIsNotInitializedInsideSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestRepeatedEnumMessage.getDescriptor().findFieldByName("test_enums"); + RepeatedEnumHandler repeatedEnumHandler = new RepeatedEnumHandler(fieldDescriptor); + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("test_enums") + .named("TestGroupType"); + SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); + + String[] actualEnumArray = (String[]) repeatedEnumHandler.transformFromParquet(simpleGroup); + + assertArrayEquals(new String[0], actualEnumArray); } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandlerTest.java index f20a2e43f..971bf18d1 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedMessageHandlerTest.java @@ -13,6 +13,7 @@ import net.minidev.json.JSONArray; import org.apache.parquet.example.data.simple.SimpleGroup; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.PrimitiveType; import org.junit.Test; import java.util.ArrayList; @@ -22,6 +23,9 @@ import static org.apache.flink.api.common.typeinfo.Types.OBJECT_ARRAY; import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED; import static org.apache.flink.api.common.typeinfo.Types.STRING; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Types.buildMessage; +import static org.apache.parquet.schema.Types.repeatedGroup; import static org.junit.Assert.*; public class RepeatedMessageHandlerTest { @@ -37,28 +41,28 @@ public void shouldReturnTrueIfRepeatedMessageFieldDescriptorIsPassed() { @Test public void shouldReturnFalseIfFieldDescriptorOtherThanRepeatedMessageTypeIsPassed() { Descriptors.FieldDescriptor otherFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); - RepeatedMessageHandler repeatedMesssageProtoHandler = new RepeatedMessageHandler(otherFieldDescriptor); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(otherFieldDescriptor); - assertFalse(repeatedMesssageProtoHandler.canHandle()); + assertFalse(repeatedMessageHandler.canHandle()); } @Test public void shouldReturnTheSameBuilderWithoutSettingFieldIfCanNotHandle() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); - RepeatedMessageHandler repeatedMesssageProtoHandler = new RepeatedMessageHandler(fieldDescriptor); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(fieldDescriptor); DynamicMessage.Builder builder = DynamicMessage.newBuilder(fieldDescriptor.getContainingType()); - assertEquals(builder, repeatedMesssageProtoHandler.transformToProtoBuilder(builder, 123)); - assertEquals("", repeatedMesssageProtoHandler.transformToProtoBuilder(builder, 123).getField(fieldDescriptor)); + assertEquals(builder, repeatedMessageHandler.transformToProtoBuilder(builder, 123)); + assertEquals("", repeatedMessageHandler.transformToProtoBuilder(builder, 123).getField(fieldDescriptor)); } @Test public void shouldReturnTheSameBuilderWithoutSettingFieldIfNullPassed() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("order_number"); - RepeatedMessageHandler repeatedMesssageProtoHandler = new RepeatedMessageHandler(fieldDescriptor); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(fieldDescriptor); DynamicMessage.Builder builder = DynamicMessage.newBuilder(fieldDescriptor.getContainingType()); - DynamicMessage.Builder outputBuilder = repeatedMesssageProtoHandler.transformToProtoBuilder(builder, null); + DynamicMessage.Builder outputBuilder = repeatedMessageHandler.transformToProtoBuilder(builder, null); assertEquals(builder, outputBuilder); assertEquals("", outputBuilder.getField(fieldDescriptor)); } @@ -97,7 +101,7 @@ public void shouldSetTheFieldsPassedInTheBuilderForRepeatedMessageFieldTypeDescr @Test public void shouldSetTheFieldsPassedInTheBuilderForRepeatedMessageFieldTypeDescriptorIfInputIsList() throws InvalidProtocolBufferException { Descriptors.FieldDescriptor repeatedMessageFieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); - RepeatedMessageHandler repeatedMesssageProtoHandler = new RepeatedMessageHandler(repeatedMessageFieldDescriptor); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(repeatedMessageFieldDescriptor); DynamicMessage.Builder builder = DynamicMessage.newBuilder(repeatedMessageFieldDescriptor.getContainingType()); Row inputRow1 = new Row(2); @@ -113,7 +117,7 @@ public void shouldSetTheFieldsPassedInTheBuilderForRepeatedMessageFieldTypeDescr inputRows.add(inputRow1); inputRows.add(inputRow2); - DynamicMessage.Builder returnedBuilder = repeatedMesssageProtoHandler.transformToProtoBuilder(builder, inputRows); + DynamicMessage.Builder returnedBuilder = repeatedMessageHandler.transformToProtoBuilder(builder, inputRows); List reasons = (List) returnedBuilder.getField(repeatedMessageFieldDescriptor); @@ -127,11 +131,10 @@ public void shouldSetTheFieldsPassedInTheBuilderForRepeatedMessageFieldTypeDescr assertEquals("group2", reason2.getGroupId()); } - @Test public void shouldSetTheFieldsNotPassedInTheBuilderForRepeatedMessageFieldTypeDescriptorToDefaults() throws InvalidProtocolBufferException { Descriptors.FieldDescriptor repeatedMessageFieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); - RepeatedMessageHandler repeatedMesssageProtoHandler = new RepeatedMessageHandler(repeatedMessageFieldDescriptor); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(repeatedMessageFieldDescriptor); DynamicMessage.Builder builder = DynamicMessage.newBuilder(repeatedMessageFieldDescriptor.getContainingType()); Row inputRow1 = new Row(2); @@ -145,7 +148,7 @@ public void shouldSetTheFieldsNotPassedInTheBuilderForRepeatedMessageFieldTypeDe inputRows.add(inputRow1); inputRows.add(inputRow2); - DynamicMessage.Builder returnedBuilder = repeatedMesssageProtoHandler.transformToProtoBuilder(builder, inputRows); + DynamicMessage.Builder returnedBuilder = repeatedMessageHandler.transformToProtoBuilder(builder, inputRows); List reasons = (List) returnedBuilder.getField(repeatedMessageFieldDescriptor); @@ -238,7 +241,7 @@ public void shouldReturnArrayOfRowsGivenAListForFieldDescriptorOfTypeRepeatedMes } @Test - public void shouldReturnEmptyArrayOfRowsIfNullPassedForKafkaTransform() { + public void shouldReturnEmptyArrayOfRowsIfNullPassedForTransformFromProto() { Descriptors.FieldDescriptor repeatedMessageFieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); Object[] values = (Object[]) new RepeatedMessageHandler(repeatedMessageFieldDescriptor).transformFromProto(null); @@ -247,7 +250,7 @@ public void shouldReturnEmptyArrayOfRowsIfNullPassedForKafkaTransform() { } @Test - public void shouldReturnArrayOfRowsGivenAListForFieldDescriptorOfTypeRepeatedMessageOfAsDescriptorForKafkaTransform() throws InvalidProtocolBufferException { + public void shouldReturnArrayOfRowsGivenAListForFieldDescriptorOfTypeRepeatedMessageOfAsDescriptorForTransformFromProto() throws InvalidProtocolBufferException { TestFeedbackLogMessage logMessage = TestFeedbackLogMessage .newBuilder() .addReason(TestReason.newBuilder().setReasonId("reason1").setGroupId("group1").build()) @@ -267,7 +270,6 @@ public void shouldReturnArrayOfRowsGivenAListForFieldDescriptorOfTypeRepeatedMes assertEquals("group2", ((Row) values[1]).getField(1)); } - @Test public void shouldReturnTypeInformation() { Descriptors.FieldDescriptor repeatedMessageFieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); @@ -298,13 +300,79 @@ public void shouldConvertRepeatedComplexRowDataToJsonString() { } @Test - public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() { - Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("reason"); - RepeatedMessageHandler protoHandler = new RepeatedMessageHandler(fieldDescriptor); - GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup() - .named("TestGroupType"); - SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); - - assertNull(protoHandler.transformFromParquet(simpleGroup)); + public void shouldReturnArrayOfRowsWhenTransformFromParquetIsCalledWithSimpleGroupContainingRepeatedSimpleGroup() { + Descriptors.FieldDescriptor fieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(fieldDescriptor); + + GroupType nestedGroupSchema = repeatedGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("reason_id") + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("group_id") + .named("reason"); + SimpleGroup value1 = new SimpleGroup(nestedGroupSchema); + value1.add("reason_id", "FIRST"); + value1.add("group_id", "1234XXXX"); + SimpleGroup value2 = new SimpleGroup(nestedGroupSchema); + value2.add("reason_id", "SECOND"); + value2.add("group_id", "6789XXXX"); + + GroupType mainMessageSchema = buildMessage().addField(nestedGroupSchema).named("MainMessage"); + SimpleGroup mainMessageGroup = new SimpleGroup(mainMessageSchema); + mainMessageGroup.add("reason", value1); + mainMessageGroup.add("reason", value2); + + Row[] actualRows = (Row[]) repeatedMessageHandler.transformFromParquet(mainMessageGroup); + + Row expectedRow1 = new Row(2); + expectedRow1.setField(0, "FIRST"); + expectedRow1.setField(1, "1234XXXX"); + Row expectedRow2 = new Row(2); + expectedRow2.setField(0, "SECOND"); + expectedRow2.setField(1, "6789XXXX"); + Row[] expectedRows = new Row[]{expectedRow1, expectedRow2}; + + assertArrayEquals(expectedRows, actualRows); + } + + @Test + public void shouldReturnEmptyRowArrayWhenTransformFromParquetIsCalledWithNull() { + Descriptors.FieldDescriptor fieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(fieldDescriptor); + + Row[] actualRows = (Row[]) repeatedMessageHandler.transformFromParquet(null); + + assertEquals(0, actualRows.length); + } + + @Test + public void shouldReturnEmptyRowArrayWhenTransformFromParquetIsCalledWithSimpleGroupAndRepeatedSimpleGroupFieldIsNotInitialized() { + Descriptors.FieldDescriptor fieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(fieldDescriptor); + + GroupType nestedGroupSchema = repeatedGroup() + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("reason_id") + .required(PrimitiveType.PrimitiveTypeName.BINARY).named("group_id") + .named("reason"); + + GroupType mainMessageSchema = buildMessage().addField(nestedGroupSchema).named("MainMessage"); + SimpleGroup mainMessageGroup = new SimpleGroup(mainMessageSchema); + + Row[] actualRows = (Row[]) repeatedMessageHandler.transformFromParquet(mainMessageGroup); + + assertEquals(0, actualRows.length); + } + + @Test + public void shouldReturnEmptyRowArrayWhenTransformFromParquetIsCalledWithSimpleGroupWhichHasRepeatedSimpleGroupFieldAsMissing() { + Descriptors.FieldDescriptor fieldDescriptor = TestFeedbackLogMessage.getDescriptor().findFieldByName("reason"); + RepeatedMessageHandler repeatedMessageHandler = new RepeatedMessageHandler(fieldDescriptor); + + GroupType mainMessageSchema = buildMessage() + .required(INT64).named("some_other_field") + .named("MainMessage"); + SimpleGroup mainMessageGroup = new SimpleGroup(mainMessageSchema); + + Row[] actualRows = (Row[]) repeatedMessageHandler.transformFromParquet(mainMessageGroup); + + assertEquals(0, actualRows.length); } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandlerTest.java index cc340356f..89ec0307e 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandlerTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedPrimitiveHandlerTest.java @@ -20,6 +20,8 @@ import java.util.List; import static java.util.Arrays.asList; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.Types.buildMessage; import static org.junit.Assert.*; public class RepeatedPrimitiveHandlerTest { @@ -193,7 +195,7 @@ public void shouldThrowInvalidDataTypeExceptionInCaseOfTypeMismatchForPostProces } @Test - public void shouldReturnAllFieldsInAListOfObjectsIfMultipleFieldsPassedWithSameTypeAsFieldDescriptorForKafkaTransform() throws InvalidProtocolBufferException { + public void shouldReturnAllFieldsInAListOfObjectsIfMultipleFieldsPassedWithSameTypeAsFieldDescriptorForTransformFromProto() throws InvalidProtocolBufferException { Descriptors.FieldDescriptor repeatedFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("meta_array"); RepeatedPrimitiveHandler repeatedPrimitiveHandler = new RepeatedPrimitiveHandler(repeatedFieldDescriptor); @@ -209,9 +211,8 @@ public void shouldReturnAllFieldsInAListOfObjectsIfMultipleFieldsPassedWithSameT assertArrayEquals(new String[]{"1", "2", "3"}, outputValues); } - @Test - public void shouldThrowUnsupportedDataTypeExceptionInCaseOfInCaseOfEnumForKafkaTransform() { + public void shouldThrowUnsupportedDataTypeExceptionInCaseOfInCaseOfEnumForTransformFromProto() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("status"); RepeatedPrimitiveHandler repeatedPrimitiveHandler = new RepeatedPrimitiveHandler(fieldDescriptor); DataTypeNotSupportedException exception = Assert.assertThrows(DataTypeNotSupportedException.class, @@ -240,13 +241,21 @@ public void shouldConvertRepeatedRowDataToJsonString() { } @Test - public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() { + public void shouldReturnArrayOfPrimitiveValuesWhenTransformFromParquetIsCalledWithSimpleGroupContainingRepeatedPrimitive() { Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("meta_array"); - RepeatedPrimitiveHandler protoHandler = new RepeatedPrimitiveHandler(fieldDescriptor); - GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup() - .named("TestGroupType"); + + RepeatedPrimitiveHandler repeatedPrimitiveHandler = new RepeatedPrimitiveHandler(fieldDescriptor); + + GroupType parquetSchema = buildMessage() + .repeated(BINARY).named("meta_array") + .named("TestBookingLogMessage"); SimpleGroup simpleGroup = new SimpleGroup(parquetSchema); - assertNull(protoHandler.transformFromParquet(simpleGroup)); + simpleGroup.add("meta_array", "Hello World"); + simpleGroup.add("meta_array", "Welcome"); + + String[] actualValue = (String[]) repeatedPrimitiveHandler.transformFromParquet(simpleGroup); + + assertArrayEquals(new String[]{"Hello World", "Welcome"}, actualValue); } } diff --git a/dagger-common/src/test/proto/TestMessage.proto b/dagger-common/src/test/proto/TestMessage.proto index fa769be39..5f88433ca 100644 --- a/dagger-common/src/test/proto/TestMessage.proto +++ b/dagger-common/src/test/proto/TestMessage.proto @@ -47,9 +47,15 @@ message TestRepeatedEnumMessage { message TestEnumMessage { enum Enum { UNKNOWN = 0; + FIRST_ENUM_VALUE = 1; + SECOND_ENUM_VALUE = 2; } } message TestComplexMap { map complex_map = 1; } + +message TestRepeatedPrimitiveMessage { + repeated bytes metadata_bytes = 1; +}