diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandler.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandler.java index 5b84c4803..2691b9f77 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandler.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandler.java @@ -25,14 +25,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Map; - -import static org.apache.http.HttpStatus.SC_OK; +import java.util.regex.Pattern; /** * The Http response handler. */ public class HttpResponseHandler extends AsyncCompletionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(HttpResponseHandler.class.getName()); + + protected static final String SUCCESS_CODE_PATTERN = "^2.*"; private final RowManager rowManager; private ColumnNameManager columnNameManager; private Descriptors.Descriptor descriptor; @@ -80,7 +81,8 @@ public void startTimer() { @Override public Object onCompleted(Response response) { int statusCode = response.getStatusCode(); - if (statusCode == SC_OK) { + boolean isSuccess = Pattern.compile(SUCCESS_CODE_PATTERN).matcher(String.valueOf(statusCode)).matches(); + if (isSuccess) { successHandler(response); } else { postResponseTelemetry.validateResponseCode(meterStatsManager, statusCode); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java index 7fa9548a9..16db27da2 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java @@ -400,4 +400,29 @@ public void shouldNotPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTyp verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); } + + @Test + public void shouldHandleAnySuccessResponseCodeOtherThan200() { + outputMapping.put("surge_factor", new OutputMapping("$.surge")); + outputColumnNames = Collections.singletonList("surge_factor"); + columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); + Row resultStreamData = new Row(2); + Row outputData = new Row(2); + outputData.setField(0, 0.732f); + resultStreamData.setField(0, inputData); + resultStreamData.setField(1, outputData); + when(response.getStatusCode()).thenReturn(201); + when(response.getResponseBody()).thenReturn("{\n" + + " \"surge\": 0.732\n" + + "}"); + + httpResponseHandler.startTimer(); + httpResponseHandler.onCompleted(response); + + verify(meterStatsManager, times(1)).markEvent(SUCCESS_RESPONSE); + verify(meterStatsManager, times(1)).updateHistogram(any(Aspects.class), any(Long.class)); + verify(resultFuture, times(1)).complete(Collections.singleton(resultStreamData)); + } } diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java index cc4918a29..6c259c808 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java @@ -191,7 +191,7 @@ public void shouldPopulateFieldFromPostgresWithCorrespondingDataType() throws Ex + "}"; configurationMap.put(PROCESSOR_POSTPROCESSOR_CONFIG_KEY, postProcessorConfigString); - Configuration configuration = new Configuration(ParameterTool.fromMap(configurationMap)); + configuration = new Configuration(ParameterTool.fromMap(configurationMap)); stencilClientOrchestrator = new StencilClientOrchestrator(configuration); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -249,7 +249,7 @@ public void shouldPopulateFieldFromPostgresWithSuccessResponseWithExternalAndInt + "}"; configurationMap.put(PROCESSOR_POSTPROCESSOR_CONFIG_KEY, postProcessorConfigWithInternalSourceString); - Configuration configuration = new Configuration(ParameterTool.fromMap(configurationMap)); + configuration = new Configuration(ParameterTool.fromMap(configurationMap)); stencilClientOrchestrator = new StencilClientOrchestrator(configuration); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -324,7 +324,7 @@ public void shouldPopulateFieldFromPostgresOnSuccessResponseWithAllThreeSourcesI + "}"; configurationMap.put(PROCESSOR_POSTPROCESSOR_CONFIG_KEY, postProcessorConfigWithTransformerString); - Configuration configuration = new Configuration(ParameterTool.fromMap(configurationMap)); + configuration = new Configuration(ParameterTool.fromMap(configurationMap)); stencilClientOrchestrator = new StencilClientOrchestrator(configuration); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/version.txt b/version.txt index 09a3acfa1..7ceb04048 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.6.0 \ No newline at end of file +0.6.1 \ No newline at end of file