Skip to content

Commit a991cfd

Browse files
committed
feat: add AWS S3 sidelining (aka claim check)
Add feature to extract and send (sideline) EventBridge event value, which is the content of the Kafka record value, to S3 by a given JSON Path. The event with the transformed value is send to EventBridge with the reference to the S3 object and the used JSON Path. The feature is activated if the S3 bucket (e.g. 'my-bucket') is configured for the connector by: ```json5 { "config": { // other required configuration "aws.eventbridge.offloading.s3.default.bucket": "my-bucket" } } ``` The user must have write access to the configured bucket. The default JSON Path is `$.detail.value` referencing a EventBridge entry. A valid JSON Path must start with `$.detail.value` and must reference a single JSON object, array or value. The JSON Path can be configured for the connector by: ```json5 { "config": { // other required configuration "aws.eventbridge.offloading.default.fieldref": "$.detail.value" } } ``` If the JSON Path does not match, nothing is send to S3. Otherwise, if the JSON Path matches a non `null` JSON object, array or value, this value is send to S3 and the JSON attribute is removed. If the value is `null` and matches the JSON Path, the JSON attribute with it's `null` value is keept and nothing send to S3. If the feature is activated and a value was send to S3, the EventBridge event contains the attribute `$.detail.datarefJsonPath` with the configured JSON Path and the S3 object reference at `$.detail.dataref`. Otherwise the EventBridge event is unchanged. A FIFO cache with a maximum size of 10.000 is used to look the uploaded content to S3 by it's SHA512 and re-use the S3 object key whenever possible. Closes #110.
1 parent 714b868 commit a991cfd

21 files changed

+2014
-268
lines changed

e2e/connect-config-s3.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "eventbridge-e2e-s3",
3+
"config": {
4+
"auto.offset.reset": "earliest",
5+
"connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
6+
"topics": "eventbridge-e2e",
7+
"aws.eventbridge.connector.id": "eventbridge-e2e-connector",
8+
"aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:000000000000:event-bus/eventbridge-e2e",
9+
"aws.eventbridge.region": "us-east-1",
10+
"aws.eventbridge.endpoint.uri": "http://localstack:4566",
11+
"aws.eventbridge.offloading.s3.default.bucket": "test-bucket",
12+
"aws.eventbridge.offloading.default.fieldref": "$.detail.value.message",
13+
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
14+
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
15+
"value.converter.schemas.enable": false
16+
}
17+
}

