diff --git a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala
index 1b585c9d..aba9fe72 100644
--- a/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala
+++ b/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala
@@ -19,7 +19,7 @@ import java.time.Duration
 import java.util.UUID.randomUUID
 import java.util.{Collections, Properties}
 
-import org.apache.avro.Schema.Parser
+import org.apache.avro.Schema.{Parser, Type}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.util.Utf8
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -59,7 +59,7 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
 
     val valueSchemaString = raw"""{"type": "record", "name": "$sourceTopic", "fields": [
      {"type": "int", "name": "some_id"},
-     {"type": "string", "name": "value_field"}
+     {"type": ["string", "null"], "name": "value_field"}
      ]}"""
 
     val valueSchema = new Parser().parse(valueSchemaString)
@@ -138,6 +138,10 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
     keyFieldNames should contain theSameElementsAs List("some_id", "key_field")
     records.map(_.key().get("some_id")) should contain theSameElementsInOrderAs List.tabulate(numberOfRecords)(_ / 5)
     records.map(_.key().get("key_field")).distinct should contain theSameElementsAs List(new Utf8("keyHello"))
+
+    records.head.value().getSchema.getField("some_id").schema().getType shouldBe Type.INT
+    records.head.value().getSchema.getField("value_field").schema().getTypes
+      .asScala.map(_.getType) should contain theSameElementsAs Seq(Type.STRING, Type.NULL)
 
     val valueFieldNames = records.head.value().getSchema.getFields.asScala.map(_.name())
     valueFieldNames should contain theSameElementsAs List("some_id", "value_field")
diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala
index 2463ab4c..0a4bd4e5 100644
--- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala
+++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala
@@ -18,18 +18,19 @@ package za.co.absa.hyperdrive.ingestor.implementation.decoder.avro.confluent
 import org.apache.commons.configuration2.Configuration
 import org.apache.commons.lang3.{RandomStringUtils, StringUtils}
 import org.apache.logging.log4j.LogManager
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.streaming.DataStreamReader
+import org.apache.spark.sql.{Column, DataFrame}
 import za.co.absa.abris.avro.functions.from_confluent_avro
 import za.co.absa.abris.avro.read.confluent.SchemaManager._
 import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
 import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory}
-import za.co.absa.hyperdrive.ingestor.implementation.utils.{SchemaRegistryConsumerConfigKeys, SchemaRegistrySettingsUtil}
-import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._
 import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
 import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow
 import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
+import za.co.absa.hyperdrive.ingestor.implementation.utils.{SchemaRegistryConsumerConfigKeys, SchemaRegistrySettingsUtil}
+import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._
 
 private[decoder] class ConfluentAvroKafkaStreamDecoder(
   val topic: String,
@@ -60,9 +61,10 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder(
 
   private def getKeyValueDataFrame(dataFrame: DataFrame,
                                    keySchemaRegistrySettings: Map[String, String]) = {
-    val keyValueDf = dataFrame.select(
+    val decodedDf = dataFrame.select(
       from_confluent_avro(col("key"), keySchemaRegistrySettings) as 'key,
       from_confluent_avro(col("value"), valueSchemaRegistrySettings) as 'value)
+    val keyValueDf = setColumnNonNullable(decodedDf, "value")
 
     val keyColumnNames = keyValueDf.select("key.*").columns.toSeq
     val valueColumnNames = keyValueDf.select("value.*").columns.toSeq
@@ -77,11 +79,17 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder(
   }
 
   private def getValueDataFrame(dataFrame: DataFrame) = {
-    dataFrame
+    val decodedDf = dataFrame
       .select(from_confluent_avro(col("value"), valueSchemaRegistrySettings) as 'data)
+    setColumnNonNullable(decodedDf, "data")
       .select("data.*")
   }
 
+  private def setColumnNonNullable(dataFrame: DataFrame, columnName: String) = {
+    dataFrame
+      .filter(col(columnName).isNotNull)
+      .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
+  }
 
 }
 