From d68beb5df5952a5a820165752018bce2038e80a4 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Thu, 28 Jul 2022 12:27:32 +0100 Subject: [PATCH 1/2] feat: prepare for commit --- .../sink/bigquery/BigquerySinkWriter.java | 29 ++++++++++++------- .../sink/bigquery/BigquerySinkWriterTest.java | 27 +++++++++++++++++ 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java index 2e914bbe6..da5279234 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java @@ -14,7 +14,11 @@ import org.apache.flink.types.Row; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; @Slf4j @@ -93,9 +97,19 @@ protected void logErrors(OdpfSinkResponse sinkResponse, List sentMe } + /** + * This will be called before we checkpoint the Writer's state in Streaming execution mode. + * + * @param flush – Whether flushing the un-staged data or not + * @return The data is ready to commit. + * @throws IOException – if fail to prepare for a commit. + */ @Override - public List prepareCommit(boolean flush) throws IOException, InterruptedException { - return null; + public List prepareCommit(boolean flush) throws IOException { + if (flush) { + pushToBq(); + } + return Collections.emptyList(); } @Override @@ -104,13 +118,8 @@ public void close() throws Exception { } @Override - public List snapshotState(long checkpointId) throws IOException { - try { - pushToBq(); - } catch (Exception exception) { - errorReporter.reportFatalException(exception); - throw exception; - } + public List snapshotState(long checkpointId) { + // We don't snapshot anything return Collections.emptyList(); } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java index efe4e19fa..405491cf6 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java @@ -149,4 +149,31 @@ public void shouldReportExceptionThrownFromSinkConnector() throws IOException { Mockito.verify(response, Mockito.times(0)).hasErrors(); Mockito.verify(reporter, Mockito.times(1)).reportFatalException(thrown); } + + @Test + public void shouldFlushWhilePrepareForCommit() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, null, null); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + OdpfSinkResponse response = Mockito.mock(OdpfSinkResponse.class); + Mockito.when(response.hasErrors()).thenReturn(false); + Mockito.when(sink.pushToSink(Mockito.anyList())).thenReturn(response); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.prepareCommit(true); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + Mockito.verify(sink, Mockito.times(4)).pushToSink(Mockito.anyList()); + } } From 67d37be2da81e6d18629247744fcf10869c41b0f Mon Sep 17 00:00:00 2001 From: lavkesh Date: Sat, 30 Jul 2022 14:09:54 +0100 Subject: [PATCH 2/2] fix: clear the messages for pushing to bq --- dagger-core/build.gradle | 1 + .../odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index 6d1d7404b..f961f58d2 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -65,6 +65,7 @@ dependencies { exclude group: 'org.apache.httpcomponents' exclude module: 'stencil', group: 'io.odpf' exclude group: 'com.google.protobuf' + exclude group: 'com.datadoghq' } compileOnly 'org.projectlombok:lombok:1.18.8' annotationProcessor 'org.projectlombok:lombok:1.18.8' diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java index da5279234..be60020f5 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java @@ -94,7 +94,6 @@ protected void logErrors(OdpfSinkResponse sinkResponse, List sentMe errorInfo.getException().getMessage(), errorInfo.getErrorType().name()); }); - } /** @@ -106,9 +105,9 @@ protected void logErrors(OdpfSinkResponse sinkResponse, List sentMe */ @Override public List prepareCommit(boolean flush) throws IOException { - if (flush) { - pushToBq(); - } + pushToBq(); + messages.clear(); + currentBatchSize = 0; return Collections.emptyList(); }