From 9d0ecdfc45706a64f1f16e1ec6e45cece408c92a Mon Sep 17 00:00:00 2001 From: Ruhshan Date: Wed, 2 Aug 2023 00:45:00 +0600 Subject: [PATCH 1/3] [ISSUE #4266]: Retore interrupted state for interrupted exception in open function sinc connector --- .../sink/connector/OpenFunctionSinkConnector.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java index 80052f4edf..459f2d26b8 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java @@ -74,7 +74,10 @@ public void put(List sinkRecords) { try { queue.put(connectRecord); } catch (InterruptedException e) { - throw new RuntimeException(e); + Thread currentThread = Thread.currentThread(); + log.warn("[OpenFunctionSinkConnector] Interrupting thread {} due to exception {}", + currentThread.getName(), e.getMessage()); + currentThread.interrupt(); } } } From 9a2e60e0325ba72809d16b74b63aea332bd3e119 Mon Sep 17 00:00:00 2001 From: Ruhshan Date: Wed, 2 Aug 2023 00:46:24 +0600 Subject: [PATCH 2/3] [ISSUE #4266]: Retore interrupted state for interrupted exception in open function source connector --- .../source/connector/OpenFunctionSourceConnector.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java index c259485758..163c9b1209 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java @@ -88,8 +88,10 @@ public List poll() { } connectRecords.add(connectRecord); } catch (InterruptedException e) { - // nothing to do - break; + Thread currentThread = Thread.currentThread(); + log.warn("[OpenFunctionSourceConnector] Interrupting thread {} due to exception {}", + currentThread.getName(), e.getMessage()); + currentThread.interrupt(); } } return connectRecords; From e137ad4acbfc12ce2d6fdb7d571a25783cc6ad15 Mon Sep 17 00:00:00 2001 From: Ruhshan Date: Wed, 2 Aug 2023 00:48:46 +0600 Subject: [PATCH 3/3] [ISSUE #4266]: Retore interrupted state for interrupted exception in RocketMQSink connector --- .../rocketmq/sink/connector/RocketMQSinkConnector.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java index f33e604e2f..a65ecb26c4 100644 --- a/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/sink/connector/RocketMQSinkConnector.java @@ -77,6 +77,11 @@ public void put(List sinkRecords) { Message message = convertRecordToMessage(connectRecord); try { SendResult sendResult = producer.send(message); + } catch (InterruptedException e) { + Thread currentThread = Thread.currentThread(); + log.warn("[RocketMQSinkConnector] Interrupting thread {} due to exception {}", + currentThread.getName(), e.getMessage()); + currentThread.interrupt(); } catch (Exception e) { log.error("[RocketMQSinkConnector] sendResult has error : ", e); }