Conversation
| // 如果断开就重新连接 | ||
| if (this.client == null || !this.client.isConnected()) { | ||
| // 如果关闭的话,就重新连接 | ||
| this.openInternal(1, 1); |
There was a problem hiding this comment.
重新创建连接, 直接调用 MqttConnectUtil.getMqttClient(...)
There was a problem hiding this comment.
这种创建连接,有问题,我不知道底层为什么行不通,但是我测试的结果告诉我,不行!!还得是 this.openInternal(1, 1);
| public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) { | ||
| MqttClient client = null; | ||
| for (int i = 0; i <= 2; i++) { | ||
| for (int i = 0; i <= 60; i++) { |
| protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException { | ||
| try { | ||
| // 如果断开就重新连接 | ||
| if (this.client == null || !this.client.isConnected()) { |
There was a problem hiding this comment.
如果写入每条数据都进行client.isConnected()判断 的话,十分消耗性能
可以对client.publish()进行捕获, 产生错误的时候,再重新建立连接
There was a problem hiding this comment.
隔了两个星期正式回复一下,我之前提交了两次代码,按照您的指导。 但是我亲自测试,最后的版本运行不超过6h就会失败,报错是未连接! 然后我不断的改地方,发现,如果在异常产生错误的时候,做判断,然后建立连接,是行不通的!!! 然后在每次写入数据的时候判断,测试几个小时,发现还是不行!!!! 然后我把{重新创建连接, 直接调用 MqttConnectUtil.getMqttClient(...) } 改成第一版本的this.openInternal(1, 1); 运行1day都没问题,现在还在运行不报错。第一个版本,之前测试了一个星期都没有问题。 这个是我两个星期测试的结果反馈
mintty.exe.stackdump
Outdated
| @@ -0,0 +1,19 @@ | |||
| Stack trace: | |||
| throw new RuntimeException(interruptedException); | ||
| } | ||
| if (i == 2) { | ||
| if (i == 60) { |
| Set<ConfigOption<?>> requiredOptions = new HashSet<>(); | ||
| requiredOptions.add(BROKER); | ||
| requiredOptions.add(TOPIC); | ||
| requiredOptions.add(TIMES); |
There was a problem hiding this comment.
重试次数不是必须的配置项, 没有配置用默认的重试次数即可
| .defaultValue("writer") | ||
| .withDescription("dclient.id.pre"); | ||
| /** 重连接的次数 * */ | ||
| public static final ConfigOption<Integer> TIMES = |
There was a problem hiding this comment.
变量命名可以规范一点, CONNECT_RETRY_TIMES
| /** emq codec */ | ||
| private String codec = "plain"; | ||
| /** emqx reconnect times */ | ||
| private int times = 10; |
| private boolean isCleanSession = true; | ||
| /** emq EXACTLY_ONCE */ | ||
| private int qos = 2; | ||
| private int qos = 1; |
2、设置重连次数为非必要配置项 3、把60改为变量 4、删除了额外的文件 5、把每次写入数据都判断状态这种耗资源的写法改为捕获错误的时候再重新建立连接
|
1、把写入每条数据都进行client.isConnected()判断 ,十分消耗性能 改为 |
|
1、emqx模块 代码中文注释变英文注释 |
|
@15656215623 commit信息还需要完善一下
|
| MqttConnectUtil.getMqttClient( | ||
| emqxConf, | ||
| CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId); | ||
| } |
There was a problem hiding this comment.
只重新建立了连接,没有把那条数据也publish, 上面这条数据也需要publish一下
There was a problem hiding this comment.
这里行不通的,不超过几个小时就会连接失败,更不要提数据的完整性了!!!
There was a problem hiding this comment.
因为业务需求,我一直在测试emqx这个插件的稳定性,根据你的建议,反复的测试,验证,做了很多实验,发现行不通!!! 实验一、最后的版本(在抛出异常的时候判断连接状态,然后建立连接用 MqttConnectUtil.getMqttClient(,把连接次数搞成可调节的参数) 运行不超过3h 实验二、(把判断连接状态放在每次写入数据那里,和实验一相比,其他的不动) 运行不超过5h 实验三、在实验二的基础上,把连接次数改成固定的60 也失败; 实验四 、 在实验三的基础上,把MqttConnectUtil.getMqttClient( 改成this.openInternal(1, 1); 也就是完全的第一个版本。 运行没有问题,超过1d 之前测试都7d了
There was a problem hiding this comment.
这些是我实践的结果,但是我不理解,为什么按照你说的行不通,我也觉得很合理!!!!!如果想明白为什么,可以告诉我,我再测试看看,
|
目前我用的是第一版本,因为稳定性是我现在最需要的 |
Purpose of this pull request
Which issue you fix
Fixes emqx断开不能重连bug修复
Checklist: