diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidation.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidation.java
index 1542252f5..3fa8f9c57 100644
--- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidation.java
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidation.java
@@ -1,9 +1,94 @@
package io.odpf.dagger.common.serde.parquet;
import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.Type;
+
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
public class SimpleGroupValidation {
public static boolean checkFieldExistsAndIsInitialized(SimpleGroup simpleGroup, String fieldName) {
return simpleGroup.getType().containsField(fieldName) && simpleGroup.getFieldRepetitionCount(fieldName) != 0;
}
+
+ /**
+ * This method checks if the map field inside the simple group is
+ * serialized using this legacy format:
+ * {@code
+ *
+ * repeated group <name> {
+ * <repetition-type> <data-type> key;
+ * <repetition-type> <data-type> value;
+ * }
+ *
+ * }
+ * The outer group is always repeated. key and value are constant field names.
+ *
+ * @param simpleGroup The SimpleGroup object inside which the map field is present
+ * @param fieldName The name of the map field
+ * @return true, if the map structure follows the spec and false otherwise.
+ */
+ public static boolean checkIsLegacySimpleGroupMap(SimpleGroup simpleGroup, String fieldName) {
+ if (!(simpleGroup.getType().getType(fieldName) instanceof GroupType)) {
+ return false;
+ }
+ GroupType nestedMapGroupType = simpleGroup.getType().getType(fieldName).asGroupType();
+ return nestedMapGroupType.isRepetition(Type.Repetition.REPEATED)
+ && nestedMapGroupType.getFieldCount() == 2
+ && nestedMapGroupType.containsField("key")
+ && nestedMapGroupType.containsField("value");
+ }
+
+ /**
+ * This method checks if the map field inside the simple group is
+ * serialized using this standard parquet map specification:
+ * {@code
+ *
+ * <repetition-type> group <name> (MAP) {
+ * repeated group key_value {
+ * required <data-type> key;
+ * <repetition-type> <data-type> value;
+ * }
+ * }
+ *
+ * }
+ * The validation checks below follow the Apache Parquet LogicalTypes Specificationfor Maps.
+ *
+ * @param simpleGroup The SimpleGroup object inside which the map field is present
+ * @param fieldName The name of the map field
+ * @return true, if the map structure follows the spec and false otherwise.
+ */
+ public static boolean checkIsStandardSimpleGroupMap(SimpleGroup simpleGroup, String fieldName) {
+ return applyMapFieldValidations(simpleGroup, fieldName)
+ && applyNestedKeyValueFieldValidations(simpleGroup, fieldName);
+ }
+
+ private static boolean applyMapFieldValidations(SimpleGroup simpleGroup, String fieldName) {
+ Type mapType = simpleGroup.getType().getType(fieldName);
+ if (mapType instanceof GroupType) {
+ GroupType mapGroupType = mapType.asGroupType();
+ return (mapGroupType.getRepetition().equals(OPTIONAL)
+ || mapGroupType.isRepetition(REQUIRED))
+ && mapGroupType.getLogicalTypeAnnotation().equals(LogicalTypeAnnotation.mapType())
+ && mapGroupType.getFieldCount() == 1;
+ }
+ return false;
+ }
+
+ private static boolean applyNestedKeyValueFieldValidations(SimpleGroup simpleGroup, String fieldName) {
+ GroupType mapGroupType = simpleGroup.getType().getType(fieldName).asGroupType();
+ if (mapGroupType.containsField("key_value")) {
+ Type nestedKeyValueType = mapGroupType.getType("key_value");
+ if (nestedKeyValueType instanceof GroupType) {
+ GroupType nestedKeyValueGroupType = nestedKeyValueType.asGroupType();
+ return nestedKeyValueGroupType.isRepetition(REPEATED)
+ && nestedKeyValueGroupType.containsField("key")
+ && nestedKeyValueGroupType.getType("key").isRepetition(REQUIRED);
+ }
+ }
+ return false;
+ }
}
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java
index 8b2d92d44..b860109a5 100644
--- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializer.java
@@ -4,11 +4,10 @@
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import io.odpf.dagger.common.exceptions.serde.DaggerDeserializationException;
-import io.odpf.dagger.common.exceptions.serde.SimpleGroupParsingException;
import io.odpf.dagger.common.serde.DaggerDeserializer;
-import io.odpf.dagger.common.serde.parquet.SimpleGroupValidation;
import io.odpf.dagger.common.serde.proto.deserialization.ProtoType;
import io.odpf.dagger.common.serde.typehandler.RowFactory;
+import io.odpf.dagger.common.serde.typehandler.complex.TimestampHandler;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.parquet.example.data.simple.SimpleGroup;
@@ -49,19 +48,14 @@ public Row deserialize(SimpleGroup simpleGroup) {
private Row addTimestampFieldToRow(Row row, SimpleGroup simpleGroup, Descriptors.Descriptor descriptor) {
Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByNumber(timestampFieldIndex);
- String timestampFieldName = fieldDescriptor.getName();
- if (SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, timestampFieldName)) {
- long timeInMillis = simpleGroup.getLong(timestampFieldName, 0);
- Instant instant = Instant.ofEpochMilli(timeInMillis);
+ TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
+ Row timestampRow = (Row) timestampHandler.transformFromParquet(simpleGroup);
+ long seconds = timestampRow.getFieldAs(0);
+ int nanos = timestampRow.getFieldAs(1);
- row.setField(row.getArity() - 2, true);
- row.setField(row.getArity() - 1, Timestamp.from(instant));
- return row;
- } else {
- String errMessage = String.format("Could not extract timestamp with field name %s from simple group of type %s",
- fieldDescriptor.getName(), simpleGroup.getType().toString());
- throw new SimpleGroupParsingException(errMessage);
- }
+ row.setField(row.getArity() - 2, true);
+ row.setField(row.getArity() - 1, Timestamp.from(Instant.ofEpochSecond(seconds, nanos)));
+ return row;
}
@Override
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandler.java
index 3524a58ce..b48fbe80b 100644
--- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandler.java
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandler.java
@@ -17,6 +17,10 @@
import java.util.Map;
import java.util.Map.Entry;
+import static io.odpf.dagger.common.serde.parquet.SimpleGroupValidation.checkFieldExistsAndIsInitialized;
+import static io.odpf.dagger.common.serde.parquet.SimpleGroupValidation.checkIsLegacySimpleGroupMap;
+import static io.odpf.dagger.common.serde.parquet.SimpleGroupValidation.checkIsStandardSimpleGroupMap;
+
/**
* The type Map proto handler.
*/
@@ -79,7 +83,39 @@ public Object transformFromProto(Object field) {
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
- return null;
+ String fieldName = fieldDescriptor.getName();
+ if (simpleGroup != null && checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
+ if (checkIsLegacySimpleGroupMap(simpleGroup, fieldName)) {
+ return transformLegacyMapFromSimpleGroup(simpleGroup, fieldName);
+ } else if (checkIsStandardSimpleGroupMap(simpleGroup, fieldName)) {
+ return transformStandardMapFromSimpleGroup(simpleGroup, fieldName);
+ }
+ }
+ return new Row[0];
+ }
+
+ private Row[] transformLegacyMapFromSimpleGroup(SimpleGroup simpleGroup, String fieldName) {
+ ArrayList deserializedRows = new ArrayList<>();
+ int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
+ Descriptors.Descriptor keyValueDescriptor = fieldDescriptor.getMessageType();
+ for (int i = 0; i < repetitionCount; i++) {
+ SimpleGroup keyValuePair = (SimpleGroup) simpleGroup.getGroup(fieldName, i);
+ deserializedRows.add(RowFactory.createRow(keyValueDescriptor, keyValuePair));
+ }
+ return deserializedRows.toArray(new Row[]{});
+ }
+
+ private Row[] transformStandardMapFromSimpleGroup(SimpleGroup simpleGroup, String fieldName) {
+ ArrayList deserializedRows = new ArrayList<>();
+ final String innerFieldName = "key_value";
+ SimpleGroup nestedMapGroup = (SimpleGroup) simpleGroup.getGroup(fieldName, 0);
+ int repetitionCount = nestedMapGroup.getFieldRepetitionCount(innerFieldName);
+ Descriptors.Descriptor keyValueDescriptor = fieldDescriptor.getMessageType();
+ for (int i = 0; i < repetitionCount; i++) {
+ SimpleGroup keyValuePair = (SimpleGroup) nestedMapGroup.getGroup(innerFieldName, i);
+ deserializedRows.add(RowFactory.createRow(keyValueDescriptor, keyValuePair));
+ }
+ return deserializedRows.toArray(new Row[]{});
}
@Override
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandler.java
index 7ac62ccc0..08e745f22 100644
--- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandler.java
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandler.java
@@ -11,6 +11,9 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
import java.text.SimpleDateFormat;
import java.time.Instant;
@@ -109,16 +112,36 @@ public Object transformFromProto(Object field) {
public Object transformFromParquet(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
+ Type timestampType = simpleGroup.getType().getType(fieldName);
+ if (timestampType instanceof PrimitiveType) {
+ return parseInt64TimestampFromSimpleGroup(simpleGroup, fieldName);
+ } else if (timestampType instanceof GroupType) {
+ return parseGroupTypeTimestampFromSimpleGroup(simpleGroup, fieldName);
+ }
+ }
+ return Row.of(DEFAULT_SECONDS_VALUE, DEFAULT_NANOS_VALUE);
+ }
- /* conversion from ms to nanos borrowed from Instant.java class and inlined here for performance reasons */
- long timeInMillis = simpleGroup.getLong(fieldName, 0);
- long seconds = Math.floorDiv(timeInMillis, SECOND_TO_MS_FACTOR);
- int mos = (int) Math.floorMod(timeInMillis, SECOND_TO_MS_FACTOR);
- int nanos = mos * MS_TO_NANOS_FACTOR;
- return Row.of(seconds, nanos);
- } else {
- return Row.of(DEFAULT_SECONDS_VALUE, DEFAULT_NANOS_VALUE);
+ private Row parseInt64TimestampFromSimpleGroup(SimpleGroup simpleGroup, String timestampFieldName) {
+ /* conversion from ms to nanos borrowed from Instant.java class and inlined here for performance reasons */
+ long timeInMillis = simpleGroup.getLong(timestampFieldName, 0);
+ long seconds = Math.floorDiv(timeInMillis, SECOND_TO_MS_FACTOR);
+ int mos = (int) Math.floorMod(timeInMillis, SECOND_TO_MS_FACTOR);
+ int nanos = mos * MS_TO_NANOS_FACTOR;
+ return Row.of(seconds, nanos);
+ }
+
+ private Row parseGroupTypeTimestampFromSimpleGroup(SimpleGroup simpleGroup, String timestampFieldName) {
+ SimpleGroup timestampGroup = (SimpleGroup) simpleGroup.getGroup(timestampFieldName, 0);
+ long seconds = 0L;
+ int nanos = 0;
+ if (SimpleGroupValidation.checkFieldExistsAndIsInitialized(timestampGroup, "seconds")) {
+ seconds = timestampGroup.getLong("seconds", 0);
+ }
+ if (SimpleGroupValidation.checkFieldExistsAndIsInitialized(timestampGroup, "nanos")) {
+ nanos = timestampGroup.getInteger("nanos", 0);
}
+ return Row.of(seconds, nanos);
}
@Override
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java
index c6fa879eb..a254edc90 100644
--- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/primitive/LongHandler.java
@@ -60,14 +60,14 @@ public Object parseRepeatedObjectField(Object field) {
@Override
public Object parseRepeatedSimpleGroupField(SimpleGroup simpleGroup) {
String fieldName = fieldDescriptor.getName();
- ArrayList deserializedValues = new ArrayList<>();
+ ArrayList longArrayList = new ArrayList<>();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
for (int i = 0; i < repetitionCount; i++) {
- deserializedValues.add(simpleGroup.getLong(fieldName, i));
+ longArrayList.add(simpleGroup.getLong(fieldName, i));
}
}
- return deserializedValues.toArray(new Long[]{});
+ return longArrayList.toArray(new Long[]{});
}
@Override
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java
index b3617dcde..fe8d9cb08 100644
--- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedEnumHandler.java
@@ -56,7 +56,7 @@ public Object transformFromProto(Object field) {
@Override
public Object transformFromParquet(SimpleGroup simpleGroup) {
String defaultEnumValue = fieldDescriptor.getEnumType().findValueByNumber(0).getName();
- List deserializedEnumArray = new ArrayList<>();
+ List enumArrayList = new ArrayList<>();
String fieldName = fieldDescriptor.getName();
if (simpleGroup != null && SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, fieldName)) {
int repetitionCount = simpleGroup.getFieldRepetitionCount(fieldName);
@@ -64,10 +64,10 @@ public Object transformFromParquet(SimpleGroup simpleGroup) {
String extractedValue = simpleGroup.getString(fieldName, positionIndex);
Descriptors.EnumValueDescriptor enumValueDescriptor = fieldDescriptor.getEnumType().findValueByName(extractedValue);
String enumValue = enumValueDescriptor == null ? defaultEnumValue : enumValueDescriptor.getName();
- deserializedEnumArray.add(enumValue);
+ enumArrayList.add(enumValue);
}
}
- return deserializedEnumArray.toArray(new String[]{});
+ return enumArrayList.toArray(new String[]{});
}
@Override
diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidationTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidationTest.java
index c98b0bd74..673e70d24 100644
--- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidationTest.java
+++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/SimpleGroupValidationTest.java
@@ -2,10 +2,19 @@
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.junit.Test;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Types.buildMessage;
+import static org.apache.parquet.schema.Types.optionalGroup;
+import static org.apache.parquet.schema.Types.optionalMap;
+import static org.apache.parquet.schema.Types.repeatedGroup;
+import static org.apache.parquet.schema.Types.requiredGroup;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -42,4 +51,238 @@ public void checkFieldExistsAndIsInitializedShouldReturnTrueWhenFieldIsBothPrese
assertTrue(SimpleGroupValidation.checkFieldExistsAndIsInitialized(simpleGroup, "primitive-type-column"));
}
+
+ @Test
+ public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldIsNotOfTypeGroupType() {
+ MessageType parquetSchema = buildMessage()
+ .required(INT32).named("sample_map_field")
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldIsNotRepeated() {
+ GroupType mapSchema = requiredGroup()
+ .optional(INT32).named("key")
+ .optional(FLOAT).named("value")
+ .named("sample_map_field");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldSchemaDoesNotHaveCorrectNumberOfNestedFields() {
+ GroupType mapSchema = repeatedGroup()
+ .optional(INT32).named("key")
+ .optional(FLOAT).named("value")
+ .optional(DOUBLE).named("extra_field")
+ .named("sample_map_field");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldSchemaDoesNotHaveKey() {
+ GroupType mapSchema = repeatedGroup()
+ .optional(FLOAT).named("value")
+ .optional(DOUBLE).named("extra_field")
+ .named("sample_map_field");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsLegacySimpleGroupMapShouldReturnFalseWhenMapFieldSchemaDoesNotHaveValue() {
+ GroupType mapSchema = repeatedGroup()
+ .optional(FLOAT).named("key")
+ .optional(DOUBLE).named("extra_field")
+ .named("sample_map_field");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsLegacySimpleGroupMapShouldOnlyReturnTrueWhenMapFieldConformsToTheLegacySchema() {
+ GroupType mapSchema = repeatedGroup()
+ .optional(INT32).named("key")
+ .optional(FLOAT).named("value")
+ .named("sample_map_field");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertTrue(SimpleGroupValidation.checkIsLegacySimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenMapFieldIsNotOfTypeGroupType() {
+ MessageType parquetSchema = buildMessage()
+ .required(INT32).named("sample_map_field")
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenMapFieldIsOfRepeatedType() {
+ GroupType mapType = repeatedGroup()
+ .required(INT32).named("key_value")
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenMapFieldDoesNotContainCorrectLogicalTypeAnnotation() {
+ GroupType mapType = requiredGroup().as(LogicalTypeAnnotation.enumType())
+ .required(INT32).named("key_value")
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenMapFieldDoesNotContainCorrectNumberOfNestedFields() {
+ GroupType mapType = requiredGroup().as(LogicalTypeAnnotation.mapType())
+ .required(INT32).named("key_value")
+ .required(INT32).named("extra_field")
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenMapFieldDoesNotContainNestedKeyValueField() {
+ GroupType mapType = requiredGroup().as(LogicalTypeAnnotation.mapType())
+ .required(INT32).named("extra_field")
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenNestedKeyValueFieldIsNotAGroupType() {
+ GroupType mapType = requiredGroup().as(LogicalTypeAnnotation.mapType())
+ .required(INT32).named("key_value")
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenNestedKeyValueFieldIsNotRepeated() {
+ GroupType keyValueType = optionalGroup()
+ .required(INT32).named("some_key")
+ .required(INT32).named("some_value")
+ .named("key_value");
+
+ GroupType mapType = requiredGroup().as(LogicalTypeAnnotation.mapType())
+ .addField(keyValueType)
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenNestedKeyValueGroupDoesNotContainKeyField() {
+ GroupType keyValueType = repeatedGroup()
+ .required(INT32).named("some_key")
+ .required(INT32).named("some_value")
+ .named("key_value");
+
+ GroupType mapType = requiredGroup().as(LogicalTypeAnnotation.mapType())
+ .addField(keyValueType)
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldReturnFalseWhenNestedKeyValueGroupDoesNotHaveKeyAsRequired() {
+ GroupType keyValueType = repeatedGroup()
+ .optional(INT32).named("key")
+ .required(INT32).named("some_value")
+ .named("key_value");
+
+ GroupType mapType = requiredGroup().as(LogicalTypeAnnotation.mapType())
+ .addField(keyValueType)
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertFalse(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
+
+ @Test
+ public void checkIsStandardSimpleGroupMapShouldOnlyReturnTrueWhenMapFieldConformsToTheStandardSpec() {
+ GroupType mapType = optionalMap()
+ .key(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .optionalValue(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .named("sample_map_field");
+
+ MessageType parquetSchema = buildMessage()
+ .addField(mapType)
+ .named("TestMessage");
+ SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+
+ assertTrue(SimpleGroupValidation.checkIsStandardSimpleGroupMap(simpleGroup, "sample_map_field"));
+ }
}
diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializerTest.java
index 9e443d188..9af785c68 100644
--- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializerTest.java
+++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/parquet/deserialization/SimpleGroupDeserializerTest.java
@@ -158,28 +158,6 @@ public void shouldSetDefaultValueForAllPrimitiveTypeFieldsExceptTimestampIfMissi
assertEquals("UNKNOWN", row.getField(getProtoIndex("service_type")));
}
- @Test
- public void shouldThrowExceptionIfTimestampIsMissingInSimpleGroup() {
- SimpleGroupDeserializer simpleGroupDeserializer = new SimpleGroupDeserializer(TestPrimitiveMessage.class.getTypeName(), 9, "rowtime", stencilClientOrchestrator);
- GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup()
- .required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named("is_valid")
- .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_number")
- .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_hash")
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("latitude")
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("longitude")
- .required(PrimitiveType.PrimitiveTypeName.FLOAT).named("price")
- .required(PrimitiveType.PrimitiveTypeName.INT32).named("packet_count")
- .required(PrimitiveType.PrimitiveTypeName.INT64).named("phone")
- .required(PrimitiveType.PrimitiveTypeName.INT64).named("event_timestamp")
- .required(PrimitiveType.PrimitiveTypeName.BINARY).named("service_type")
- .named("TestGroupType");
-
- SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
-
- Assert.assertThrows(DaggerDeserializationException.class,
- () -> simpleGroupDeserializer.deserialize(simpleGroup));
- }
-
@Test
public void shouldThrowExceptionIfSimpleGroupIsNull() {
SimpleGroupDeserializer simpleGroupDeserializer = new SimpleGroupDeserializer(TestPrimitiveMessage.class.getTypeName(), 9, "rowtime", stencilClientOrchestrator);
diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandlerTest.java
index 41ded48e4..c522a4459 100644
--- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandlerTest.java
+++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/MapHandlerTest.java
@@ -13,6 +13,9 @@
import io.odpf.dagger.consumer.TestMessage;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.junit.Assert;
import org.junit.Test;
@@ -22,6 +25,10 @@
import java.util.List;
import java.util.Map;
+import static org.apache.parquet.schema.Types.buildMessage;
+import static org.apache.parquet.schema.Types.repeatedGroup;
+import static org.apache.parquet.schema.Types.requiredGroup;
+import static org.apache.parquet.schema.Types.requiredMap;
import static org.junit.Assert.*;
public class MapHandlerTest {
@@ -345,13 +352,387 @@ public void shouldReturnTypeInformation() {
}
@Test
- public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() {
+ public void shouldReturnArrayOfRowsForSimpleGroupContainingStandardSpecMap() {
Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
- MapHandler protoHandler = new MapHandler(fieldDescriptor);
- GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup()
- .named("TestGroupType");
- SimpleGroup simpleGroup = new SimpleGroup(parquetSchema);
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType keyValueSchema = requiredGroup()
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("key")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("value")
+ .named("key_value");
+ GroupType mapSchema = requiredMap()
+ .key(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .requiredValue(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+
+ SimpleGroup keyValue1 = new SimpleGroup(keyValueSchema);
+ keyValue1.add("key", "batman");
+ keyValue1.add("value", "DC");
+ SimpleGroup keyValue2 = new SimpleGroup(keyValueSchema);
+ keyValue2.add("key", "starlord");
+ keyValue2.add("value", "Marvel");
+
+ SimpleGroup mapMessage = new SimpleGroup(mapSchema);
+ mapMessage.add("key_value", keyValue1);
+ mapMessage.add("key_value", keyValue2);
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("metadata", mapMessage);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{Row.of("batman", "DC"), Row.of("starlord", "Marvel")};
+
+ assertEquals(2, actualRows.length);
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldReturnArrayOfRowsForSimpleGroupContainingLegacySpecMap() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType mapSchema = repeatedGroup()
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("key")
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("value")
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+
+ SimpleGroup keyValue1 = new SimpleGroup(mapSchema);
+ keyValue1.add("key", "batman");
+ keyValue1.add("value", "DC");
+ SimpleGroup keyValue2 = new SimpleGroup(mapSchema);
+ keyValue2.add("key", "starlord");
+ keyValue2.add("value", "Marvel");
+
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("metadata", keyValue1);
+ mainMessage.add("metadata", keyValue2);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{Row.of("batman", "DC"), Row.of("starlord", "Marvel")};
+
+ assertEquals(2, actualRows.length);
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldReturnArrayOfRowsWhenHandlingSimpleGroupContainingStandardSpecMapOfComplexTypes() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestComplexMap.getDescriptor().findFieldByName("complex_map");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType valueSchema = requiredGroup()
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_number")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_url")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_details")
+ .named("value");
+ GroupType keyValueSchema = requiredGroup()
+ .required(PrimitiveType.PrimitiveTypeName.INT32).named("key")
+ .addField(valueSchema)
+ .named("key_value");
+ GroupType mapSchema = requiredMap()
+ .key(PrimitiveType.PrimitiveTypeName.INT32)
+ .value(valueSchema)
+ .named("complex_map");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestComplexMap");
+
+ SimpleGroup keyValue1 = new SimpleGroup(keyValueSchema);
+ SimpleGroup value1 = new SimpleGroup(valueSchema);
+ value1.add("order_number", "RS-123");
+ value1.add("order_url", "http://localhost");
+ value1.add("order_details", "some-details");
+ keyValue1.add("key", 10);
+ keyValue1.add("value", value1);
+
+ SimpleGroup keyValue2 = new SimpleGroup(keyValueSchema);
+ SimpleGroup value2 = new SimpleGroup(valueSchema);
+ value2.add("order_number", "RS-456");
+ value2.add("order_url", "http://localhost:8888/some-url");
+ value2.add("order_details", "extra-details");
+ keyValue2.add("key", 90);
+ keyValue2.add("value", value2);
+
+ SimpleGroup mapMessage = new SimpleGroup(mapSchema);
+ mapMessage.add("key_value", keyValue1);
+ mapMessage.add("key_value", keyValue2);
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("complex_map", mapMessage);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{
+ Row.of(10, Row.of("RS-123", "http://localhost", "some-details")),
+ Row.of(90, Row.of("RS-456", "http://localhost:8888/some-url", "extra-details"))};
+
+ assertEquals(2, actualRows.length);
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldReturnArrayOfRowsWhenHandlingSimpleGroupContainingLegacySpecMapOfComplexTypes() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestComplexMap.getDescriptor().findFieldByName("complex_map");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType valueSchema = requiredGroup()
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_number")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_url")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_details")
+ .named("value");
+ GroupType mapSchema = repeatedGroup()
+ .required(PrimitiveType.PrimitiveTypeName.INT32).named("key")
+ .addField(valueSchema)
+ .named("complex_map");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestComplexMap");
+
+ SimpleGroup keyValue1 = new SimpleGroup(mapSchema);
+ SimpleGroup value1 = new SimpleGroup(valueSchema);
+ value1.add("order_number", "RS-123");
+ value1.add("order_url", "http://localhost");
+ value1.add("order_details", "some-details");
+ keyValue1.add("key", 10);
+ keyValue1.add("value", value1);
+
+ SimpleGroup keyValue2 = new SimpleGroup(mapSchema);
+ SimpleGroup value2 = new SimpleGroup(valueSchema);
+ value2.add("order_number", "RS-456");
+ value2.add("order_url", "http://localhost:8888/some-url");
+ value2.add("order_details", "extra-details");
+ keyValue2.add("key", 90);
+ keyValue2.add("value", value2);
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("complex_map", keyValue1);
+ mainMessage.add("complex_map", keyValue2);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{
+ Row.of(10, Row.of("RS-123", "http://localhost", "some-details")),
+ Row.of(90, Row.of("RS-456", "http://localhost:8888/some-url", "extra-details"))};
+
+ assertEquals(2, actualRows.length);
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldReturnEmptyRowArrayWhenHandlingASimpleGroupNotContainingTheMapField() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+ MessageType parquetSchema = buildMessage()
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+
+ assertArrayEquals(new Row[0], actualRows);
+ }
+
+ @Test
+ public void shouldReturnEmptyRowArrayWhenHandlingASimpleGroupWithStandardSpecMapFieldNotInitialized() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType mapSchema = requiredMap()
+ .key(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .requiredValue(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+
+ assertArrayEquals(new Row[0], actualRows);
+ }
+
+ @Test
+ public void shouldReturnEmptyRowArrayWhenHandlingASimpleGroupWithLegacySpecMapFieldNotInitialized() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType mapSchema = repeatedGroup()
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("key")
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("value")
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+
+ assertArrayEquals(new Row[0], actualRows);
+ }
+
+
+ @Test
+ public void shouldReturnEmptyRowArrayWhenHandlingNullSimpleGroup() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(null);
+
+ assertArrayEquals(new Row[0], actualRows);
+ }
+
+ @Test
+ public void shouldUseDefaultKeyAsPerTypeWhenHandlingSimpleGroupAndStandardSpecMapEntryDoesNotHaveKeyInitialized() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType keyValueSchema = requiredGroup()
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("key")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("value")
+ .named("key_value");
+ GroupType mapSchema = requiredMap()
+ .key(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .requiredValue(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType())
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+
+ /* Creating a map entry and only initializing the value but not the key */
+ SimpleGroup keyValue = new SimpleGroup(keyValueSchema);
+ keyValue.add("value", "DC");
+
+ SimpleGroup mapMessage = new SimpleGroup(mapSchema);
+ mapMessage.add("key_value", keyValue);
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("metadata", mapMessage);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{Row.of("", "DC")};
+
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldUseDefaultKeyAsPerTypeWhenHandlingSimpleGroupAndLegacySpecMapEntryDoesNotHaveKeyInitialized() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType mapSchema = repeatedGroup()
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("key")
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("value")
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+
+ /* Creating a map entry and only initializing the value but not the key */
+ SimpleGroup keyValue = new SimpleGroup(mapSchema);
+ keyValue.add("value", "DC");
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("metadata", keyValue);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{Row.of("", "DC")};
+
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldUseDefaultValueAsPerTypeWhenHandlingSimpleGroupAndStandardSpecMapEntryDoesNotHaveValueInitialized() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestComplexMap.getDescriptor().findFieldByName("complex_map");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType valueSchema = requiredGroup()
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_number")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_url")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY).named("order_details")
+ .named("value");
+ GroupType keyValueSchema = requiredGroup()
+ .required(PrimitiveType.PrimitiveTypeName.INT32).named("key")
+ .addField(valueSchema)
+ .named("key_value");
+ GroupType mapSchema = requiredMap()
+ .key(PrimitiveType.PrimitiveTypeName.INT32)
+ .value(valueSchema)
+ .named("complex_map");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestComplexMap");
+
+ SimpleGroup keyValue = new SimpleGroup(keyValueSchema);
+ keyValue.add("key", 10);
+
+ /* Just creating an empty simple group for the value, without initializing any of the fields in it */
+ SimpleGroup value = new SimpleGroup(valueSchema);
+ keyValue.add("value", value);
+
+ SimpleGroup mapMessage = new SimpleGroup(mapSchema);
+ mapMessage.add("key_value", keyValue);
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("complex_map", mapMessage);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{
+ Row.of(10, Row.of("", "", ""))};
+
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldUseDefaultValueAsPerTypeWhenHandlingSimpleGroupAndLegacySpecMapEntryDoesNotHaveValueInitialized() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType mapSchema = repeatedGroup()
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("key")
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("value")
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+
+ /* Creating a map entry and only initializing the key but not the value */
+ SimpleGroup keyValue = new SimpleGroup(mapSchema);
+ keyValue.add("key", "Superman");
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("metadata", keyValue);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
+ Row[] expectedRows = new Row[]{Row.of("Superman", "")};
+
+ assertArrayEquals(expectedRows, actualRows);
+ }
+
+ @Test
+ public void shouldReturnEmptyRowArrayWhenHandlingSimpleGroupAndMapFieldDoesNotConformWithAnySpec() {
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ MapHandler mapHandler = new MapHandler(fieldDescriptor);
+
+ GroupType mapSchema = repeatedGroup()
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("random_key")
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("random_value")
+ .named("metadata");
+ MessageType parquetSchema = buildMessage()
+ .addField(mapSchema)
+ .named("TestBookingLogMessage");
+
+ SimpleGroup keyValue = new SimpleGroup(mapSchema);
+ keyValue.add("random_key", "Superman");
+ keyValue.add("random_value", "DC");
+
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("metadata", keyValue);
+
+ Row[] actualRows = (Row[]) mapHandler.transformFromParquet(mainMessage);
- assertNull(protoHandler.transformFromParquet(simpleGroup));
+ assertArrayEquals(new Row[]{}, actualRows);
}
}
diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandlerTest.java
index 1566b97a1..e2c385dd6 100644
--- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandlerTest.java
+++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/complex/TimestampHandlerTest.java
@@ -12,6 +12,7 @@
import io.odpf.dagger.consumer.TestBookingLogMessage;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
import org.junit.Test;
import java.sql.Timestamp;
@@ -19,7 +20,9 @@
import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.buildMessage;
import static org.junit.Assert.*;
public class TimestampHandlerTest {
@@ -305,7 +308,7 @@ public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotCo
}
@Test
- public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotContainValueForField() {
+ public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotContainValueForInt64TimestampField() {
Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup()
.required(INT64).named("event_timestamp")
@@ -318,4 +321,142 @@ public void shouldReturnDefaultTimestampRowDuringTransformIfSimpleGroupDoesNotCo
Row expectedRow = Row.of(0L, 0);
assertEquals(expectedRow, actualRow);
}
+
+ @Test
+ public void shouldTransformGroupTypeTimestampFromSimpleGroup() {
+ Instant currentInstant = Instant.now();
+ long seconds = currentInstant.getEpochSecond();
+ int nanos = currentInstant.getNano();
+ Row expectedRow = Row.of(seconds, nanos);
+
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
+ GroupType timestampSchema = org.apache.parquet.schema.Types.requiredGroup()
+ .required(INT64).named("seconds")
+ .required(INT32).named("nanos")
+ .named("event_timestamp");
+ SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
+ timestampMessage.add("seconds", seconds);
+ timestampMessage.add("nanos", nanos);
+
+ MessageType parquetSchema = buildMessage()
+ .addField(timestampSchema)
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("event_timestamp", timestampMessage);
+
+ TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
+ Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);
+
+ assertEquals(expectedRow, actualRow);
+ }
+
+ @Test
+ public void shouldUseDefaultSecondsDuringTransformIfSimpleGroupDoesNotContainSecondsInGroupTypeTimestamp() {
+ Instant currentInstant = Instant.now();
+ int nanos = currentInstant.getNano();
+ Row expectedRow = Row.of(0L, nanos);
+
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
+ TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
+
+ /* only adding nanos field to the timestamp schema and initializing it */
+ GroupType timestampSchema = org.apache.parquet.schema.Types.optionalGroup()
+ .optional(INT32).named("nanos")
+ .named("event_timestamp");
+ SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
+ timestampMessage.add("nanos", nanos);
+
+ MessageType parquetSchema = buildMessage()
+ .addField(timestampSchema)
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("event_timestamp", timestampMessage);
+
+ Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);
+
+ assertEquals(expectedRow, actualRow);
+ }
+
+ @Test
+ public void shouldUseDefaultSecondsDuringTransformIfSimpleGroupHasGroupTypeTimestampWithSecondsNotInitialized() {
+ Instant currentInstant = Instant.now();
+ int nanos = currentInstant.getNano();
+ Row expectedRow = Row.of(0L, nanos);
+
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
+ TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
+
+ /* adding both nanos and seconds field to the timestamp schema but initializing only for nanos */
+ GroupType timestampSchema = org.apache.parquet.schema.Types.requiredGroup()
+ .required(INT64).named("seconds")
+ .required(INT32).named("nanos")
+ .named("event_timestamp");
+ SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
+ timestampMessage.add("nanos", nanos);
+
+ MessageType parquetSchema = buildMessage()
+ .addField(timestampSchema)
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("event_timestamp", timestampMessage);
+
+ Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);
+
+ assertEquals(expectedRow, actualRow);
+ }
+
+ @Test
+ public void shouldUseDefaultNanosDuringTransformIfSimpleGroupDoesNotContainNanosInGroupTypeTimestamp() {
+ Instant currentInstant = Instant.now();
+ long seconds = currentInstant.getEpochSecond();
+ Row expectedRow = Row.of(seconds, 0);
+
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
+ TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
+
+ /* only adding seconds field to the timestamp schema and initializing it */
+ GroupType timestampSchema = org.apache.parquet.schema.Types.optionalGroup()
+ .optional(INT64).named("seconds")
+ .named("event_timestamp");
+ SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
+ timestampMessage.add("seconds", seconds);
+
+ MessageType parquetSchema = buildMessage()
+ .addField(timestampSchema)
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("event_timestamp", timestampMessage);
+
+ Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);
+
+ assertEquals(expectedRow, actualRow);
+ }
+
+ @Test
+ public void shouldUseDefaultNanosDuringTransformIfSimpleGroupHasGroupTypeTimestampWithNanosNotInitialized() {
+ Instant currentInstant = Instant.now();
+ long seconds = currentInstant.getEpochSecond();
+ Row expectedRow = Row.of(seconds, 0);
+
+ Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("event_timestamp");
+ TimestampHandler timestampHandler = new TimestampHandler(fieldDescriptor);
+
+ /* adding both nanos and seconds field to the timestamp schema but initializing only for seconds */
+ GroupType timestampSchema = org.apache.parquet.schema.Types.optionalGroup()
+ .optional(INT64).named("seconds")
+ .optional(INT32).named("nanos")
+ .named("event_timestamp");
+ SimpleGroup timestampMessage = new SimpleGroup(timestampSchema);
+ timestampMessage.add("seconds", seconds);
+
+ MessageType parquetSchema = buildMessage()
+ .addField(timestampSchema)
+ .named("TestBookingLogMessage");
+ SimpleGroup mainMessage = new SimpleGroup(parquetSchema);
+ mainMessage.add("event_timestamp", timestampMessage);
+
+ Row actualRow = (Row) timestampHandler.transformFromParquet(mainMessage);
+
+ assertEquals(expectedRow, actualRow);
+ }
}
diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedStructMessageHandlerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedStructMessageHandlerTest.java
index 55b55ae08..f1ee06d40 100644
--- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedStructMessageHandlerTest.java
+++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/typehandler/repeated/RepeatedStructMessageHandlerTest.java
@@ -75,7 +75,7 @@ public void shouldReturnTypeInformation() {
@Test
public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() {
- Descriptors.FieldDescriptor fieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("metadata");
+ Descriptors.FieldDescriptor fieldDescriptor = TestNestedRepeatedMessage.getDescriptor().findFieldByName("metadata");
RepeatedStructMessageHandler protoHandler = new RepeatedStructMessageHandler(fieldDescriptor);
GroupType parquetSchema = org.apache.parquet.schema.Types.requiredGroup()
.named("TestGroupType");
@@ -83,5 +83,4 @@ public void shouldReturnNullWhenTransformFromParquetIsCalledWithAnyArgument() {
assertNull(protoHandler.transformFromParquet(simpleGroup));
}
-
}