pom.xml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
<commons-codec.version>1.17.0</commons-codec.version>
2828
<commons-logging.version>1.3.1</commons-logging.version>
2929
<connect.api.version>3.7.0</connect.api.version>
30+
<equalsverifier.version>3.16.1</equalsverifier.version>
3031
<joda-time.version>2.12.7</joda-time.version>
3132
<org.apache.commons.version>3.14.0</org.apache.commons.version>
3233
<org.assertj.version>3.25.3</org.assertj.version>
@@ -36,6 +37,7 @@
3637
<org.junit.jupiter.version>5.10.2</org.junit.jupiter.version>
3738
<org.mockito.version>5.11.0</org.mockito.version>
3839
<org.slf4j.version>2.0.13</org.slf4j.version>
40+
<org.skyscreamer.version>1.5.1</org.skyscreamer.version>
3941
<org.testcontainers.version>1.19.7</org.testcontainers.version>
4042
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4143
</properties>
@@ -46,11 +48,20 @@
4648
<artifactId>logback-classic</artifactId>
4749
<scope>test</scope>
4850
</dependency>
51+
<dependency>
52+
<groupId>com.jayway.jsonpath</groupId>
53+
<artifactId>json-path</artifactId>
54+
</dependency>
4955
<dependency>
5056
<groupId>com.jayway.jsonpath</groupId>
5157
<artifactId>json-path-assert</artifactId>
5258
<scope>test</scope>
5359
</dependency>
60+
<dependency>
61+
<groupId>nl.jqno.equalsverifier</groupId>
62+
<artifactId>equalsverifier</artifactId>
63+
<scope>test</scope>
64+
</dependency>
5465
<dependency>
5566
<groupId>org.apache.commons</groupId>
5667
<artifactId>commons-lang3</artifactId>
@@ -100,6 +111,11 @@
100111
<artifactId>testcontainers</artifactId>
101112
<scope>test</scope>
102113
</dependency>
114+
<dependency>
115+
<groupId>org.skyscreamer</groupId>
116+
<artifactId>jsonassert</artifactId>
117+
<scope>test</scope>
118+
</dependency>
103119
<dependency>
104120
<groupId>software.amazon.awssdk</groupId>
105121
<artifactId>auth-crt</artifactId>
@@ -122,6 +138,10 @@
122138
</exclusion>
123139
</exclusions>
124140
</dependency>
141+
<dependency>
142+
<groupId>software.amazon.awssdk</groupId>
143+
<artifactId>s3</artifactId>
144+
</dependency>
125145
<dependency>
126146
<groupId>software.amazon.awssdk</groupId>
127147
<artifactId>http-client-spi</artifactId>
@@ -157,6 +177,11 @@
157177
<scope>import</scope>
158178
<type>pom</type>
159179
</dependency>
180+
<dependency>
181+
<groupId>com.jayway.jsonpath</groupId>
182+
<artifactId>json-path</artifactId>
183+
<version>${com.jayway.jsonpath.version}</version>
184+
</dependency>
160185
<dependency>
161186
<groupId>com.jayway.jsonpath</groupId>
162187
<artifactId>json-path-assert</artifactId>
@@ -177,6 +202,11 @@
177202
<artifactId>joda-time</artifactId>
178203
<version>${joda-time.version}</version>
179204
</dependency>
205+
<dependency>
206+
<groupId>nl.jqno.equalsverifier</groupId>
207+
<artifactId>equalsverifier</artifactId>
208+
<version>${equalsverifier.version}</version>
209+
</dependency>
180210
<dependency>
181211
<groupId>org.apache.commons</groupId>
182212
<artifactId>commons-lang3</artifactId>
@@ -240,6 +270,11 @@
240270
<artifactId>testcontainers</artifactId>
241271
<version>${org.testcontainers.version}</version>
242272
</dependency>
273+
<dependency>
274+
<groupId>org.skyscreamer</groupId>
275+
<artifactId>jsonassert</artifactId>
276+
<version>${org.skyscreamer.version}</version>
277+
</dependency>
243278
<dependency>
244279
<groupId>software.amazon.awssdk</groupId>
245280
<artifactId>auth-crt</artifactId>

src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@
1313
import org.apache.kafka.common.config.ConfigDef.Type;
1414
import org.slf4j.Logger;
1515
import software.amazon.event.kafkaconnector.logging.ContextAwareLoggerFactory;
16+
import software.amazon.event.kafkaconnector.offloading.S3EventBridgeEventDetailValueOffloading;
1617

