Skip to content

Commit 3a2e91b

Browse files
authored
Merge e137ad4 into dd40227
2 parents dd40227 + e137ad4 commit 3a2e91b

3 files changed

Lines changed: 13 additions & 3 deletions

File tree

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@ public void put(List<ConnectRecord> sinkRecords) {
7474
try {
7575
queue.put(connectRecord);
7676
} catch (InterruptedException e) {
77-
throw new RuntimeException(e);
77+
Thread currentThread = Thread.currentThread();
78+
log.warn("[OpenFunctionSinkConnector] Interrupting thread {} due to exception {}",
79+
currentThread.getName(), e.getMessage());
80+
currentThread.interrupt();
7881
}
7982
}
8083
}

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ public List<ConnectRecord> poll() {
8888
}
8989
connectRecords.add(connectRecord);
9090
} catch (InterruptedException e) {
91-
// nothing to do
92-
break;
91+
Thread currentThread = Thread.currentThread();
92+
log.warn("[OpenFunctionSourceConnector] Interrupting thread {} due to exception {}",
93+
currentThread.getName(), e.getMessage());
94+
currentThread.interrupt();
9395
}
9496
}
9597
return connectRecords;

eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ public void put(List<ConnectRecord> sinkRecords) {
7777
Message message = convertRecordToMessage(connectRecord);
7878
try {
7979
SendResult sendResult = producer.send(message);
80+
} catch (InterruptedException e) {
81+
Thread currentThread = Thread.currentThread();
82+
log.warn("[RocketMQSinkConnector] Interrupting thread {} due to exception {}",
83+
currentThread.getName(), e.getMessage());
84+
currentThread.interrupt();
8085
} catch (Exception e) {
8186
log.error("[RocketMQSinkConnector] sendResult has error : ", e);
8287
}

0 commit comments

Comments
 (0)