diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index 6d1d7404b..f961f58d2 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -65,6 +65,7 @@ dependencies { exclude group: 'org.apache.httpcomponents' exclude module: 'stencil', group: 'io.odpf' exclude group: 'com.google.protobuf' + exclude group: 'com.datadoghq' } compileOnly 'org.projectlombok:lombok:1.18.8' annotationProcessor 'org.projectlombok:lombok:1.18.8' diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java index 7e42656a6..be60020f5 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriter.java @@ -94,12 +94,21 @@ protected void logErrors(OdpfSinkResponse sinkResponse, List sentMe errorInfo.getException().getMessage(), errorInfo.getErrorType().name()); }); - } + /** + * This will be called before we checkpoint the Writer's state in Streaming execution mode. + * + * @param flush – Whether flushing the un-staged data or not + * @return The data is ready to commit. + * @throws IOException – if fail to prepare for a commit. + */ @Override - public List prepareCommit(boolean flush) throws IOException, InterruptedException { - return null; + public List prepareCommit(boolean flush) throws IOException { + pushToBq(); + messages.clear(); + currentBatchSize = 0; + return Collections.emptyList(); } @Override @@ -108,13 +117,8 @@ public void close() throws Exception { } @Override - public List snapshotState(long checkpointId) throws IOException { - try { - pushToBq(); - } catch (Exception exception) { - errorReporter.reportFatalException(exception); - throw exception; - } + public List snapshotState(long checkpointId) { + // We don't snapshot anything return Collections.emptyList(); } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java index efe4e19fa..405491cf6 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/sink/bigquery/BigquerySinkWriterTest.java @@ -149,4 +149,31 @@ public void shouldReportExceptionThrownFromSinkConnector() throws IOException { Mockito.verify(response, Mockito.times(0)).hasErrors(); Mockito.verify(reporter, Mockito.times(1)).reportFatalException(thrown); } + + @Test + public void shouldFlushWhilePrepareForCommit() throws IOException { + ProtoSerializer protoSerializer = Mockito.mock(ProtoSerializer.class); + OdpfSink sink = Mockito.mock(OdpfSink.class); + BigquerySinkWriter bigquerySinkWriter = new BigquerySinkWriter(protoSerializer, sink, 3, null, null); + Row row = new Row(1); + row.setField(0, "some field"); + Mockito.when(protoSerializer.serializeKey(row)).thenReturn("test".getBytes()); + Mockito.when(protoSerializer.serializeValue(row)).thenReturn("testMessage".getBytes()); + OdpfSinkResponse response = Mockito.mock(OdpfSinkResponse.class); + Mockito.when(response.hasErrors()).thenReturn(false); + Mockito.when(sink.pushToSink(Mockito.anyList())).thenReturn(response); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.prepareCommit(true); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + bigquerySinkWriter.write(row, null); + Mockito.verify(sink, Mockito.times(4)).pushToSink(Mockito.anyList()); + } }