1718
public class EventBridgeSinkConfig extends AbstractConfig {
1819

20+
private static final Logger log =
21+
ContextAwareLoggerFactory.getLogger(EventBridgeSinkConfig.class);
22+
1923
// used in event source and IAM session role name
2024
static final String AWS_CONNECTOR_ID_CONFIG = "aws.eventbridge.connector.id";
2125
static final String AWS_REGION_CONFIG = "aws.eventbridge.region";
@@ -33,6 +37,11 @@ public class EventBridgeSinkConfig extends AbstractConfig {
3337
static final String AWS_DETAIL_TYPES_CONFIG = "aws.eventbridge.detail.types";
3438
static final String AWS_DETAIL_TYPES_MAPPER_CLASS = "aws.eventbridge.detail.types.mapper.class";
3539
static final String AWS_EVENTBUS_RESOURCES_CONFIG = "aws.eventbridge.eventbus.resources";
40+
static final String AWS_OFFLOADING_S3_DEFAULT_BUCKET =
41+
"aws.eventbridge.offloading.s3.default.bucket";
42+
static final String AWS_OFFLOADING_DEFAULT_FIELDREF =
43+
"aws.eventbridge.offloading.default.fieldref";
44+
3645
private static final String AWS_CONNECTOR_ID_DOC =
3746
"The unique ID of this connector (used in the event source field to uniquely identify a connector).";
3847
private static final String AWS_REGION_DOC = "The AWS region of the event bus.";
@@ -53,6 +62,12 @@ public class EventBridgeSinkConfig extends AbstractConfig {
5362
private static final String AWS_PROFILE_NAME_CONFIG_DOC =
5463
"The profile to use from the configuration and credentials files to retrieve IAM credentials";
5564
public static final String AWS_DETAIL_TYPES_DEFAULT = "kafka-connect-${topic}";
65+
public static final String AWS_OFFLOADING_S3_DEFAULT_BUCKET_DOC =
66+
"The S3 bucket to offload matched record value by JSON Path";
67+
public static final String AWS_OFFLOADING_DEFAULT_FIELDREF_DOC =
68+
"The JSON Path to offload record value";
69+
public static final String AWS_OFFLOADING_DEFAULT_FIELDREF_DEFAULT =
70+
S3EventBridgeEventDetailValueOffloading.JSON_PATH_PREFIX;
5671

5772
private static final String AWS_DETAIL_TYPES_MAPPER_CLASS_DEFAULT =
5873
"software.amazon.event.kafkaconnector.mapping.DefaultDetailTypeMapper";
@@ -81,10 +96,11 @@ public class EventBridgeSinkConfig extends AbstractConfig {
8196
public final List<String> resources;
8297
public final int maxRetries;
8398
public final long retriesDelay;
84-
private final Logger log = ContextAwareLoggerFactory.getLogger(EventBridgeSinkConfig.class);
8599
public Map<String, String> detailTypeByTopic;
86100
public String detailType;
87101
public String detailTypeMapperClass;
102+
public String offloadingS3defaultBucket;
103+
public String offloadingDefaultFieldRef;
88104

89105
public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
90106
super(CONFIG_DEF, originalProps);
@@ -100,6 +116,8 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
100116
this.retriesDelay = getInt(AWS_RETRIES_DELAY_CONFIG);
101117
this.resources = getList(AWS_EVENTBUS_RESOURCES_CONFIG);
102118
this.detailTypeMapperClass = getString(AWS_DETAIL_TYPES_MAPPER_CLASS);
119+
this.offloadingS3defaultBucket = getString(AWS_OFFLOADING_S3_DEFAULT_BUCKET);
120+
this.offloadingDefaultFieldRef = getString(AWS_OFFLOADING_DEFAULT_FIELDREF);
103121

104122
var detailTypes = getList(AWS_DETAIL_TYPES_CONFIG);
105123
if (detailTypes.size() > 1 || detailTypes.get(0).contains(":")) {
@@ -113,7 +131,8 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
113131
log.info(
114132
"EventBridge properties: connectorId={} eventBusArn={} eventBusRegion={} eventBusEndpointURI={} "
115133
+ "eventBusMaxRetries={} eventBusRetriesDelay={} eventBusResources={} "
116-
+ "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={}",
134+
+ "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={}"
135+
+ "offloadingS3defaultBucket={} offloadingDefaultFieldRef={}",
117136
connectorId,
118137
eventBusArn,
119138
region,
@@ -124,7 +143,9 @@ public EventBridgeSinkConfig(final Map<?, ?> originalProps) {
124143
endpointID,
125144
roleArn,
126145
connectorId,
127-
externalId);
146+
externalId,
147+
offloadingS3defaultBucket,
148+
offloadingDefaultFieldRef);
128149
}
129150

130151
private static ConfigDef createConfigDef() {
@@ -154,7 +175,6 @@ private static void addParams(final ConfigDef configDef) {
154175
AWS_ROLE_EXTERNAL_ID_CONFIG_DOC);
155176
configDef.define(
156177
AWS_PROFILE_NAME_CONFIG, Type.STRING, "", Importance.MEDIUM, AWS_PROFILE_NAME_CONFIG_DOC);
157-
;
158178
configDef.define(
159179
AWS_RETRIES_CONFIG, Type.INT, AWS_RETRIES_DEFAULT, Importance.MEDIUM, AWS_RETRIES_DOC);
160180
configDef.define(
@@ -181,5 +201,17 @@ private static void addParams(final ConfigDef configDef) {
181201
AWS_DETAIL_TYPES_MAPPER_CLASS_DEFAULT,
182202
Importance.MEDIUM,
183203
AWS_DETAIL_TYPES_MAPPER_DOC);
204+
configDef.define(
205+
AWS_OFFLOADING_S3_DEFAULT_BUCKET,
206+
Type.STRING,
207+
"",
208+
Importance.MEDIUM,
209+
AWS_OFFLOADING_S3_DEFAULT_BUCKET_DOC);
210+
configDef.define(
211+
AWS_OFFLOADING_DEFAULT_FIELDREF,
212+
Type.STRING,
213+
AWS_OFFLOADING_DEFAULT_FIELDREF_DEFAULT,
214+
Importance.MEDIUM,
215+
AWS_OFFLOADING_DEFAULT_FIELDREF_DOC);
184216
}
185217
}

src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfigValidator.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.common.config.ConfigValue;
2020
import software.amazon.awssdk.regions.Region;
2121
import software.amazon.awssdk.regions.RegionMetadata;
22+
import software.amazon.event.kafkaconnector.offloading.S3EventBridgeEventDetailValueOffloading;
2223

2324
public class EventBridgeSinkConfigValidator {
2425

@@ -92,6 +93,18 @@ public static void validate(ConfigValue configValue, EnvVarGetter getenv) {
9293
validateDetailTypeMapperClass(configValue);
9394
break;
9495
}
96+
97+
case AWS_OFFLOADING_S3_DEFAULT_BUCKET:
98+
{
99+
nonStrictValidateOffloadingS3DefaultBucket(configValue);
100+
break;
101+
}
102+
103+
case AWS_OFFLOADING_DEFAULT_FIELDREF:
104+
{
105+
validateOffloadingDefaultFieldRef(configValue);
106+
break;
107+
}
95108
}
96109
}
97110

@@ -208,4 +221,25 @@ private static void validateValueWithPattern(String key, String value, Pattern p
208221
key, value, pattern));
209222
}
210223
}
224+
225+
private static void nonStrictValidateOffloadingS3DefaultBucket(ConfigValue configValue) {
226+
var value = (String) configValue.value();
227+
if (value == null || value.isBlank()) return;
228+
229+
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
230+
var sufficient = Pattern.compile("^[a-z0-9][a-z0-9.-]{1,61}?[a-z0-9]$");
231+
if (!sufficient.matcher(value).find()) {
232+
throw new ConfigException(String.format("\"%s\" is not a valid S3 bucket name", value));
233+
}
234+
}
235+
236+
private static void validateOffloadingDefaultFieldRef(ConfigValue configValue) {
237+
var value = (String) configValue.value();
238+
if (value == null || value.isBlank()) return;
239+
try {
240+
S3EventBridgeEventDetailValueOffloading.validateJsonPath(value);
241+
} catch (IllegalArgumentException e) {
242+
throw new ConfigException(String.format("\"%s\" is not a valid offload JSON Path", value), e);
243+
}
244+
}
211245
}

