|
29 | 29 | import com.dtstack.chunjun.enums.EWriteMode; |
30 | 30 | import com.dtstack.chunjun.enums.Semantic; |
31 | 31 | import com.dtstack.chunjun.sink.format.BaseRichOutputFormat; |
32 | | -import com.dtstack.chunjun.throwable.ChunJunRuntimeException; |
33 | 32 | import com.dtstack.chunjun.throwable.WriteRecordException; |
34 | 33 | import com.dtstack.chunjun.util.ExceptionUtil; |
35 | 34 | import com.dtstack.chunjun.util.GsonUtil; |
36 | 35 | import com.dtstack.chunjun.util.JsonUtil; |
37 | 36 |
|
38 | 37 | import org.apache.flink.table.data.GenericRowData; |
39 | 38 | import org.apache.flink.table.data.RowData; |
40 | | -import org.apache.flink.util.FlinkRuntimeException; |
41 | 39 |
|
42 | 40 | import org.apache.commons.collections.CollectionUtils; |
43 | 41 | import org.apache.commons.lang3.StringUtils; |
@@ -181,29 +179,6 @@ protected void writeMultipleRecordsInternal() throws Exception { |
181 | 179 | } |
182 | 180 | } |
183 | 181 |
|
184 | | - @Override |
185 | | - public synchronized void writeRecord(RowData rowData) { |
186 | | - checkConnValid(); |
187 | | - super.writeRecord(rowData); |
188 | | - } |
189 | | - |
190 | | - public void checkConnValid() { |
191 | | - try { |
192 | | - LOG.debug("check db connection valid.."); |
193 | | - if (!dbConn.isValid(10)) { |
194 | | - if (Semantic.EXACTLY_ONCE == semantic) { |
195 | | - throw new FlinkRuntimeException( |
196 | | - "jdbc connection is valid!work's semantic is ExactlyOnce.To prevent data loss,we don't try to reopen the connection"); |
197 | | - } |
198 | | - LOG.info("db connection reconnect.."); |
199 | | - dbConn = getConnection(); |
200 | | - stmtProxy.reOpen(dbConn); |
201 | | - } |
202 | | - } catch (Exception e) { |
203 | | - throw new ChunJunRuntimeException("failed to check jdbcConnection valid", e); |
204 | | - } |
205 | | - } |
206 | | - |
207 | 182 | @Override |
208 | 183 | public void preCommit() throws Exception { |
209 | 184 | if (jdbcConf.getRestoreColumnIndex() > -1) { |
|
0 commit comments