diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java index 80052f4edf..459f2d26b8 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java @@ -74,7 +74,10 @@ public void put(List sinkRecords) { try { queue.put(connectRecord); } catch (InterruptedException e) { - throw new RuntimeException(e); + Thread currentThread = Thread.currentThread(); + log.warn("[OpenFunctionSinkConnector] Interrupting thread {} due to exception {}", + currentThread.getName(), e.getMessage()); + currentThread.interrupt(); } } } diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java index c259485758..163c9b1209 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java @@ -88,8 +88,10 @@ public List poll() { } connectRecords.add(connectRecord); } catch (InterruptedException e) { - // nothing to do - break; + Thread currentThread = Thread.currentThread(); + log.warn("[OpenFunctionSourceConnector] Interrupting thread {} due to exception {}", + currentThread.getName(), e.getMessage()); + currentThread.interrupt(); } } return connectRecords; diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java index f33e604e2f..a65ecb26c4 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java @@ -77,6 +77,11 @@ public void put(List sinkRecords) { Message message = convertRecordToMessage(connectRecord); try { SendResult sendResult = producer.send(message); + } catch (InterruptedException e) { + Thread currentThread = Thread.currentThread(); + log.warn("[RocketMQSinkConnector] Interrupting thread {} due to exception {}", + currentThread.getName(), e.getMessage()); + currentThread.interrupt(); } catch (Exception e) { log.error("[RocketMQSinkConnector] sendResult has error : ", e); }