src/main/java/software/amazon/event/kafkaconnector/EventBridgeWriter.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,17 @@
3838
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest;
3939
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry;
4040
import software.amazon.awssdk.services.eventbridge.model.PutEventsResponse;
41+
import software.amazon.awssdk.services.s3.S3AsyncClient;
4142
import software.amazon.awssdk.utils.StringUtils;
4243
import software.amazon.event.kafkaconnector.auth.EventBridgeCredentialsProvider;
4344
import software.amazon.event.kafkaconnector.batch.DefaultEventBridgeBatching;
4445
import software.amazon.event.kafkaconnector.batch.EventBridgeBatchingStrategy;
4546
import software.amazon.event.kafkaconnector.logging.ContextAwareLoggerFactory;
4647
import software.amazon.event.kafkaconnector.mapping.DefaultEventBridgeMapper;
4748
import software.amazon.event.kafkaconnector.mapping.EventBridgeMapper;
49+
import software.amazon.event.kafkaconnector.offloading.EventBridgeEventDetailValueOffloadingStrategy;
50+
import software.amazon.event.kafkaconnector.offloading.NoOpEventBridgeEventDetailValueOffloading;
51+
import software.amazon.event.kafkaconnector.offloading.S3EventBridgeEventDetailValueOffloading;
4852
import software.amazon.event.kafkaconnector.util.EventBridgeEventId;
4953
import software.amazon.event.kafkaconnector.util.MappedSinkRecord;
5054
import software.amazon.event.kafkaconnector.util.PropertiesUtil;
@@ -58,6 +62,7 @@ public class EventBridgeWriter {
5862
private final EventBridgeAsyncClient ebClient;
5963
private final EventBridgeMapper eventBridgeMapper;
6064
private final EventBridgeBatchingStrategy batching;
65+
private final EventBridgeEventDetailValueOffloadingStrategy offloading;
6166

6267
/**
6368
* @param config Configuration of Sink Client (AWS Region, Eventbus ARN etc.)
@@ -101,6 +106,27 @@ public EventBridgeWriter(EventBridgeSinkConfig config) {
101106
this.eventBridgeMapper = new DefaultEventBridgeMapper(config);
102107
this.batching = new DefaultEventBridgeBatching();
103108

109+
if ((config.offloadingS3defaultBucket != null) && !config.offloadingS3defaultBucket.isEmpty()) {
110+
var s3client =
111+
S3AsyncClient.builder()
112+
.credentialsProvider(credentialsProvider)
113+
.endpointOverride(endpointUri)
114+
.forcePathStyle(endpointUri != null)
115+
.httpClientBuilder(AwsCrtAsyncHttpClient.builder())
116+
.overrideConfiguration(clientConfig)
117+
.region(Region.of(this.config.region))
118+
.build();
119+
var bucketName = StringUtils.trim(config.offloadingS3defaultBucket);
120+
var jsonPathExp = StringUtils.trim(config.offloadingDefaultFieldRef);
121+
122+
log.info(
123+
"S3 offloading is activated with bucket: {} and JSON path: {}", bucketName, jsonPathExp);
124+
offloading = new S3EventBridgeEventDetailValueOffloading(s3client, bucketName, jsonPathExp);
125+
} else {
126+
log.info("S3 offloading is deactivated");
127+
offloading = new NoOpEventBridgeEventDetailValueOffloading();
128+
}
129+
104130
log.trace(
105131
"EventBridgeWriter client config: {}",
106132
ReflectionToStringBuilder.toString(
@@ -126,6 +152,7 @@ public EventBridgeWriter(EventBridgeAsyncClient ebClient, EventBridgeSinkConfig
126152
this.ebClient = ebClient;
127153
this.eventBridgeMapper = new DefaultEventBridgeMapper(config);
128154
this.batching = new DefaultEventBridgeBatching();
155+
this.offloading = new NoOpEventBridgeEventDetailValueOffloading();
129156
}
130157

131158
/**
@@ -143,9 +170,17 @@ public List<EventBridgeResult<EventBridgeEventId>> putItems(List<SinkRecord> rec
143170
return mappingResult.getErrorsAsResult();
144171
}
145172

173+
// NoOpEventBridgeEventDetailValueOffloading is used if
174+
// `aws.eventbridge.offloading.s3.default.bucket` is not configured
175+
var offloadingResult = offloading.apply(mappingResult.success);
176+
if (offloadingResult.success.isEmpty()) {
177+
log.warn("Not sending events to EventBridge: offloading failed");
178+
return offloadingResult.getErrorsAsResult();
179+
}
180+
146181
var sendItemResults =
147182
batching
148-
.apply(mappingResult.success.stream())
183+
.apply(offloadingResult.success.stream())
149184
.flatMap(this::sendToEventBridge)
150185
.collect(toList());
151186

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package software.amazon.event.kafkaconnector.cache;
6+
7+
import java.util.function.Function;
8+
9+
public interface Cache<K, V> {
10+
11+
V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction);
12+
}

0 commit comments

Comments
 (0)