From a52e76a37cc97f0491e67167365045c5e6d96f2b Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Fri, 1 Jul 2022 11:10:49 +0100 Subject: [PATCH 1/5] feat: Bigquery sink using depot (#154) --- .../io/odpf/dagger/common/core/Constants.java | 4 +- .../core/StencilClientOrchestrator.java | 7 +- .../deserialization/ProtoDeserializer.java | 11 +- .../serialization/KafkaProtoSerializer.java | 43 +++++ .../proto/serialization/ProtoSerializer.java | 90 +++-------- .../core/StencilClientOrchestratorTest.java | 14 +- .../KafkaProtoSerializerTest.java | 53 ++++++ .../serialization/ProtoSerializerTest.java | 146 ++++++++--------- dagger-core/build.gradle | 2 +- .../ProtoDeserializerProvider.java | 8 +- .../exception/BigqueryWriterException.java | 14 ++ .../processors/longbow/LongbowFactory.java | 2 +- .../longbow/request/ProtoBytePutRequest.java | 10 +- .../longbow/request/PutRequestFactory.java | 8 +- .../dagger/core/sink/SinkOrchestrator.java | 10 +- .../core/sink/bigquery/BigquerySink.java | 96 +++++++++++ .../sink/bigquery/BigquerySinkBuilder.java | 61 +++++++ .../sink/bigquery/BigquerySinkWriter.java | 116 +++++++++++++ .../builder/KafkaProtoSerializerBuilder.java | 6 +- .../io/odpf/dagger/core/utils/Constants.java | 6 + .../longbow/processor/LongbowWriterTest.java | 2 +- .../request/ProtoBytePutRequestTest.java | 2 +- .../request/PutRequestFactoryTest.java | 2 +- .../core/sink/SinkOrchestratorTest.java | 16 +- .../bigquery/BigquerySinkBuilderTest.java | 26 +++ .../core/sink/bigquery/BigquerySinkTest.java | 53 ++++++ .../sink/bigquery/BigquerySinkWriterTest.java | 152 ++++++++++++++++++ .../KafkaProtoSerializerBuilderTest.java | 4 +- 28 files changed, 770 insertions(+), 194 deletions(-) create mode 100644 dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java create mode 100644 dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializerTest.java create mode 100644 dagger-core/src/main/java/io/odpf/dagger/core/exception/BigqueryWriterException.java create mode 100644 dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySink.java create mode 100644 dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilder.java create mode 100644 dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java create mode 100644 dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilderTest.java create mode 100644 dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkTest.java create mode 100644 dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/core/Constants.java b/dagger-common/src/main/java/io/odpf/dagger/common/core/Constants.java index 202119ec8..204578798 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/core/Constants.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/core/Constants.java @@ -5,8 +5,8 @@ public class Constants { public static final boolean SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT = false; public static final String SCHEMA_REGISTRY_STENCIL_URLS_KEY = "SCHEMA_REGISTRY_STENCIL_URLS"; public static final String SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT = ""; - public static final String SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY = "SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS"; - public static final Integer SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT = 60000; + public static final String 
SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS = "SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS"; + public static final Integer SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT = 60000; public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS"; public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_DEFAULT = ""; diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/core/StencilClientOrchestrator.java b/dagger-common/src/main/java/io/odpf/dagger/common/core/StencilClientOrchestrator.java index cddc64da6..a3b61a7cc 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/core/StencilClientOrchestrator.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/core/StencilClientOrchestrator.java @@ -38,9 +38,10 @@ public StencilClientOrchestrator(Configuration configuration) { } StencilConfig createStencilConfig() { - Integer timeoutMS = configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT); - List
headers = this.getHeaders(configuration); - return StencilConfig.builder().fetchTimeoutMs(timeoutMS).fetchHeaders(headers).build(); + return StencilConfig.builder() + .fetchHeaders(getHeaders(configuration)) + .fetchTimeoutMs(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)) + .build(); } private List
getHeaders(Configuration config) { diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/deserialization/ProtoDeserializer.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/deserialization/ProtoDeserializer.java index 8b2a0f501..66ec2ec0e 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/deserialization/ProtoDeserializer.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/deserialization/ProtoDeserializer.java @@ -1,17 +1,16 @@ package io.odpf.dagger.common.serde.proto.deserialization; -import io.odpf.dagger.common.serde.DaggerDeserializer; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; -import org.apache.flink.types.Row; - import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.exceptions.DescriptorNotFoundException; import io.odpf.dagger.common.exceptions.serde.DaggerDeserializationException; +import io.odpf.dagger.common.serde.DaggerDeserializer; import io.odpf.dagger.common.serde.typehandler.RowFactory; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.types.Row; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,11 +24,11 @@ */ public class ProtoDeserializer implements KafkaDeserializationSchema, DaggerDeserializer { + private static final Logger LOGGER = LoggerFactory.getLogger(ProtoDeserializer.class); private final String protoClassName; private final int timestampFieldIndex; private final StencilClientOrchestrator stencilClientOrchestrator; private final TypeInformation typeInformation; - private static final Logger LOGGER = LoggerFactory.getLogger(ProtoDeserializer.class); /** * Instantiates a new Proto deserializer. 
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java new file mode 100644 index 000000000..1977a1092 --- /dev/null +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializer.java @@ -0,0 +1,43 @@ +package io.odpf.dagger.common.serde.proto.serialization; + +import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.types.Row; + +import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; + +public class KafkaProtoSerializer implements KafkaRecordSerializationSchema { + private final String outputTopic; + private final ProtoSerializer protoSerializer; + private static final Logger LOGGER = LoggerFactory.getLogger("KafkaSink"); + + public KafkaProtoSerializer(ProtoSerializer protoSerializer) { + this(protoSerializer, ""); + } + + public KafkaProtoSerializer(ProtoSerializer protoSerializer, String outputTopic) { + this.protoSerializer = protoSerializer; + this.outputTopic = outputTopic; + } + + @Override + public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception { + KafkaRecordSerializationSchema.super.open(context, sinkContext); + } + + @Override + public ProducerRecord serialize(Row row, KafkaSinkContext context, Long timestamp) { + if (Objects.isNull(outputTopic) || outputTopic.equals("")) { + throw new DaggerSerializationException("outputTopic is required"); + } + LOGGER.info("row to kafka: " + row); + byte[] key = protoSerializer.serializeKey(row); + byte[] message = protoSerializer.serializeValue(row); + return new ProducerRecord<>(outputTopic, key, message); + } +} diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializer.java b/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializer.java index 7c5ea6c85..a3881fa39 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializer.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializer.java @@ -1,78 +1,38 @@ package io.odpf.dagger.common.serde.proto.serialization; -import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext; -import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; -import org.apache.flink.types.Row; - import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.exceptions.DescriptorNotFoundException; import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException; import io.odpf.dagger.common.exceptions.serde.InvalidColumnMappingException; +import org.apache.flink.types.Row; import io.odpf.dagger.common.serde.typehandler.TypeHandler; import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.Arrays; import java.util.Objects; -public class ProtoSerializer implements 
KafkaRecordSerializationSchema { - private String[] columnNames; - private StencilClientOrchestrator stencilClientOrchestrator; - private String keyProtoClassName; - private String messageProtoClassName; - private String outputTopic; - private static final Logger LOGGER = LoggerFactory.getLogger("KafkaSink"); +public class ProtoSerializer implements Serializable { + + private final String keyProtoClassName; + private final String[] columnNames; + private final StencilClientOrchestrator stencilClientOrchestrator; + private final String messageProtoClassName; - /** - * Instantiates a new Proto serializer with specified output topic name. - * - * @param keyProtoClassName the key proto class name - * @param messageProtoClassName the message proto class name - * @param columnNames the column names - * @param stencilClientOrchestrator the stencil client orchestrator - */ public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator) { - if (Objects.isNull(messageProtoClassName)) { - throw new DaggerSerializationException("messageProtoClassName is required"); - } this.keyProtoClassName = keyProtoClassName; - this.messageProtoClassName = messageProtoClassName; this.columnNames = columnNames; this.stencilClientOrchestrator = stencilClientOrchestrator; + this.messageProtoClassName = messageProtoClassName; + checkValidity(); } - /** - * Instantiates a new Proto serializer with specified output topic name. - * - * @param keyProtoClassName the key proto class name - * @param messageProtoClassName the message proto class name - * @param columnNames the column names - * @param stencilClientOrchestrator the stencil client orchestrator - * @param outputTopic the output topic - */ - public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, String outputTopic) { - this(keyProtoClassName, messageProtoClassName, columnNames, stencilClientOrchestrator); - this.outputTopic = outputTopic; - } - - @Override - public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception { - KafkaRecordSerializationSchema.super.open(context, sinkContext); - } - - @Override - public ProducerRecord serialize(Row row, KafkaSinkContext context, Long timestamp) { - if (Objects.isNull(outputTopic) || outputTopic.equals("")) { - throw new DaggerSerializationException("outputTopic is required"); + private void checkValidity() { + if (Objects.isNull(messageProtoClassName) || messageProtoClassName.isEmpty()) { + throw new DaggerSerializationException("messageProtoClassName is required"); } - LOGGER.info("row to kafka: " + row); - byte[] key = serializeKey(row); - byte[] message = serializeValue(row); - return new ProducerRecord<>(outputTopic, key, message); } /** @@ -82,16 +42,10 @@ public ProducerRecord serialize(Row row, KafkaSinkContext contex * @return the byte [ ] */ public byte[] serializeKey(Row row) { - return (Objects.isNull(keyProtoClassName) || keyProtoClassName.equals("")) ? null + return (Objects.isNull(keyProtoClassName) || keyProtoClassName.isEmpty()) ? null : parse(row, getDescriptor(keyProtoClassName)).toByteArray(); } - /** - * Serialize value message. 
- * - * @param row the row - * @return the byte [ ] - */ public byte[] serializeValue(Row row) { return parse(row, getDescriptor(messageProtoClassName)).toByteArray(); } @@ -117,6 +71,14 @@ private DynamicMessage parse(Row element, Descriptors.Descriptor descriptor) { return builder.build(); } + private Descriptors.Descriptor getDescriptor(String className) { + Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(className); + if (dsc == null) { + throw new DescriptorNotFoundException(); + } + return dsc; + } + private DynamicMessage.Builder populateNestedBuilder(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames, DynamicMessage.Builder parentBuilder, Object data) { String childColumnName = nestedColumnNames[0]; Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName); @@ -153,12 +115,4 @@ private DynamicMessage.Builder populateBuilder(DynamicMessage.Builder builder, D return builder; } - - private Descriptors.Descriptor getDescriptor(String className) { - Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(className); - if (dsc == null) { - throw new DescriptorNotFoundException(); - } - return dsc; - } } diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/core/StencilClientOrchestratorTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/core/StencilClientOrchestratorTest.java index f9d0e01a9..1cf6ed783 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/core/StencilClientOrchestratorTest.java +++ b/dagger-common/src/test/java/io/odpf/dagger/common/core/StencilClientOrchestratorTest.java @@ -37,7 +37,7 @@ private Configuration getConfig(Map mapConfig) { public void shouldReturnClassLoadStencilClientIfStencilDisabled() throws NoSuchFieldException, IllegalAccessException { when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT); when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT); - when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT); + when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT); StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration); stencilClient = stencilClientOrchestrator.getStencilClient(); @@ -53,7 +53,7 @@ public void shouldReturnMultiURLStencilClient() throws NoSuchFieldException, Ill when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest," + "http://localhost/events/latest," + "http://localhost/entities/release"); - when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT); + when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT); StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration); stencilClient = stencilClientOrchestrator.getStencilClient(); @@ -66,7 +66,7 @@ public void 
shouldReturnMultiURLStencilClient() throws NoSuchFieldException, Ill @Test public void shouldEnrichStencilClient() throws NoSuchFieldException, IllegalAccessException { when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(true); - when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT); + when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT); when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,"); StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration); StencilClient oldStencilClient = stencilClientOrchestrator.getStencilClient(); @@ -93,7 +93,7 @@ public void shouldEnrichStencilClient() throws NoSuchFieldException, IllegalAcce public void shouldNotEnrichIfNoNewAdditionalURLsAdded() throws NoSuchFieldException, IllegalAccessException { when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(true); when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,"); - when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT); + when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT); StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration); StencilClient oldStencilClient = stencilClientOrchestrator.getStencilClient(); @@ -118,7 +118,7 @@ public void shouldNotEnrichIfNoNewAdditionalURLsAdded() throws NoSuchFieldExcept public void shouldReturnClassLoadStencilClientWhenStencilDisabledAndEnrichmentStencilUrlsIsNotNull() throws NoSuchFieldException, IllegalAccessException { when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT); when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT); - when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT); + when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT); StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration); List enrichmentStencilURLs = Collections @@ -138,13 +138,13 @@ public void shouldReturnDefaultTimeoutIfTimeoutMsConfigNotSet() { Configuration config = getConfig(configMap); StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(config); StencilConfig stencilConfig = stencilClientOrchestrator.createStencilConfig(); - assertEquals(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT, stencilConfig.getFetchTimeoutMs()); + assertEquals(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT, 
stencilConfig.getFetchTimeoutMs()); } @Test public void shouldReturnConfiguredTimeoutIfTimeoutMsConfigIsSet() { Map configMap = new HashMap() {{ - put(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, "8000"); + put(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, "8000"); }}; Configuration config = getConfig(configMap); StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(config); diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializerTest.java new file mode 100644 index 000000000..c6a37c794 --- /dev/null +++ b/dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/KafkaProtoSerializerTest.java @@ -0,0 +1,53 @@ +package io.odpf.dagger.common.serde.proto.serialization; + +import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException; +import org.apache.flink.types.Row; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +public class KafkaProtoSerializerTest { + + @Test + public void shouldSerializeIntoKafkaRecord() { + ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class); + String outputTopic = "test"; + Row element = new Row(1); + element.setField(0, "testing"); + byte[] keyBytes = "key".getBytes(); + byte[] valueBytes = "value".getBytes(); + Mockito.when(serializer.serializeKey(element)).thenReturn(keyBytes); + Mockito.when(serializer.serializeValue(element)).thenReturn(valueBytes); + KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, outputTopic); + ProducerRecord record = kafkaProtoSerializer.serialize(element, null, null); + ProducerRecord expectedRecord = new ProducerRecord<>("test", keyBytes, valueBytes); + Assert.assertEquals(expectedRecord, record); + } + + @Test + public void shouldThrowExceptionWhenOutputTopicIsNullForSerializeMethod() { + ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class); + KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, null); + Row element = new Row(1); + element.setField(0, "1234"); + DaggerSerializationException exception = assertThrows(DaggerSerializationException.class, + () -> kafkaProtoSerializer.serialize(element, null, System.currentTimeMillis() / 1000)); + assertEquals("outputTopic is required", exception.getMessage()); + } + + @Test + public void shouldThrowExceptionWhenOutputTopicIsEmptyForSerializeMethod() { + ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class); + KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, ""); + Row element = new Row(1); + element.setField(0, "1234"); + + DaggerSerializationException exception = assertThrows(DaggerSerializationException.class, + () -> kafkaProtoSerializer.serialize(element, null, System.currentTimeMillis() / 1000)); + assertEquals("outputTopic is required", exception.getMessage()); + } +} diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializerTest.java b/dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializerTest.java index 7ed1f04b2..367d6e5a4 100644 --- a/dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializerTest.java +++ 
b/dagger-common/src/test/java/io/odpf/dagger/common/serde/proto/serialization/ProtoSerializerTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.common.serde.proto.serialization; +import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException; import io.odpf.dagger.common.exceptions.serde.InvalidDataTypeException; import org.apache.flink.types.Row; @@ -8,7 +9,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.exceptions.DescriptorNotFoundException; -import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException; import io.odpf.dagger.common.exceptions.serde.InvalidColumnMappingException; import io.odpf.dagger.consumer.TestBookingLogMessage; import io.odpf.dagger.consumer.TestEnrichedBookingLogMessage; @@ -16,7 +16,6 @@ import io.odpf.dagger.consumer.TestSerDeLogKey; import io.odpf.dagger.consumer.TestSerDeLogMessage; import io.odpf.dagger.consumer.TestServiceType; -import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -34,8 +33,6 @@ public class ProtoSerializerTest { @Mock private StencilClientOrchestrator stencilClientOrchestrator; - private final String outputTopic = "test-topic"; - @Before public void setup() { initMocks(this); @@ -48,7 +45,7 @@ public void shouldSerializeKeyForProto() throws InvalidProtocolBufferException { String[] columnNames = {"window_start_time", "window_end_time", "s2_id_level", "s2_id", "service_type"}; String outputProtoKey = "io.odpf.dagger.consumer.TestSerDeLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestSerDeLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); long seconds = System.currentTimeMillis() / 1000; @@ -65,9 +62,9 @@ public void shouldSerializeKeyForProto() throws InvalidProtocolBufferException { element.setField(3, 3322909458387959808L); element.setField(4, TestServiceType.Enum.GO_RIDE); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, seconds); + byte[] keyBytes = serializer.serializeKey(element); - TestSerDeLogKey actualKey = TestSerDeLogKey.parseFrom(producerRecord.key()); + TestSerDeLogKey actualKey = TestSerDeLogKey.parseFrom(keyBytes); assertEquals(expectedTimestamp, actualKey.getWindowStartTime()); assertEquals(expectedTimestamp, actualKey.getWindowEndTime()); @@ -83,7 +80,7 @@ public void shouldSerializeMessageProto() throws InvalidProtocolBufferException "event_timestamp", "string_type", "bool_type", "message_type", "repeated_message_type", "map_type"}; String outputProtoKey = "io.odpf.dagger.consumer.TestSerDeLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestSerDeLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); long seconds = System.currentTimeMillis() / 1000; Row element = new Row(12); @@ -115,9 +112,9 @@ public void shouldSerializeMessageProto() throws InvalidProtocolBufferException put("key", "value"); }}); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, seconds); + byte[] valueBytes = 
serializer.serializeValue(element); - TestSerDeLogMessage actualMessage = TestSerDeLogMessage.parseFrom(producerRecord.value()); + TestSerDeLogMessage actualMessage = TestSerDeLogMessage.parseFrom(valueBytes); assertEquals(expectedTimestamp, actualMessage.getWindowStartTime()); assertEquals(expectedTimestamp, actualMessage.getWindowEndTime()); @@ -142,15 +139,15 @@ public void shouldSerializeDataForOneFieldInNestedProtoWhenMappedFromQuery() thr String[] columnNames = {"customer_profile.customer_id"}; String outputProtoKey = "io.odpf.dagger.consumer.TestEnrichedBookingLogMessage"; String outputProtoMessage = "io.odpf.dagger.consumer.TestEnrichedBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(1); element.setField(0, "test-id"); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] value = serializer.serializeValue(element); - TestEnrichedBookingLogMessage actualValue = TestEnrichedBookingLogMessage.parseFrom(producerRecord.value()); + TestEnrichedBookingLogMessage actualValue = TestEnrichedBookingLogMessage.parseFrom(value); assertEquals("test-id", actualValue.getCustomerProfile().getCustomerId()); } @@ -160,7 +157,7 @@ public void shouldSerializeDataForMultipleFieldsInSameNestedProtoWhenMappedFromQ String[] columnNames = {"customer_profile.name", "customer_profile.email", "customer_profile.phone_verified"}; String outputProtoKey = "io.odpf.dagger.consumer.TestEnrichedBookingLogMessage"; String outputProtoMessage = "io.odpf.dagger.consumer.TestEnrichedBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(3); @@ -168,9 +165,9 @@ public void shouldSerializeDataForMultipleFieldsInSameNestedProtoWhenMappedFromQ element.setField(1, "test_email@go-jek.com"); element.setField(2, true); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] valueBytes = serializer.serializeValue(element); - TestEnrichedBookingLogMessage actualValue = TestEnrichedBookingLogMessage.parseFrom(producerRecord.value()); + TestEnrichedBookingLogMessage actualValue = TestEnrichedBookingLogMessage.parseFrom(valueBytes); assertEquals("test-name", actualValue.getCustomerProfile().getName()); assertEquals("test_email@go-jek.com", actualValue.getCustomerProfile().getEmail()); @@ -182,7 +179,7 @@ public void shouldSerializeDataForMultipleFieldsInDifferentNestedProtoWhenMapped String[] columnNames = {"order_number", "service_type", "customer_price", "customer_total_fare_without_surge", "driver_pickup_location.name", "driver_pickup_location.latitude"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new 
Row(6); @@ -193,9 +190,9 @@ public void shouldSerializeDataForMultipleFieldsInDifferentNestedProtoWhenMapped element.setField(4, "driver_name"); element.setField(5, 876D); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] valueBytes = serializer.serializeValue(element); - TestBookingLogMessage actualValue = TestBookingLogMessage.parseFrom(producerRecord.value()); + TestBookingLogMessage actualValue = TestBookingLogMessage.parseFrom(valueBytes); assertEquals("order_number", actualValue.getOrderNumber()); assertEquals(TestServiceType.Enum.GO_RIDE, actualValue.getServiceType()); @@ -210,13 +207,13 @@ public void shouldThrowExceptionWhenColumnDoesNotExists() { String[] columnNames = {"order_number", "driver_pickup_location.invalid"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(2); element.setField(0, "order_number"); element.setField(1, 876D); InvalidColumnMappingException exception = assertThrows(InvalidColumnMappingException.class, - () -> protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000)); + () -> serializer.serializeValue(element)); assertEquals("column invalid doesn't exists in the proto of io.odpf.dagger.consumer.TestLocation", exception.getMessage()); @@ -227,14 +224,14 @@ public void shouldMapOtherFieldsWhenOneOfTheFirstFieldIsInvalidForANestedFieldIn String[] columnNames = {"blah.invalid", "customer_email"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(2); element.setField(0, "order_number"); element.setField(1, "customer_email@go-jek.com"); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] valueBytes = serializer.serializeValue(element); - assertEquals("customer_email@go-jek.com", TestBookingLogMessage.parseFrom(producerRecord.value()).getCustomerEmail()); + assertEquals("customer_email@go-jek.com", TestBookingLogMessage.parseFrom(valueBytes).getCustomerEmail()); } @Test @@ -242,13 +239,13 @@ public void shouldMapEmptyDataWhenFieldIsInvalidInTheQuery() { String[] columnNames = {"invalid"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(1); element.setField(0, "order_number"); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] valueBytes = serializer.serializeValue(element); - 
assertEquals(0, producerRecord.value().length); + assertEquals(0, valueBytes.length); } @Test @@ -256,14 +253,14 @@ public void shouldMapOtherFieldsWhenOneOfTheFieldIsInvalidInTheQuery() throws In String[] columnNames = {"invalid", "order_number"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(2); element.setField(0, "some_data"); element.setField(1, "order_number"); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] valueBytes = serializer.serializeValue(element); - assertEquals("order_number", TestBookingLogMessage.parseFrom(producerRecord.value()).getOrderNumber()); + assertEquals("order_number", TestBookingLogMessage.parseFrom(valueBytes).getOrderNumber()); } @Test @@ -271,12 +268,12 @@ public void shouldNotThrowExceptionWhenPrimitiveTypeCanBeCasted() throws Invalid String[] columnNames = {"order_number"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(1); element.setField(0, 1234); - ProducerRecord testBookingLogMessage = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); - assertEquals("1234", TestBookingLogMessage.parseFrom(testBookingLogMessage.value()).getOrderNumber()); + byte[] testBookingLogMessage = serializer.serializeValue(element); + assertEquals("1234", TestBookingLogMessage.parseFrom(testBookingLogMessage).getOrderNumber()); } @Test @@ -284,12 +281,12 @@ public void shouldThrowExceptionWhenPrimitiveTypeCanNotBeCasted() { String[] columnNames = {"customer_price"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(1); element.setField(0, "invalid_number"); InvalidDataTypeException exception = assertThrows(InvalidDataTypeException.class, - () -> protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000)); + () -> serializer.serializeValue(element)); assertEquals("type mismatch of field: customer_price, expecting DOUBLE type, actual type class java.lang.String", exception.getMessage()); } @@ -300,12 +297,12 @@ public void shouldHandleRepeatedTypeWhenTypeDoesNotMatch() { String[] columnNames = {"meta_array"}; String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + 
ProtoSerializer serializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(1); element.setField(0, 1234); InvalidColumnMappingException exception = assertThrows(InvalidColumnMappingException.class, - () -> protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000)); + () -> serializer.serializeValue(element)); assertEquals("column invalid: type mismatch of column meta_array, expecting REPEATED STRING type. Actual type class java.lang.Integer", exception.getMessage()); } @@ -314,7 +311,7 @@ public void shouldHandleRepeatedTypeWhenTypeDoesNotMatch() { public void shouldSerializeMessageWhenOnlyMessageProtoProvided() throws InvalidProtocolBufferException { String[] columnNames = {"order_number", "driver_id"}; String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(null, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(null, outputProtoMessage, columnNames, stencilClientOrchestrator); String orderNumber = "RB-1234"; @@ -322,90 +319,77 @@ public void shouldSerializeMessageWhenOnlyMessageProtoProvided() throws InvalidP element.setField(0, orderNumber); element.setField(1, "DR-124"); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); - TestBookingLogMessage actualMessage = TestBookingLogMessage.parseFrom(producerRecord.value()); + byte[] valueBytes = serializer.serializeValue(element); + TestBookingLogMessage actualMessage = TestBookingLogMessage.parseFrom(valueBytes); assertEquals(orderNumber, actualMessage.getOrderNumber()); } - @Test - public void shouldThrowExceptionWhenMessageProtoIsNotProvided() { - - String[] columnNames = {}; - DaggerSerializationException exception = assertThrows(DaggerSerializationException.class, - () -> new ProtoSerializer(null, null, columnNames, stencilClientOrchestrator, outputTopic)); - assertEquals("messageProtoClassName is required", exception.getMessage()); - - } - @Test public void shouldReturnNullKeyWhenOnlyMessageProtoProvided() { String[] columnNames = {"s2_id_level"}; String protoMessage = "io.odpf.dagger.consumer.TestSerDeLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(null, protoMessage, columnNames, - stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(null, protoMessage, columnNames, + stencilClientOrchestrator); Row element = new Row(1); element.setField(0, 13); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] valueBytes = serializer.serializeValue(element); + byte[] keyBytes = serializer.serializeKey(element); - assertNull(producerRecord.key()); - assertNotNull(producerRecord.value()); + assertNull(keyBytes); + assertNotNull(valueBytes); } @Test public void shouldReturnNullKeyWhenKeyIsEmptyString() { String[] columnNames = {"s2_id_level"}; String protoMessage = "io.odpf.dagger.consumer.TestSerDeLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer("", protoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer("", protoMessage, columnNames, stencilClientOrchestrator); Row element = new Row(1); element.setField(0, 13); - ProducerRecord producerRecord = protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + byte[] keyBytes = 
serializer.serializeKey(element); + byte[] valueBytes = serializer.serializeValue(element); - assertNull(producerRecord.key()); - assertNotNull(producerRecord.value()); + assertNull(keyBytes); + assertNotNull(valueBytes); } @Test(expected = DescriptorNotFoundException.class) public void shouldThrowDescriptorNotFoundException() { String[] columnNames = {"s2_id_level"}; String protoMessage = "RandomMessageClass"; - ProtoSerializer protoSerializer = new ProtoSerializer(null, protoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer serializer = new ProtoSerializer(null, protoMessage, columnNames, stencilClientOrchestrator); int s2IdLevel = 13; Row element = new Row(1); element.setField(0, s2IdLevel); - protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000); + serializer.serializeValue(element); } - @Test - public void shouldThrowExceptionWhenOutputTopicIsNullForSerializeMethod() { - String[] columnNames = {"order_number"}; - String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; - String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, null); + @Test(expected = DaggerSerializationException.class) + public void shouldThrowExceptionIfTheValueMessageClassIsMissing() { + String[] columnNames = {"s2_id_level"}; + ProtoSerializer serializer = new ProtoSerializer("keyMessage", null, columnNames, stencilClientOrchestrator); + Row element = new Row(1); - element.setField(0, "1234"); - DaggerSerializationException exception = assertThrows(DaggerSerializationException.class, - () -> protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000)); - assertEquals("outputTopic is required", exception.getMessage()); + element.setField(0, 13); + + serializer.serializeValue(element); } - @Test - public void shouldThrowExceptionWhenOutputTopicIsEmptyForSerializeMethod() { - String[] columnNames = {"order_number"}; - String outputProtoKey = "io.odpf.dagger.consumer.TestBookingLogKey"; - String outputProtoMessage = "io.odpf.dagger.consumer.TestBookingLogMessage"; - ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, ""); + @Test(expected = DaggerSerializationException.class) + public void shouldThrowExceptionIfTheValueMessageClassIsEmpty() { + String[] columnNames = {"s2_id_level"}; + ProtoSerializer serializer = new ProtoSerializer("keyMessage", "", columnNames, stencilClientOrchestrator); + Row element = new Row(1); - element.setField(0, "1234"); + element.setField(0, 13); - DaggerSerializationException exception = assertThrows(DaggerSerializationException.class, - () -> protoSerializer.serialize(element, null, System.currentTimeMillis() / 1000)); - assertEquals("outputTopic is required", exception.getMessage()); + serializer.serializeValue(element); } } diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index fd2ffd90a..6d1d7404b 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -61,7 +61,7 @@ configurations { dependencies { minimalJar project(path: ':dagger-common', configuration: 'minimalCommonJar') minimalJar project(path: ':dagger-functions', configuration: 'minimalFunctionsJar') - minimalJar('io.odpf:depot:0.1.5') { + minimalJar('io.odpf:depot:0.1.6') { exclude group: 'org.apache.httpcomponents' exclude module: 'stencil', group: 'io.odpf' exclude group: 
'com.google.protobuf' diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/ProtoDeserializerProvider.java b/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/ProtoDeserializerProvider.java index d20664f88..152b52195 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/ProtoDeserializerProvider.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/deserializer/ProtoDeserializerProvider.java @@ -5,26 +5,26 @@ import io.odpf.dagger.common.serde.DaggerDeserializer; import io.odpf.dagger.common.serde.DataTypes; import io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer; +import io.odpf.dagger.core.source.config.StreamConfig; import io.odpf.dagger.core.source.config.models.SourceDetails; import io.odpf.dagger.core.source.config.models.SourceName; -import io.odpf.dagger.core.source.config.StreamConfig; import org.apache.flink.types.Row; import java.util.Arrays; import java.util.HashSet; import static io.odpf.dagger.common.serde.DataTypes.PROTO; -import static io.odpf.dagger.core.source.config.models.SourceName.KAFKA_SOURCE; import static io.odpf.dagger.core.source.config.models.SourceName.KAFKA_CONSUMER; +import static io.odpf.dagger.core.source.config.models.SourceName.KAFKA_SOURCE; import static io.odpf.dagger.core.utils.Constants.FLINK_ROWTIME_ATTRIBUTE_NAME_DEFAULT; import static io.odpf.dagger.core.utils.Constants.FLINK_ROWTIME_ATTRIBUTE_NAME_KEY; public class ProtoDeserializerProvider implements DaggerDeserializerProvider { + private static final HashSet COMPATIBLE_SOURCES = new HashSet<>(Arrays.asList(KAFKA_SOURCE, KAFKA_CONSUMER)); + private static final DataTypes COMPATIBLE_INPUT_SCHEMA_TYPE = PROTO; protected final StreamConfig streamConfig; protected final Configuration configuration; protected final StencilClientOrchestrator stencilClientOrchestrator; - private static final HashSet COMPATIBLE_SOURCES = new HashSet<>(Arrays.asList(KAFKA_SOURCE, KAFKA_CONSUMER)); - private static final DataTypes COMPATIBLE_INPUT_SCHEMA_TYPE = PROTO; public ProtoDeserializerProvider(StreamConfig streamConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator) { this.streamConfig = streamConfig; diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/exception/BigqueryWriterException.java b/dagger-core/src/main/java/io/odpf/dagger/core/exception/BigqueryWriterException.java new file mode 100644 index 000000000..97ac7fb66 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/exception/BigqueryWriterException.java @@ -0,0 +1,14 @@ +package io.odpf.dagger.core.exception; + +import java.io.IOException; + +public class BigqueryWriterException extends IOException { + + public BigqueryWriterException(String message, Throwable cause) { + super(message, cause); + } + + public BigqueryWriterException(String message) { + super(message); + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/LongbowFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/LongbowFactory.java index 35c778ee0..7c1a7f86c 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/LongbowFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/LongbowFactory.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core.processors.longbow; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.types.Row; @@ -25,7 +26,6 @@ import 
io.odpf.dagger.core.processors.longbow.validator.LongbowValidator; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.types.PostProcessor; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import java.util.ArrayList; import java.util.Map; diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequest.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequest.java index ab0734ead..71f7631c0 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequest.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequest.java @@ -1,8 +1,8 @@ package io.odpf.dagger.core.processors.longbow.request; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.flink.types.Row; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import io.odpf.dagger.core.processors.longbow.LongbowSchema; import io.odpf.dagger.core.processors.longbow.storage.PutRequest; import io.odpf.dagger.core.utils.Constants; @@ -20,10 +20,10 @@ public class ProtoBytePutRequest implements PutRequest { private static final byte[] COLUMN_FAMILY_NAME = Bytes.toBytes(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT); private static final byte[] QUALIFIER_NAME = Bytes.toBytes(Constants.LONGBOW_QUALIFIER_DEFAULT); - private LongbowSchema longbowSchema; - private Row input; - private ProtoSerializer protoSerializer; - private String tableId; + private final LongbowSchema longbowSchema; + private final Row input; + private final ProtoSerializer protoSerializer; + private final String tableId; /** diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactory.java index 2299076a2..32f5e8d6c 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactory.java @@ -1,10 +1,10 @@ package io.odpf.dagger.core.processors.longbow.request; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.flink.types.Row; import io.odpf.dagger.core.processors.longbow.LongbowSchema; import io.odpf.dagger.core.processors.longbow.storage.PutRequest; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import java.io.Serializable; @@ -13,9 +13,9 @@ */ public class PutRequestFactory implements Serializable { - private LongbowSchema longbowSchema; - private ProtoSerializer protoSerializer; - private String tableId; + private final LongbowSchema longbowSchema; + private final ProtoSerializer protoSerializer; + private final String tableId; /** * Instantiates a new Put request factory. 
diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/SinkOrchestrator.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/SinkOrchestrator.java index 69f54611e..2df3c6491 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/sink/SinkOrchestrator.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/SinkOrchestrator.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core.sink; +import io.odpf.dagger.core.sink.bigquery.BigquerySinkBuilder; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaSink; @@ -32,7 +33,7 @@ */ public class SinkOrchestrator implements TelemetryPublisher { private final MetricsTelemetryExporter telemetryExporter; - private Map> metrics; + private final Map> metrics; public SinkOrchestrator(MetricsTelemetryExporter telemetryExporter) { this.telemetryExporter = telemetryExporter; @@ -71,6 +72,13 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl case "log": sink = new LogSink(columnNames); break; + case "bigquery": + sink = BigquerySinkBuilder.create() + .setColumnNames(columnNames) + .setConfiguration(configuration) + .setStencilClientOrchestrator(stencilClientOrchestrator) + .build(); + break; default: sink = new InfluxDBSink(new InfluxDBFactoryWrapper(), configuration, columnNames, new ErrorHandler()); } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySink.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySink.java new file mode 100644 index 000000000..3c22bab3f --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySink.java @@ -0,0 +1,96 @@ +package io.odpf.dagger.core.sink.bigquery; + +import com.google.common.base.Splitter; +import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; +import io.odpf.dagger.core.metrics.reporters.ErrorReporter; +import io.odpf.dagger.core.metrics.reporters.ErrorReporterFactory; +import io.odpf.dagger.core.utils.Constants; +import io.odpf.depot.OdpfSink; +import io.odpf.depot.bigquery.BigQuerySinkFactory; +import io.odpf.depot.config.BigQuerySinkConfig; +import io.odpf.depot.error.ErrorType; +import org.aeonbits.owner.ConfigFactory; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class BigquerySink implements Sink { + private final ProtoSerializer protoSerializer; + private final Configuration configuration; + private transient BigQuerySinkFactory sinkFactory; + + protected BigquerySink(Configuration configuration, ProtoSerializer protoSerializer) { + this(configuration, protoSerializer, null); + } + + /** + * Constructor for testing. 
+ */ + protected BigquerySink(Configuration configuration, ProtoSerializer protoSerializer, BigQuerySinkFactory sinkFactory) { + this.configuration = configuration; + this.protoSerializer = protoSerializer; + this.sinkFactory = sinkFactory; + } + + @Override + public SinkWriter<Row, Void, Void> createWriter(InitContext context, List<Void> states) { + ErrorReporter errorReporter = ErrorReporterFactory.getErrorReporter(context.metricGroup(), configuration); + if (sinkFactory == null) { + BigQuerySinkConfig sinkConfig = ConfigFactory.create(BigQuerySinkConfig.class, configuration.getParam().toMap()); + sinkFactory = new BigQuerySinkFactory(sinkConfig); + try { + sinkFactory.init(); + } catch (Exception e) { + errorReporter.reportFatalException(e); + throw e; + } + } + OdpfSink odpfSink = sinkFactory.create(); + int batchSize = configuration.getInteger( + Constants.SINK_BIGQUERY_BATCH_SIZE, + Constants.SINK_BIGQUERY_BATCH_SIZE_DEFAULT); + String errorsForFailing = configuration.getString( + Constants.SINK_ERROR_TYPES_FOR_FAILURE, + Constants.SINK_ERROR_TYPES_FOR_FAILURE_DEFAULT); + Set<ErrorType> errorTypesForFailing = new HashSet<>(); + for (String s : Splitter.on(",").omitEmptyStrings().split(errorsForFailing)) { + errorTypesForFailing.add(ErrorType.valueOf(s.trim())); + } + return new BigquerySinkWriter(protoSerializer, odpfSink, batchSize, errorReporter, errorTypesForFailing); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() { + return Optional.empty(); + } + + @Override + public Optional<Committer<Void>> createCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter() throws IOException { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilder.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilder.java new file mode 100644 index 000000000..57cc048b9 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilder.java @@ -0,0 +1,61 @@ +package io.odpf.dagger.core.sink.bigquery; + +import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.StencilClientOrchestrator; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; +import org.apache.flink.api.java.utils.ParameterTool; + +import java.util.HashMap; +import java.util.Map; + +public class BigquerySinkBuilder { + + private String[] columnNames; + private StencilClientOrchestrator stencilClientOrchestrator; + private Configuration configuration; + + private BigquerySinkBuilder() { + } + + public static BigquerySinkBuilder create() { + return new BigquerySinkBuilder(); + } + + public BigquerySink build() { + ProtoSerializer protoSerializer = new ProtoSerializer( + configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS", ""), + configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", ""), + columnNames, + stencilClientOrchestrator); + Configuration conf = setDefaultValues(configuration); + return new BigquerySink(conf, protoSerializer); + } + + private Configuration setDefaultValues(Configuration inputConf) { + Map<String, String> configMap = new HashMap<>(inputConf.getParam().toMap()); + configMap.put("SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH", "false"); + configMap.put("SCHEMA_REGISTRY_STENCIL_CACHE_TTL_MS", "86400000"); +
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES", "4"); + configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS", "5000"); + configMap.put("SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY", "LONG_POLLING"); + configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS", "60000"); + configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS", ""); + return new Configuration(ParameterTool.fromMap(configMap)); + } + + public BigquerySinkBuilder setConfiguration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + public BigquerySinkBuilder setColumnNames(String[] columnNames) { + this.columnNames = columnNames; + return this; + } + + public BigquerySinkBuilder setStencilClientOrchestrator(StencilClientOrchestrator stencilClientOrchestrator) { + this.stencilClientOrchestrator = stencilClientOrchestrator; + return this; + } + +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java new file mode 100644 index 000000000..2e914bbe6 --- /dev/null +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java @@ -0,0 +1,116 @@ +package io.odpf.dagger.core.sink.bigquery; + +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; +import io.odpf.dagger.core.exception.BigqueryWriterException; +import io.odpf.dagger.core.metrics.reporters.ErrorReporter; +import io.odpf.depot.OdpfSink; +import io.odpf.depot.OdpfSinkResponse; +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import io.odpf.depot.exception.OdpfSinkException; +import io.odpf.depot.message.OdpfMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +public class BigquerySinkWriter implements SinkWriter { + private final ProtoSerializer protoSerializer; + private final OdpfSink bigquerySink; + private final int batchSize; + private final ErrorReporter errorReporter; + private final Set errorTypesForFailing; + private final List messages = new ArrayList<>(); + private int currentBatchSize; + + public BigquerySinkWriter(ProtoSerializer protoSerializer, OdpfSink bigquerySink, int batchSize, ErrorReporter errorReporter, Set errorTypesForFailing) { + this.protoSerializer = protoSerializer; + this.bigquerySink = bigquerySink; + this.batchSize = batchSize; + this.errorReporter = errorReporter; + this.errorTypesForFailing = errorTypesForFailing; + } + + @Override + public void write(Row element, Context context) throws IOException { + log.info("adding row to BQ batch : " + element); + byte[] key = protoSerializer.serializeKey(element); + byte[] value = protoSerializer.serializeValue(element); + OdpfMessage message = new OdpfMessage(key, value); + if (currentBatchSize < batchSize) { + messages.add(message); + currentBatchSize++; + } + if (currentBatchSize >= batchSize) { + pushToBq(); + messages.clear(); + currentBatchSize = 0; + } + } + + private void pushToBq() throws OdpfSinkException, BigqueryWriterException { + log.info("Pushing " + currentBatchSize + " records to bq"); + OdpfSinkResponse odpfSinkResponse; + try { + odpfSinkResponse = bigquerySink.pushToSink(messages); + } catch (Exception e) { + errorReporter.reportFatalException(e); + throw e; + } + if (odpfSinkResponse.hasErrors()) { + logErrors(odpfSinkResponse, messages); + 
checkAndThrow(odpfSinkResponse); + } + } + + protected void checkAndThrow(OdpfSinkResponse sinkResponse) throws BigqueryWriterException { + Map> failedErrorTypes = sinkResponse.getErrors().values().stream().collect( + Collectors.partitioningBy(errorInfo -> errorTypesForFailing.contains(errorInfo.getErrorType()))); + failedErrorTypes.get(Boolean.FALSE).forEach(errorInfo -> { + errorReporter.reportNonFatalException(errorInfo.getException()); + }); + failedErrorTypes.get(Boolean.TRUE).forEach(errorInfo -> { + errorReporter.reportFatalException(errorInfo.getException()); + }); + if (failedErrorTypes.get(Boolean.TRUE).size() > 0) { + throw new BigqueryWriterException("Error occurred during writing to Bigquery"); + } + } + + protected void logErrors(OdpfSinkResponse sinkResponse, List sentMessages) { + log.error("Failed to push " + sinkResponse.getErrors().size() + " records to BigquerySink"); + sinkResponse.getErrors().forEach((index, errorInfo) -> { + OdpfMessage message = sentMessages.get(index.intValue()); + log.error("Failed to push message with metadata {}. The exception was {}. The ErrorType was {}", + message.getMetadataString(), + errorInfo.getException().getMessage(), + errorInfo.getErrorType().name()); + }); + + } + + @Override + public List prepareCommit(boolean flush) throws IOException, InterruptedException { + return null; + } + + @Override + public void close() throws Exception { + bigquerySink.close(); + } + + @Override + public List snapshotState(long checkpointId) throws IOException { + try { + pushToBq(); + } catch (Exception exception) { + errorReporter.reportFatalException(exception); + throw exception; + } + return Collections.emptyList(); + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilder.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilder.java index 600b1fb0b..a48d2a17d 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilder.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilder.java @@ -1,10 +1,11 @@ package io.odpf.dagger.core.sink.kafka.builder; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StencilClientOrchestrator; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; +import io.odpf.dagger.common.serde.proto.serialization.KafkaProtoSerializer; import io.odpf.dagger.core.metrics.telemetry.TelemetryPublisher; import io.odpf.dagger.core.metrics.telemetry.TelemetryTypes; import io.odpf.dagger.core.sink.kafka.KafkaSerializerBuilder; @@ -39,7 +40,8 @@ public KafkaRecordSerializationSchema build() { addMetric(TelemetryTypes.OUTPUT_STREAM.getValue(), outputStream); notifySubscriber(); - return new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator, outputTopic); + ProtoSerializer protoSerializer = new ProtoSerializer(outputProtoKey, outputProtoMessage, columnNames, stencilClientOrchestrator); + return new KafkaProtoSerializer(protoSerializer, outputTopic); } @Override diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java b/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java index 892aed002..ff1622930 100644 ---
a/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java @@ -166,4 +166,10 @@ public class Constants { public static final long MAX_EVENT_LOOP_EXECUTE_TIME_DEFAULT = 10000; public static final int LONGBOW_OUTPUT_ADDITIONAL_ARITY = 3; public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIABLES, QUERY_VARIABLES, ENDPOINT_VARIABLE }; + + public static final String SINK_BIGQUERY_BATCH_SIZE = "SINK_BIGQUERY_BATCH_SIZE"; + public static final int SINK_BIGQUERY_BATCH_SIZE_DEFAULT = 500; + // Comma-separated error types + public static final String SINK_ERROR_TYPES_FOR_FAILURE = "SINK_ERROR_TYPES_FOR_FAILURE"; + public static final String SINK_ERROR_TYPES_FOR_FAILURE_DEFAULT = ""; } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/processor/LongbowWriterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/processor/LongbowWriterTest.java index da9fe73aa..2d6313ea6 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/processor/LongbowWriterTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/processor/LongbowWriterTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core.processors.longbow.processor; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.types.Row; @@ -16,7 +17,6 @@ import io.odpf.dagger.core.processors.longbow.request.PutRequestFactory; import io.odpf.dagger.core.processors.longbow.storage.LongbowStore; import io.odpf.dagger.core.processors.longbow.storage.PutRequest; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequestTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequestTest.java index a93546880..457acc48a 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequestTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/ProtoBytePutRequestTest.java @@ -1,9 +1,9 @@ package io.odpf.dagger.core.processors.longbow.request; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.flink.types.Row; import io.odpf.dagger.core.processors.longbow.LongbowSchema; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactoryTest.java index 005733cbc..c4935b044 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/longbow/request/PutRequestFactoryTest.java @@ -1,10 +1,10 @@ package io.odpf.dagger.core.processors.longbow.request; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.apache.flink.types.Row; import io.odpf.dagger.core.processors.longbow.LongbowSchema; import
io.odpf.dagger.core.processors.longbow.storage.PutRequest; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/SinkOrchestratorTest.java index 595f81cfa..54d9ae481 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/sink/SinkOrchestratorTest.java @@ -1,6 +1,8 @@ package io.odpf.dagger.core.sink; +import io.odpf.dagger.core.sink.bigquery.BigquerySink; import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaSink; import io.odpf.dagger.common.configuration.Configuration; @@ -12,10 +14,7 @@ import org.junit.Test; import org.mockito.Mock; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; +import java.util.*; import static io.odpf.dagger.common.core.Constants.*; import static io.odpf.dagger.core.utils.Constants.SINK_KAFKA_BROKERS_KEY; @@ -104,4 +103,13 @@ public void shouldReturnSinkMetrics() { sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator); assertEquals(expectedMetrics, sinkOrchestrator.getTelemetry()); } + + @Test + public void shouldReturnBigquerySink() { + when(configuration.getString(eq("SINK_TYPE"), anyString())).thenReturn("bigquery"); + when(configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "")).thenReturn("some.class"); + when(configuration.getParam()).thenReturn(ParameterTool.fromMap(Collections.emptyMap())); + Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator); + assertThat(sinkFunction, instanceOf(BigquerySink.class)); + } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilderTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilderTest.java new file mode 100644 index 000000000..ca93ac269 --- /dev/null +++ b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkBuilderTest.java @@ -0,0 +1,26 @@ +package io.odpf.dagger.core.sink.bigquery; + +import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.StencilClientOrchestrator; +import org.apache.flink.api.java.utils.ParameterTool; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.HashMap; + + +public class BigquerySinkBuilderTest { + + @Test + public void shouldBuildBigquerySink() { + StencilClientOrchestrator stencilClientOrchestrator = Mockito.mock(StencilClientOrchestrator.class); + BigquerySinkBuilder builder = BigquerySinkBuilder.create(); + builder.setColumnNames(new String[]{"test", "some_column"}); + builder.setConfiguration(new Configuration(ParameterTool.fromMap(new HashMap() {{ + put("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "test"); + }}))); + builder.setStencilClientOrchestrator(stencilClientOrchestrator); + Assert.assertNotNull(builder.build()); + } +} diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkTest.java new file mode 100644 index 000000000..42dfc491e --- /dev/null +++ 
b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkTest.java @@ -0,0 +1,53 @@ +package io.odpf.dagger.core.sink.bigquery; + +import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; +import io.odpf.depot.OdpfSink; +import io.odpf.depot.bigquery.BigQuerySinkFactory; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.types.Row; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class BigquerySinkTest { + + @Test + public void shouldReturnCommittersAndSerializer() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + BigQuerySinkFactory sinkFactory = Mockito.mock(BigQuerySinkFactory.class); + Configuration configuration = new Configuration(ParameterTool.fromMap(Collections.emptyMap())); + BigquerySink sink = new BigquerySink(configuration, protoSerializer, sinkFactory); + Assert.assertEquals(Optional.empty(), sink.createCommitter()); + Assert.assertEquals(Optional.empty(), sink.getWriterStateSerializer()); + Assert.assertEquals(Optional.empty(), sink.createGlobalCommitter()); + Assert.assertEquals(Optional.empty(), sink.getCommittableSerializer()); + Assert.assertEquals(Optional.empty(), sink.getGlobalCommittableSerializer()); + } + + @Test + public void shouldCreateSinkWriter() { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + BigQuerySinkFactory sinkFactory = Mockito.mock(BigQuerySinkFactory.class); + Sink.InitContext context = Mockito.mock(Sink.InitContext.class); + SinkWriterMetricGroup metricGroup = Mockito.mock(SinkWriterMetricGroup.class); + Mockito.when(context.metricGroup()).thenReturn(metricGroup); + OdpfSink odpfSink = Mockito.mock(OdpfSink.class); + Map configMap = new HashMap<>(); + Configuration configuration = new Configuration(ParameterTool.fromMap(configMap)); + Mockito.when(sinkFactory.create()).thenReturn(odpfSink); + BigquerySink sink = new BigquerySink(configuration, protoSerializer, sinkFactory); + SinkWriter writer = sink.createWriter(context, null); + Assert.assertTrue(writer instanceof BigquerySinkWriter); + Mockito.verify(sinkFactory, Mockito.times(1)).create(); + } +} diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java new file mode 100644 index 000000000..efe4e19fa --- /dev/null +++ b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java @@ -0,0 +1,152 @@ +package io.odpf.dagger.core.sink.bigquery; + +import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; +import io.odpf.dagger.core.metrics.reporters.ErrorReporter; +import io.odpf.depot.OdpfSink; +import io.odpf.depot.OdpfSinkResponse; +import io.odpf.depot.error.ErrorInfo; +import io.odpf.depot.error.ErrorType; +import org.apache.flink.types.Row; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.*; + +public class BigquerySinkWriterTest { + + @Test + public void shouldWriteToOdpfSinkInBatches() throws IOException { + 
ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, null, null); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + OdpfSinkResponse response = Mockito.mock(OdpfSinkResponse.class); + Mockito.when(response.hasErrors()).thenReturn(false); + Mockito.when(sink.pushToSink(Mockito.anyList())).thenReturn(response); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + Mockito.verify(sink, Mockito.times(2)).pushToSink(Mockito.anyList()); + Mockito.verify(response, Mockito.times(2)).hasErrors(); + } + + @Test + public void shouldNotWriteIfCurrentSizeIsLessThanTheBatchSize() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 10, null, null); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + Mockito.verify(sink, Mockito.times(0)).pushToSink(Mockito.anyList()); + } + + @Test + public void shouldThrowExceptionWhenSinkResponseHasErrors() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + ErrorReporter reporter = Mockito.mock(ErrorReporter.class); + Set errorTypesForFailing = new HashSet() {{ + add(ErrorType.DESERIALIZATION_ERROR); + add(ErrorType.SINK_4XX_ERROR); + }}; + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, reporter, errorTypesForFailing); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + OdpfSinkResponse response = Mockito.mock(OdpfSinkResponse.class); + Mockito.when(response.hasErrors()).thenReturn(true); + Map errorInfoMap = new HashMap() {{ + put(1L, new ErrorInfo(new Exception("test1"), ErrorType.DESERIALIZATION_ERROR)); + put(2L, new ErrorInfo(new Exception("test2"), ErrorType.SINK_4XX_ERROR)); + }}; + Mockito.when(response.getErrors()).thenReturn(errorInfoMap); + Mockito.when(sink.pushToSink(Mockito.anyList())).thenReturn(response); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + IOException ioException = Assert.assertThrows(IOException.class, () -> bigquerySinkWriter.write(row, null)); + Assert.assertEquals("Error occurred during writing to Bigquery", ioException.getMessage()); + Mockito.verify(sink, Mockito.times(1)).pushToSink(Mockito.anyList()); + Mockito.verify(response, Mockito.times(1)).hasErrors(); + 
Mockito.verify(reporter, Mockito.times(0)).reportNonFatalException(Mockito.any()); + Mockito.verify(reporter, Mockito.times(1)).reportFatalException(errorInfoMap.get(1L).getException()); + Mockito.verify(reporter, Mockito.times(1)).reportFatalException(errorInfoMap.get(2L).getException()); + } + + @Test + public void shouldNotThrowExceptionIfErrorTypeNotConfigured() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + ErrorReporter reporter = Mockito.mock(ErrorReporter.class); + Set errorTypesForFailing = Collections.emptySet(); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, reporter, errorTypesForFailing); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + OdpfSinkResponse response = Mockito.mock(OdpfSinkResponse.class); + Mockito.when(response.hasErrors()).thenReturn(true); + Map errorInfoMap = new HashMap() {{ + put(1L, new ErrorInfo(new Exception("test1"), ErrorType.DESERIALIZATION_ERROR)); + put(2L, new ErrorInfo(new Exception("test2"), ErrorType.SINK_4XX_ERROR)); + }}; + Mockito.when(response.getErrors()).thenReturn(errorInfoMap); + Mockito.when(sink.pushToSink(Mockito.anyList())).thenReturn(response); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + Mockito.verify(sink, Mockito.times(1)).pushToSink(Mockito.anyList()); + Mockito.verify(response, Mockito.times(1)).hasErrors(); + Mockito.verify(reporter, Mockito.times(0)).reportFatalException(Mockito.any()); + Mockito.verify(reporter, Mockito.times(1)).reportNonFatalException(errorInfoMap.get(1L).getException()); + Mockito.verify(reporter, Mockito.times(1)).reportNonFatalException(errorInfoMap.get(2L).getException()); + } + + @Test + public void shouldCallClose() throws Exception { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, null, null); + bigquerySinkWriter.close(); + Mockito.verify(sink, Mockito.times(1)).close(); + } + + @Test + public void shouldReportExceptionThrownFromSinkConnector() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + ErrorReporter reporter = Mockito.mock(ErrorReporter.class); + Set errorTypesForFailing = Collections.emptySet(); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, reporter, errorTypesForFailing); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + OdpfSinkResponse response = Mockito.mock(OdpfSinkResponse.class); + Mockito.when(sink.pushToSink(Mockito.anyList())).thenThrow(new RuntimeException("test")); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + RuntimeException thrown = Assert.assertThrows(RuntimeException.class, () -> bigquerySinkWriter.write(row, null)); + Assert.assertEquals("test", thrown.getMessage()); + Mockito.verify(sink, Mockito.times(1)).pushToSink(Mockito.anyList()); + Mockito.verify(response, 
Mockito.times(0)).hasErrors(); + Mockito.verify(reporter, Mockito.times(1)).reportFatalException(thrown); + } +} diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilderTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilderTest.java index 1285f5a18..137644827 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilderTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/sink/kafka/builder/KafkaProtoSerializerBuilderTest.java @@ -4,7 +4,7 @@ import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StencilClientOrchestrator; -import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer; +import io.odpf.dagger.common.serde.proto.serialization.KafkaProtoSerializer; import io.odpf.dagger.core.utils.Constants; import org.junit.Assert; import org.junit.Before; @@ -38,7 +38,7 @@ public void shouldCreateProtoSerializer() { KafkaProtoSerializerBuilder kafkaProtoSerializerBuilder = new KafkaProtoSerializerBuilder(configuration, stencilClientOrchestrator, new String[]{"test-col"}); KafkaRecordSerializationSchema kafkaSerializerSchema = kafkaProtoSerializerBuilder.build(); - Assert.assertTrue(kafkaSerializerSchema instanceof ProtoSerializer); + Assert.assertTrue(kafkaSerializerSchema instanceof KafkaProtoSerializer); } @Test From 39f8906a399ff0d85cb464ad529be80dc45de8c2 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Thu, 28 Jul 2022 12:30:30 +0100 Subject: [PATCH 2/5] chore: fix checkstyle --- .../odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java index 2e914bbe6..7e42656a6 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java @@ -14,7 +14,11 @@ import org.apache.flink.types.Row; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; @Slf4j From c580b09ae2515e95ae52701a994fd2586ba26b9b Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Tue, 2 Aug 2022 15:23:21 +0100 Subject: [PATCH 3/5] feat: Checkpointing on bq sink (#187) * feat: prepare for commit * fix: clear the messages for pushing to bq --- dagger-core/build.gradle | 1 + .../sink/bigquery/BigquerySinkWriter.java | 24 ++++++++++------- .../sink/bigquery/BigquerySinkWriterTest.java | 27 +++++++++++++++++++ 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index 6d1d7404b..f961f58d2 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -65,6 +65,7 @@ dependencies { exclude group: 'org.apache.httpcomponents' exclude module: 'stencil', group: 'io.odpf' exclude group: 'com.google.protobuf' + exclude group: 'com.datadoghq' } compileOnly 'org.projectlombok:lombok:1.18.8' annotationProcessor 'org.projectlombok:lombok:1.18.8' diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java index 7e42656a6..be60020f5 100644 --- 
a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java @@ -94,12 +94,21 @@ protected void logErrors(OdpfSinkResponse sinkResponse, List sentMe errorInfo.getException().getMessage(), errorInfo.getErrorType().name()); }); - } + /** + * This will be called before we checkpoint the Writer's state in Streaming execution mode. + * + * @param flush – Whether flushing the un-staged data or not + * @return The data is ready to commit. + * @throws IOException – if fail to prepare for a commit. + */ @Override - public List prepareCommit(boolean flush) throws IOException, InterruptedException { - return null; + public List prepareCommit(boolean flush) throws IOException { + pushToBq(); + messages.clear(); + currentBatchSize = 0; + return Collections.emptyList(); } @Override @@ -108,13 +117,8 @@ public void close() throws Exception { } @Override - public List snapshotState(long checkpointId) throws IOException { - try { - pushToBq(); - } catch (Exception exception) { - errorReporter.reportFatalException(exception); - throw exception; - } + public List snapshotState(long checkpointId) { + // We don't snapshot anything return Collections.emptyList(); } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java index efe4e19fa..405491cf6 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java @@ -149,4 +149,31 @@ public void shouldReportExceptionThrownFromSinkConnector() throws IOException { Mockito.verify(response, Mockito.times(0)).hasErrors(); Mockito.verify(reporter, Mockito.times(1)).reportFatalException(thrown); } + + @Test + public void shouldFlushWhilePrepareForCommit() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, null, null); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + OdpfSinkResponse response = Mockito.mock(OdpfSinkResponse.class); + Mockito.when(response.hasErrors()).thenReturn(false); + Mockito.when(sink.pushToSink(Mockito.anyList())).thenReturn(response); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.prepareCommit(true); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + Mockito.verify(sink, Mockito.times(4)).pushToSink(Mockito.anyList()); + } } From 4d02b72a03b7bd8a2ea10f992800c67cfb872eb8 Mon Sep 17 00:00:00 2001 From: Sumit Aich Date: Fri, 5 Aug 2022 11:21:07 +0530 Subject: [PATCH 4/5] docs: add documentation for BQ sink in Dagger - [https://github.com/odpf/dagger/issues/188] --- docs/docs/concepts/architecture.md | 5 +- docs/docs/concepts/lifecycle.md | 2 +- 
docs/docs/guides/create_dagger.md | 110 +++++++++++++++++---------- docs/docs/guides/overview.md | 2 +- docs/docs/guides/query_examples.md | 19 +++++ docs/docs/reference/configuration.md | 21 ++++- docs/docs/reference/metrics.md | 2 +- 7 files changed, 115 insertions(+), 46 deletions(-) diff --git a/docs/docs/concepts/architecture.md b/docs/docs/concepts/architecture.md index 084e13497..74adec50f 100644 --- a/docs/docs/concepts/architecture.md +++ b/docs/docs/concepts/architecture.md @@ -65,9 +65,11 @@ _**Telemetry Processor**_ _**Sink and Serializer**_ - After the data is processed and results are materialized they need to be sinked to some external persistent storage. -- Dagger supports Kafka and InfluxDB as supported sinks where the unbounded results are pushed at the end of the lifecycle. +- Dagger supports Kafka, BigQuery and InfluxDB as supported sinks where the unbounded results are pushed at the end of the lifecycle. - In the case of Kafka Sink the final result is protobuf encoded. So the result goes through a serialization stage on some defined output schema. The serializer module of the proto-handler does this. Results in Kafka can be used via any Kafka consumer. - Influx Sink helps in real-time analytics and dashboarding. In the case of Influx Sink dagger, converts results in Flink Row to InfluxDB points and add `tag`/`labels` as specified in the SQL. +- BigQuery is a data warehouse capable of running SQL queries over large datasets. Bigquery Sink is created using the [ODPF Depot](https://github.com/odpf/depot/tree/main/docs) library. Depot is a sink connector, which acts as a bridge between data processing systems and real sink. In BigQuery Sink, each Flink Row is converted into one BigQuery table row. The schema, table and partitioning details of the table are fetched from user supplied configuration. + ### Schema Handling @@ -115,3 +117,4 @@ notified of/updating with the latest schema is abstracted through a homegrown li - InfluxDB - time-series database for real-time analytics. - Kafka - Replayable queue to easy use of generated results. +- BigQuery - data warehouse capable of running SQL queries over large datasets. \ No newline at end of file diff --git a/docs/docs/concepts/lifecycle.md b/docs/docs/concepts/lifecycle.md index 88335ceb1..97b88f146 100644 --- a/docs/docs/concepts/lifecycle.md +++ b/docs/docs/concepts/lifecycle.md @@ -9,4 +9,4 @@ Architecturally after the creation of Dagger, it goes through several stages bef - `Stage-3` : Before executing the streaming SQL, Dagger undergoes a Pre-processor stage. Pre-processor is similar to post processors and currently support only transformers. They can be used to do some filtration of data or to do some basic processing before SQL. - `Stage-4` : In this stage, Dagger has to register custom UDFs. After that, it executes the SQL on the input stream and produces a resultant Output stream. But in the case of complex business requirements, SQL is not just enough to handle the complexity of the problem. So there comes the Post Processors. - `Stage-5` : In the fourth stage or Post Processors stage, the output of the previous stage is taken as input and some complex transformation logic can be applied on top of it to produce the final result. There can be three types of Post Processors: external, internal, and transformers. You can find more on post processors and their responsibilities here. 
-- `Stage-6` : In the final stage or sink stage, the final output is serialized accordingly and sent to a downstream (which can either be Kafka or InfluxDB) for further use cases. +- `Stage-6` : In the final stage or sink stage, the final output is serialized accordingly and sent to a downstream (which can either be Kafka, BigQuery or InfluxDB) for further use cases. diff --git a/docs/docs/guides/create_dagger.md b/docs/docs/guides/create_dagger.md index 46baf1b2c..7e3dce57a 100644 --- a/docs/docs/guides/create_dagger.md +++ b/docs/docs/guides/create_dagger.md @@ -16,8 +16,8 @@ Dagger currently supports 3 kinds of Data Sources. Here are the requirements for ##### `KAFKA_SOURCE` and `KAFKA_CONSUMER` -Both these sources use [Kafka](https://kafka.apache.org/) as the source of data. So you need to set up Kafka(1.0+) either -in a local or clustered environment. Follow this [quick start](https://kafka.apache.org/quickstart) to set up Kafka in +Both these sources use [Kafka](https://kafka.apache.org/) as the source of data. So you need to set up Kafka(1.0+) either +in a local or clustered environment. Follow this [quick start](https://kafka.apache.org/quickstart) to set up Kafka in the local machine. If you have a clustered Kafka you can configure it to use in Dagger directly. ##### `PARQUET_SOURCE` @@ -47,37 +47,37 @@ root_folder ``` -The file paths can be either in the local file system or in GCS bucket. When parquet files are provided from GCS bucket, -Dagger will require a `core_site.xml` to be configured in order to connect and read from GCS. A sample `core_site.xml` is +The file paths can be either in the local file system or in GCS bucket. When parquet files are provided from GCS bucket, +Dagger will require a `core_site.xml` to be configured in order to connect and read from GCS. A sample `core_site.xml` is present in dagger and looks like this: ```xml - - google.cloud.auth.service.account.enable - true - - - google.cloud.auth.service.account.json.keyfile - /Users/dummy/secrets/google_service_account.json - - - fs.gs.requester.pays.mode - CUSTOM - true - - - fs.gs.requester.pays.buckets - my_sample_bucket_name - true - - - fs.gs.requester.pays.project.id - my_billing_project_id - true - + + google.cloud.auth.service.account.enable + true + + + google.cloud.auth.service.account.json.keyfile + /Users/dummy/secrets/google_service_account.json + + + fs.gs.requester.pays.mode + CUSTOM + true + + + fs.gs.requester.pays.buckets + my_sample_bucket_name + true + + + fs.gs.requester.pays.project.id + my_billing_project_id + true + ``` -You can look into the official [GCS Hadoop Connectors](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md) +You can look into the official [GCS Hadoop Connectors](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md) documentation to know more on how to edit this xml as per your needs. #### `Flink [optional]` @@ -104,22 +104,22 @@ $ java -jar dagger-core/build/libs/dagger-core--fat.jar ConfigFi #### `Protobuf Schema` -- Dagger exclusively supports [protobuf](https://developers.google.com/protocol-buffers) encoded data. That is, for a -source reading from Kafka, Dagger consumes protobuf data from Kafka topics and does the processing. For a source reading -from Parquet Files, dagger uses protobuf schema to parse the Row Group. When pushing the results to a sink, Dagger produces -data as per the output protobuf schema to a Kafka topic(when the sink is Kafka). 
-- When using Kafka as a source, you can push data to a Kafka topic as per protobuf format using any of the Kafka client -libraries. You can follow this [tutorial](https://www.conduktor.io/how-to-produce-and-consume-protobuf-records-in-apache-kafka/). -- For all kinds of sources, you need to define the -[java compiled protobuf schema](https://developers.google.com/protocol-buffers/docs/javatutorial) in the classpath or -use our in-house schema registry tool like [Stencil](https://github.com/odpf/stencil) to let dagger know about the data -schema. Stencil is an event schema registry that provides an abstraction layer for schema handling, schema caching, and -dynamic schema updates. [These configurations](../reference/configuration.md#schema-registry) needs to be set if you are -using stencil for proto schema handling. +- Dagger exclusively supports [protobuf](https://developers.google.com/protocol-buffers) encoded data. That is, for a + source reading from Kafka, Dagger consumes protobuf data from Kafka topics and does the processing. For a source reading + from Parquet Files, dagger uses protobuf schema to parse the Row Group. When pushing the results to a sink, Dagger produces + data as per the output protobuf schema to a Kafka topic(when the sink is Kafka). +- When using Kafka as a source, you can push data to a Kafka topic as per protobuf format using any of the Kafka client + libraries. You can follow this [tutorial](https://www.conduktor.io/how-to-produce-and-consume-protobuf-records-in-apache-kafka/). +- For all kinds of sources, you need to define the + [java compiled protobuf schema](https://developers.google.com/protocol-buffers/docs/javatutorial) in the classpath or + use our in-house schema registry tool like [Stencil](https://github.com/odpf/stencil) to let dagger know about the data + schema. Stencil is an event schema registry that provides an abstraction layer for schema handling, schema caching, and + dynamic schema updates. [These configurations](../reference/configuration.md#schema-registry) needs to be set if you are + using stencil for proto schema handling. #### `Sinks` -- The current version of dagger supports Log, InfluxDB and Kafka and as supported sinks to push the data after processing. You need to set up the desired sinks beforehand so that data can be pushed seamlessly. +- The current version of dagger supports Log, BigQuery, InfluxDB and Kafka as supported sinks to push the data after processing. You need to set up the desired sinks beforehand so that data can be pushed seamlessly. ##### `Influx Sink` @@ -132,6 +132,12 @@ using stencil for proto schema handling. - With Kafka sink dagger pushes the processed data as protobuf to a Kafka topic. - If you have a Kafka cluster set up you are good to run a Kafka sink dagger. Enable auto topic creation in Kafka or create a Kafka topic beforehand to push the data. + ##### `BigQuery Sink` : + - BigQuery is a fully managed enterprise data warehouse that helps you manage and analyze your data with built-in features like machine learning, geospatial analysis, and business intelligence.BigQuery's serverless architecture lets you use SQL queries to answer your organization's biggest questions with zero infrastructure management. BigQuery's scalable, distributed analysis engine lets you query terabytes in seconds and petabytes in minutes. + - Bigquery Sink is created using the ODPF Depot library. + - Depot is a sink connector, which acts as a bridge between data processing systems and real sink. 
You can check out the Depot Github repository [here](https://github.com/odpf/depot/tree/main/docs). + + ## Common Configurations - These configurations are mandatory for dagger creation and are sink independent. Here you need to set configurations such as the source details, the protobuf schema class, the SQL query to be applied on the streaming data, etc. In local execution, they would be set inside [`local.properties`](https://github.com/odpf/dagger/blob/main/dagger-core/env/local.properties) file. In the clustered environment they can be passed as job parameters to the Flink exposed job creation API. @@ -230,6 +236,28 @@ OUTPUT_KAFKA_TOPIC=test-kafka-output - Dimensions & metrics from the SELECT section in the query need to be mapped to field names in the output proto. - Find more examples on Kafka sink SQL queries [here](./guides/query_examples.md#kafka-sink). +## Bigquery Sink + + + +- BigQuery is a data warehouse capable of quickly running SQL queries over large datasets. +- Bigquery Sink is created using the ODPF Depot library. Depot is a sink connector, which acts as a bridge between data processing systems and real sink. +- You can check out the BigQuery Sink Connector in the Depot Github repository [here](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md). + + +### BigQuery Sink Features: +- [Datatype Protobuf](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#datatype-protobuf) +- [Datatype JSON](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#datatype-json) + +- [Bigquery Table Schema Update](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#bigquery-table-schema-update) +- [Protobuf - Bigquery Table Type Mapping](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#protobuf---bigquery-table-type-mapping) + +- [Partitioning](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#partitioning) +- [Metadata](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#metadata) +- [Default columns for json data type](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#default-columns-for-json-data-type) +- [Errors Handling](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#errors-handling) +- [Google Cloud Bigquery IAM Permission](https://github.com/odpf/depot/blob/main/docs/sinks/bigquery.md#google-cloud-bigquery-iam-permission) + ## Advanced Data Processing - Dagger's inbuilt SQL enables users to do some complex stream processing like aggregations, joins, windowing, union etc. Find more of the sample queries [here](./guides/query_examples.md). diff --git a/docs/docs/guides/overview.md b/docs/docs/guides/overview.md index b5fb2e03b..f28a2c377 100644 --- a/docs/docs/guides/overview.md +++ b/docs/docs/guides/overview.md @@ -4,7 +4,7 @@ The following section describes how to manage Dagger throughout its lifecycle. ### [Creating Dagger](./guides/create_dagger.md) -Dagger currently supports InfluxDB, Kafka as supported sinks. This section explains +Dagger currently supports InfluxDB, Kafka and BigQuery as supported sinks. This section explains how you can create a dagger and configure different settings. ### [Deploying Dagger](./guides/deployment.md) diff --git a/docs/docs/guides/query_examples.md b/docs/docs/guides/query_examples.md index f8c87efba..9d6184de3 100644 --- a/docs/docs/guides/query_examples.md +++ b/docs/docs/guides/query_examples.md @@ -72,6 +72,25 @@ from Here booking denotes the booking events stream with [sample booking schema](#sample-booking-event-schema). 
+```SQL +SELECT + order_number, + service_type, + status +from + `booking` +``` +## BigQuery Sink + +- `Tag_` prefix should not be used before the dimensions. +- Ensure that sink type is selected as bigquery. +- Dimensions & metrics from the SELECT section in the query should be mapped exactly to the field names in the output protobuf type configured for bigquery sink +- Data types of the selected fields should exactly match to the output protobuf type configured for bigquery sink + +### Example query + +Here booking denotes the booking events stream with [sample booking schema](#sample-booking-event-schema). + ```SQL SELECT order_number, diff --git a/docs/docs/reference/configuration.md b/docs/docs/reference/configuration.md index acefca31a..baa04bdd3 100644 --- a/docs/docs/reference/configuration.md +++ b/docs/docs/reference/configuration.md @@ -7,6 +7,7 @@ This page contains references for all the application configurations for Dagger. * [Generic](configuration.md#generic) * [Influx Sink](configuration.md#influx-sink) * [Kafka Sink](configuration.md#kafka-sink) +* [BigQuery Sink](configuration.md#bigquery-sink) * [Schema Registry](configuration.md#schema-registry) * [Flink](configuration.md#flink) * [Darts](configuration.md#darts) @@ -201,7 +202,7 @@ STREAMS = [ #### `SINK_TYPE` -Defines the Dagger sink type. At present, we support `log`, `influx`, `kafka` +Defines the Dagger sink type. At present, we support `log`, `influx`, `kafka`, `bigquery` * Example value: `log` * Type: `required` @@ -355,6 +356,24 @@ Enable/Disable to produce large messages to Kafka. by default, it's configuratio * Type: `optional` * Default value: `false` +### BigQuery Sink + +A BigQuery sink Dagger (`SINK_TYPE=bigquery`) requires following env variables to be set along with the Generic Dagger env variables, as well as the +[Generic](https://github.com/odpf/depot/blob/main/docs/reference/configuration/generic.md) and [BigQuery ](https://github.com/odpf/depot/blob/main/docs/reference/configuration/bigquery-sink.md)env variables in the ODPF Depot repository, since Dagger uses the BigQuery sink connector implementation available in [Depot](https://github.com/odpf/depot) repository. + +#### `SINK_BIGQUERY_BATCH_SIZE` + +Controls how many records are loaded into the BigQuery Sink in one network call +- Example value: 500 +- Type: `required` + +#### `SINK_ERROR_TYPES_FOR_FAILURE` + +Contains the error types for which the dagger should throw an exception if such an error occurs during runtime. The possible error types are `DESERIALIZATION_ERROR`, `INVALID_MESSAGE_ERROR`, `UNKNOWN_FIELDS_ERROR`, `SINK_4XX_ERROR`, `SINK_5XX_ERROR`, `SINK_UNKNOWN_ERROR`, `DEFAULT_ERROR` . The error types should be comma-separated. +- Example value: `UNKNOWN_FIELDS_ERROR` +- Type: `optional` + + ### Schema Registry Stencil is dynamic schema registry for protobuf. Find more details about Stencil [here](https://github.com/odpf/stencil#stencil). diff --git a/docs/docs/reference/metrics.md b/docs/docs/reference/metrics.md index 498ce0c28..8aa3bb5bb 100644 --- a/docs/docs/reference/metrics.md +++ b/docs/docs/reference/metrics.md @@ -34,7 +34,7 @@ Some of the most important metrics related to a dagger that gives you an overvie ### `Sink` -- The configured sink type for dagger. Can be Log/Kafka/Influx. +- The configured sink type for dagger. Can be Log/BigQuery/Kafka/Influx. 
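As a companion to the configuration reference added above, here is a minimal, hypothetical sketch (not part of this patch) of how the new BigQuery sink settings could be assembled into a Dagger `Configuration`, mirroring the pattern used in `BigquerySinkBuilderTest`. The proto class name is a placeholder, and the Depot-specific keys (project, dataset, table, credentials) are left as a comment rather than guessed, since their exact names live in the Depot configuration reference.

```java
import io.odpf.dagger.common.configuration.Configuration;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.HashMap;
import java.util.Map;

public class BigquerySinkConfigurationSketch {

    public static Configuration bigquerySinkConfiguration() {
        Map<String, String> configMap = new HashMap<>();
        // Select the sink type introduced by this patch.
        configMap.put("SINK_TYPE", "bigquery");
        // Output proto used by the ProtoSerializer that BigquerySinkBuilder creates (placeholder class name).
        configMap.put("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "com.example.BookingLogMessage");
        // Number of rows buffered by BigquerySinkWriter before a single pushToSink() call.
        configMap.put("SINK_BIGQUERY_BATCH_SIZE", "500");
        // Comma-separated ErrorTypes that should fail the job; all other errors are reported as non-fatal.
        configMap.put("SINK_ERROR_TYPES_FOR_FAILURE", "SINK_4XX_ERROR,SINK_5XX_ERROR");
        // The Depot generic and BigQuery sink configs (GCP project, dataset, table name,
        // credential path, partitioning, ...) also have to be present in this map; see the
        // Depot configuration reference linked above for the exact keys.
        return new Configuration(ParameterTool.fromMap(configMap));
    }
}
```

In a deployed dagger these keys would normally arrive as job properties rather than a hard-coded map; the map form is used here only so the sketch stays self-contained.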
### `Full restarts` From 6fae33c1d3c77a1e1700648778c0418e3f86975c Mon Sep 17 00:00:00 2001 From: lavkesh Date: Thu, 11 Aug 2022 11:48:29 +0100 Subject: [PATCH 5/5] chore: version bump of depot --- dagger-core/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index f961f58d2..4d62657b7 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -61,7 +61,7 @@ configurations { dependencies { minimalJar project(path: ':dagger-common', configuration: 'minimalCommonJar') minimalJar project(path: ':dagger-functions', configuration: 'minimalFunctionsJar') - minimalJar('io.odpf:depot:0.1.6') { + minimalJar('io.odpf:depot:0.2.0') { exclude group: 'org.apache.httpcomponents' exclude module: 'stencil', group: 'io.odpf' exclude group: 'com.google.protobuf'
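Finally, for reviewers who want to see the new pieces end to end, a small usage sketch built only from the APIs shown in the diffs above. The column names and the stencil orchestrator are stand-ins; this code is illustrative and not part of the patch itself.

```java
import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.core.sink.bigquery.BigquerySink;
import io.odpf.dagger.core.sink.bigquery.BigquerySinkBuilder;

public class BigquerySinkUsageSketch {

    public static BigquerySink buildBigquerySink(Configuration configuration,
                                                 StencilClientOrchestrator stencilClientOrchestrator) {
        // SinkOrchestrator does the equivalent of this when SINK_TYPE is "bigquery":
        // the builder creates a ProtoSerializer from the configured key/message proto classes,
        // overlays BigQuery-friendly stencil defaults on the configuration, and wires both
        // into a BigquerySink.
        return BigquerySinkBuilder.create()
                .setColumnNames(new String[]{"order_number", "service_type", "status"})
                .setConfiguration(configuration)
                .setStencilClientOrchestrator(stencilClientOrchestrator)
                .build();
    }
}
```

At runtime, `BigquerySink.createWriter()` builds a Depot `BigQuerySinkFactory` from the same configuration and returns a `BigquerySinkWriter` that buffers rows up to `SINK_BIGQUERY_BATCH_SIZE`, flushes the buffer in `prepareCommit()` at every checkpoint, and fails the job only for the `ErrorType`s listed in `SINK_ERROR_TYPES_FOR_FAILURE`.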