Skip to content

Commit f0f43b8

Browse files
authored
Merge pull request #451 from agebhar1/feature/gh-447-s3-endpoint-uri
fix: add dedicated configuration for S3 endpoint override
2 parents 3b7f559 + a0c2ea6 commit f0f43b8

File tree

4 files changed

+36
-6
lines changed

4 files changed

+36
-6
lines changed

e2e/connect-config-s3.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:000000000000:event-bus/eventbridge-e2e",
99
"aws.eventbridge.region": "us-east-1",
1010
"aws.eventbridge.endpoint.uri": "http://localstack:4566",
11+
"aws.eventbridge.offloading.default.s3.endpoint.uri": "http://localstack:4566",
1112
"aws.eventbridge.offloading.default.s3.bucket": "test-bucket",
1213
"aws.eventbridge.offloading.default.fieldref": "$.detail.value.message",
1314
"key.converter": "org.apache.kafka.connect.storage.StringConverter",

src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class EventBridgeSinkConfig extends AbstractConfig {
4040
static final String AWS_DETAIL_TYPES_MAPPER_CLASS = "aws.eventbridge.detail.types.mapper.class";
4141
static final String AWS_TIME_MAPPER_CLASS = "aws.eventbridge.time.mapper.class";
4242
static final String AWS_EVENTBUS_RESOURCES_CONFIG = "aws.eventbridge.eventbus.resources";
43+
static final String AWS_OFFLOADING_DEFAULT_S3_ENDPOINT_URI =
44+
"aws.eventbridge.offloading.default.s3.endpoint.uri";
4345
static final String AWS_OFFLOADING_DEFAULT_S3_BUCKET =
4446
"aws.eventbridge.offloading.default.s3.bucket";
4547
static final String AWS_OFFLOADING_DEFAULT_FIELDREF =
@@ -67,6 +69,8 @@ public class EventBridgeSinkConfig extends AbstractConfig {
6769
private static final String AWS_PROFILE_NAME_CONFIG_DOC =
6870
"The profile to use from the configuration and credentials files to retrieve IAM credentials";
6971
public static final String AWS_DETAIL_TYPES_DEFAULT = "kafka-connect-${topic}";
72+
public static final String AWS_OFFLOADING_S3_DEFAULT_ENDPOINT_URI_DOC =
73+
"An optional service endpoint URI used to connect to S3.";
7074
public static final String AWS_OFFLOADING_S3_DEFAULT_BUCKET_DOC =
7175
"The S3 bucket to offload matched record value by JSON Path";
7276
public static final String AWS_OFFLOADING_DEFAULT_FIELDREF_DOC =
@@ -106,6 +110,7 @@ public class EventBridgeSinkConfig extends AbstractConfig {
106110
public final List<String> resources;
107111
public final int maxRetries;
108112
public final long retriesDelay;
113+
public final String offloadingDefaultS3EndpointURI;
109114
public Map<String, String> detailTypeByTopic;
110115
public String detailType;
111116
public String detailTypeMapperClass;
@@ -129,6 +134,7 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
129134
this.resources = getList(AWS_EVENTBUS_RESOURCES_CONFIG);
130135
this.detailTypeMapperClass = getString(AWS_DETAIL_TYPES_MAPPER_CLASS);
131136
this.timeMapperClass = getString(AWS_TIME_MAPPER_CLASS);
137+
this.offloadingDefaultS3EndpointURI = getString(AWS_OFFLOADING_DEFAULT_S3_ENDPOINT_URI);
132138
this.offloadingDefaultS3Bucket = getString(AWS_OFFLOADING_DEFAULT_S3_BUCKET);
133139
this.offloadingDefaultFieldRef = getString(AWS_OFFLOADING_DEFAULT_FIELDREF);
134140

@@ -145,7 +151,8 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
145151
"EventBridge properties: connectorId={} eventBusArn={} eventBusRegion={} eventBusEndpointURI={} "
146152
+ "eventBusMaxRetries={} eventBusRetriesDelay={} eventBusResources={} "
147153
+ "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={} "
148-
+ "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={} detailTypeMapperClass={} timeMapperClass={}",
154+
+ "offloadingDefaultS3EndpointURI={} offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={} "
155+
+ "detailTypeMapperClass={} timeMapperClass={}",
149156
connectorId,
150157
eventBusArn,
151158
region,
@@ -157,6 +164,7 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
157164
roleArn,
158165
connectorId,
159166
externalId,
167+
offloadingDefaultS3EndpointURI,
160168
offloadingDefaultS3Bucket,
161169
offloadingDefaultFieldRef,
162170
detailTypeMapperClass,
@@ -228,6 +236,12 @@ private static void addParams(final ConfigDef configDef) {
228236
AWS_DETAIL_TYPES_MAPPER_CLASS_DEFAULT,
229237
Importance.MEDIUM,
230238
AWS_DETAIL_TYPES_MAPPER_DOC);
239+
configDef.define(
240+
AWS_OFFLOADING_DEFAULT_S3_ENDPOINT_URI,
241+
Type.STRING,
242+
"",
243+
Importance.MEDIUM,
244+
AWS_OFFLOADING_S3_DEFAULT_ENDPOINT_URI_DOC);
231245
configDef.define(
232246
AWS_OFFLOADING_DEFAULT_S3_BUCKET,
233247
Type.STRING,

src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfigValidator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ public static void validate(ConfigValue configValue, EnvVarGetter getenv) {
107107
break;
108108
}
109109

110+
case AWS_OFFLOADING_DEFAULT_S3_ENDPOINT_URI:
111+
{
112+
validateURI(configValue);
113+
break;
114+
}
115+
110116
case AWS_OFFLOADING_DEFAULT_S3_BUCKET:
111117
{
112118
nonStrictValidateOffloadingDefaultS3Bucket(configValue);

src/main/java/software/amazon/event/kafkaconnector/EventBridgeWriter.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class EventBridgeWriter {
7070
public EventBridgeWriter(EventBridgeSinkConfig config) {
7171
this.config = config;
7272

73-
var endpointUri =
73+
var ebEndpointUri =
7474
StringUtils.trim(this.config.endpointURI).isBlank()
7575
? null
7676
: URI.create(this.config.endpointURI);
@@ -96,7 +96,7 @@ public EventBridgeWriter(EventBridgeSinkConfig config) {
9696
var client =
9797
EventBridgeAsyncClient.builder()
9898
.region(Region.of(this.config.region))
99-
.endpointOverride(endpointUri)
99+
.endpointOverride(ebEndpointUri)
100100
.httpClientBuilder(AwsCrtAsyncHttpClient.builder())
101101
.overrideConfiguration(clientConfig)
102102
.credentialsProvider(credentialsProvider)
@@ -108,11 +108,17 @@ public EventBridgeWriter(EventBridgeSinkConfig config) {
108108
this.batching = new DefaultEventBridgeBatching();
109109

110110
if ((config.offloadingDefaultS3Bucket != null) && !config.offloadingDefaultS3Bucket.isEmpty()) {
111+
112+
var s3EndpointUri =
113+
StringUtils.trim(this.config.offloadingDefaultS3EndpointURI).isBlank()
114+
? null
115+
: URI.create(this.config.offloadingDefaultS3EndpointURI);
116+
111117
var s3client =
112118
S3AsyncClient.builder()
113119
.credentialsProvider(credentialsProvider)
114-
.endpointOverride(endpointUri)
115-
.forcePathStyle(endpointUri != null)
120+
.endpointOverride(s3EndpointUri)
121+
.forcePathStyle(s3EndpointUri != null)
116122
.httpClientBuilder(AwsCrtAsyncHttpClient.builder())
117123
.overrideConfiguration(clientConfig)
118124
.region(Region.of(this.config.region))
@@ -121,7 +127,10 @@ public EventBridgeWriter(EventBridgeSinkConfig config) {
121127
var jsonPathExp = StringUtils.trim(config.offloadingDefaultFieldRef);
122128

123129
log.info(
124-
"S3 offloading is activated with bucket: {} and JSON path: {}", bucketName, jsonPathExp);
130+
"S3 offloading is activated with bucket: {}, JSON path: {} and endpoint override: {}",
131+
bucketName,
132+
jsonPathExp,
133+
s3EndpointUri == null ? "-" : s3EndpointUri);
125134
offloading = new S3EventBridgeEventDetailValueOffloading(s3client, bucketName, jsonPathExp);
126135
} else {
127136
log.info("S3 offloading is deactivated");

0 commit comments

Comments
 (0)