-
Notifications
You must be signed in to change notification settings - Fork 8
Description
Describe the bug
VPC endpoint URI for AWS EventBridge is used as the VPC endpoint for S3 when large payload is offloaded to S3.
To Reproduce
Steps to reproduce the behavior:
Configure a VPC endpoint for AWS EventBridge (aws.eventbridge.endpoint.uri), enable S3 offload(aws.eventbridge.offloading.default.s3.bucket), test with a big payload (event request entry bigger than 256KB).
Expected behavior
Perhaps user should be allowed to specified the VPC endpoint URI for S3 through a connector configuration key - aws.eventbridge.offloading.default.s3.endpoint.uri. Use default s3 endpoint if s3 VPC endpoint is not provided.
Environment:
aws-kafka-eventbridge-sink-v1.3.3
Java 17
Additional context
Connector configuration
{
"name": "eventbridge-sink-connector-plain",
"topics": [
"awm-kafka-connect-poc-topic"
],
"config": {
"connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
"tasks.max": "2",
"topics": "awm-kafka-connect-poc-topic",
"aws.eventbridge.connector.id": "gkc-eventbridge-sink-connector",
"aws.eventbridge.region": "us-east-1",
....
"aws.eventbridge.auth.credentials_provider.role.arn": "arn:aws:iam::xxxxxxxxxx:role/112200-app-kafka-connector-worker",
"aws.eventbridge.endpoint.uri": https://vpce-0904f7bf046d70b9c-ojri2ydb.events.us-east-1.vpce.amazonaws.com/,
"aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:xxxxxxxxxx:event-bus/app-112200-gaia-kafka-connect-integration-bus",
"aws.eventbridge.offloading.default.s3.bucket": "app-id-112200-dep-id-115293-uu-id-8itjb4i0gup1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"auto.offset.reset": "latest",
"errors.tolerance":"all",
"errors.log.enable": "true",
"errors.deadletterqueue.topic.name":"awm-kafka-connect-poc-topic-dlq",
"errors.deadletterqueue.topic.replication.factor":1
}
}
Error Log
The following error log was captured for S3 offload. A 404 error is returned when a payload is attempted to be uploaded to an S3 bucket via the incorrect VPC endpoint for S3.
[2024-11-29 19:41:17,354] DEBUG [eventbridge-sink-connector-plain|task-0] Sending Request: DefaultSdkHttpFullRequest(httpMethod=PUT, protocol=https, host=vpce-0904f7bf046d70b9c-ojri2ydb.events.us-east-1.vpce.amazonaws.com, encodedPath=/app-id-112200-dep-id-115293-uu-id-8itjb4i0gup1/e1ed204f-2f87-4c7c-b040-0785aa281750, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, Expect, User-Agent], queryParameters=[]) (software.amazon.awssdk.request:85)
[2024-11-29 19:41:17,355] DEBUG [eventbridge-sink-connector-plain|task-0] Using SelectedAuthScheme: aws.auth#sigv4 (software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncSigningStage:85)
[2024-11-29 19:41:17,358] DEBUG [eventbridge-sink-connector-plain|task-0] AWS4 Canonical Request: PUT
/app-id-112200-dep-id-115293-uu-id-8itjb4i0gup1/e1ed204f-2f87-4c7c-b040-0785aa281750
amz-sdk-invocation-id:8bc068ed-f4a3-8652-d20f-70bd54358415
amz-sdk-request:attempt=1; max=3
content-length:92
content-type:text/plain; charset=UTF-8
host:vpce-0904f7bf046d70b9c-ojri2ydb.events.us-east-1.vpce.amazonaws.com << this is the VPC endpoint for eventbridge
…
…
[2024-11-29 19:41:17,361] DEBUG [eventbridge-sink-connector-plain|task-0] Creating ConnectionPool for: URI:[https://vpce-0904f7bf046d70b9c-ojri2ydb.events.us-east-1.vpce.amazonaws.com:443](https://vpce-0904f7bf046d70b9c-ojri2ydb.events.us-east-1.vpce.amazonaws.com/), MaxConns: 50 (software.amazon.awssdk.http.crt.AwsCrtHttpClientBase:85)
[2024-11-29 19:41:17,568] DEBUG Received failed response: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341, Extended Request ID: not available (software.amazon.awssdk.requestId:85)
[2024-11-29 19:41:17,568] DEBUG Received failed response: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341, Extended Request ID: not available (software.amazon.awssdk.request:85)
[2024-11-29 19:41:17,585] WARN [eventbridge-sink-connector-plain|task-0] [@9d66194] Not sending events to EventBridge: offloading failed (software.amazon.event.kafkaconnector.EventBridgeWriter:178)
[2024-11-29 19:41:17,586] ERROR [eventbridge-sink-connector-plain|task-0] [@9d66194] Non-retryable failed put call: failing connector errorMessage=java.util.concurrent.ExecutionException: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341) (software.amazon.event.kafkaconnector.EventBridgeSinkTask:154)
[2024-11-29 19:41:17,586] ERROR [eventbridge-sink-connector-plain|task-0] WorkerSinkTask{id=eventbridge-sink-connector-plain-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: software.amazon.event.kafkaconnector.util.ThrowingFunctionApplyException: java.util.concurrent.ExecutionException: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341) (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.ConnectException: software.amazon.event.kafkaconnector.util.ThrowingFunctionApplyException: java.util.concurrent.ExecutionException: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341)
at software.amazon.event.kafkaconnector.EventBridgeSinkTask.handleFailedEntries(EventBridgeSinkTask.java:156)
at software.amazon.event.kafkaconnector.EventBridgeSinkTask.lambda$put$0(EventBridgeSinkTask.java:90)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at software.amazon.event.kafkaconnector.EventBridgeSinkTask.put(EventBridgeSinkTask.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: software.amazon.event.kafkaconnector.util.ThrowingFunctionApplyException: java.util.concurrent.ExecutionException: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341)
at software.amazon.event.kafkaconnector.util.ThrowingFunction.lambda$wrap$0(ThrowingFunction.java:19)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
at software.amazon.event.kafkaconnector.cache.FifoCache.computeIfAbsent(FifoCache.java:40)
at software.amazon.event.kafkaconnector.offloading.S3EventBridgeEventDetailValueOffloading.putS3Object(S3EventBridgeEventDetailValueOffloading.java:197)
at software.amazon.event.kafkaconnector.offloading.S3EventBridgeEventDetailValueOffloading.apply(S3EventBridgeEventDetailValueOffloading.java:167)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at software.amazon.event.kafkaconnector.offloading.S3EventBridgeEventDetailValueOffloading.apply(S3EventBridgeEventDetailValueOffloading.java:142)
at software.amazon.event.kafkaconnector.EventBridgeWriter.putItems(EventBridgeWriter.java:176)
at software.amazon.event.kafkaconnector.EventBridgeSinkTask.put(EventBridgeSinkTask.java:88)
... 11 more
Caused by: java.util.concurrent.ExecutionException: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2024)
at software.amazon.event.kafkaconnector.offloading.S3EventBridgeEventDetailValueOffloading.lambda$putS3Object$1(S3EventBridgeEventDetailValueOffloading.java:209)
at software.amazon.event.kafkaconnector.util.ThrowingFunction.lambda$wrap$0(ThrowingFunction.java:17)
... 25 more
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 404, Request ID: e0ebe8d4-bcf7-40cf-85a9-396d4744f341)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:92)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2075)
at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:135)
at software.amazon.awssdk.core.internal.metrics.BytesReadTrackingPublisher$BytesReadTracker.onComplete(BytesReadTrackingPublisher.java:74)
at software.amazon.awssdk.utils.async.SimplePublisher.doProcessQueue(SimplePublisher.java:275)
at software.amazon.awssdk.utils.async.SimplePublisher.processEventQueue(SimplePublisher.java:224)
at software.amazon.awssdk.utils.async.SimplePublisher.complete(SimplePublisher.java:157)
at software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter.onSuccessfulResponseComplete(CrtResponseAdapter.java:124)
at software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter.onResponseComplete(CrtResponseAdapter.java:117)
at software.amazon.awssdk.crt.http.HttpStreamResponseHandlerNativeAdapter.onResponseComplete(HttpStreamResponseHandlerNativeAdapter.java:66)