Skip to content

Commit 9f18c87

Browse files
ll076110otxiyin
and authored
[feat-#1244][sybase] add sybase reader plugin (#1245)
Co-authored-by: otxiyin <ptsxndg@gmail.com>
1 parent 0610bc7 commit 9f18c87

12 files changed

Lines changed: 948 additions & 1 deletion

File tree

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>chunjun-connectors</artifactId>
        <groupId>com.dtstack.chunjun</groupId>
        <version>1.12-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>chunjun-connector-sybase</artifactId>
    <name>ChunJun : Connectors : sybase</name>

    <dependencies>
        <!-- Shared JDBC connector base that this plugin extends. -->
        <dependency>
            <groupId>com.dtstack.chunjun</groupId>
            <artifactId>chunjun-connector-jdbc-base</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- jTDS JDBC driver used to connect to Sybase. -->
        <dependency>
            <groupId>net.sourceforge.jtds</groupId>
            <artifactId>jtds</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Shade the connector into a single deployable jar, excluding
                 logging backends that the runtime already provides. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.slf4j:slf4j-api</exclude>
                                    <exclude>log4j:log4j</exclude>
                                    <exclude>ch.qos.logback:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- Strip jar signatures; shaded content would
                                             invalidate them anyway. -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Copy the shaded jar into the distribution directory and
                 rename it to a version-less file name. -->
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../${dist.dir}/connector/sybase/"
                                      file="${basedir}/target/${project.artifactId}-${project.version}.jar"/>
                                <move file="${basedir}/../../${dist.dir}/connector/sybase/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../${dist.dir}/connector/sybase/${project.artifactId}.jar"/>
                                <delete>
                                    <fileset dir="${basedir}/../../${dist.dir}/connector/sybase/"
                                             includes="${project.artifactId}-*.jar"
                                             excludes="${project.artifactId}.jar"/>
                                </delete>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.chunjun.connector.sybase.converter;

import com.dtstack.chunjun.conf.ChunJunCommonConf;
import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.element.AbstractBaseColumn;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.dtstack.chunjun.element.column.BooleanColumn;
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.SqlDateColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimeColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;

/**
 * Column converter for the Sybase reader plugin.
 *
 * <p>Maps JDBC values (as materialized by the jTDS driver) to ChunJun's internal
 * {@code AbstractBaseColumn} representations, one converter per Flink logical type.
 *
 * @author OT
 */
public class SybaseColumnConverter extends JdbcColumnConverter {

    public SybaseColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
        super(rowType, commonConf);
    }

    /**
     * Builds the deserialization converter for a single logical type.
     *
     * @param type the Flink logical type of the column being read
     * @return a converter turning the raw JDBC value into a ChunJun column
     * @throws UnsupportedOperationException for type roots this connector does not handle
     */
    @Override
    protected IDeserializationConverter createInternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return val -> {
                    // compatible with BIT(>1)
                    if (val instanceof byte[]) {
                        return new BytesColumn((byte[]) val);
                    } else {
                        return new BooleanColumn(Boolean.parseBoolean(val.toString()));
                    }
                };
            case TINYINT:
                // BUGFIX: Sybase TINYINT is unsigned (0-255). Narrowing the driver's
                // Integer through byteValue() wrapped values > 127 to negatives
                // (e.g. 200 -> -56), so keep the Integer value intact.
                return val -> new BigDecimalColumn((Integer) val);
            case SMALLINT:
            case INTEGER:
                return val -> new BigDecimalColumn((Integer) val);
            case INTERVAL_YEAR_MONTH:
                return (IDeserializationConverter<Object, AbstractBaseColumn>)
                        val -> {
                            YearMonthIntervalType yearMonthIntervalType =
                                    (YearMonthIntervalType) type;
                            switch (yearMonthIntervalType.getResolution()) {
                                case YEAR:
                                    // take the leading 4-digit year from the textual value
                                    return new BigDecimalColumn(
                                            Integer.parseInt(String.valueOf(val).substring(0, 4)));
                                case MONTH:
                                case YEAR_TO_MONTH:
                                default:
                                    throw new UnsupportedOperationException(
                                            "jdbc converter only support YEAR");
                            }
                        };
            case FLOAT:
                return val -> {
                    // jTDS may surface REAL/FLOAT columns as Double; route through
                    // BigDecimal to avoid a direct Double -> Float ClassCastException.
                    if (val instanceof Double) {
                        BigDecimal b = new BigDecimal(String.valueOf(val));
                        return new BigDecimalColumn(b.doubleValue());
                    }
                    return new BigDecimalColumn((Float) val);
                };
            case DOUBLE:
                return val -> new BigDecimalColumn((Double) val);
            case BIGINT:
                return val -> {
                    // small BIGINT values can arrive as Integer
                    if (val instanceof Integer) {
                        return new BigDecimalColumn((Integer) val);
                    }
                    return new BigDecimalColumn((Long) val);
                };
            case DECIMAL:
                return val -> {
                    if (val instanceof BigInteger) {
                        return new BigDecimalColumn((BigInteger) val);
                    }
                    return new BigDecimalColumn((BigDecimal) val);
                };
            case CHAR:
            case VARCHAR:
                return val -> new StringColumn((String) val);
            case DATE:
                return val -> new SqlDateColumn((Date) val);
            case TIME_WITHOUT_TIME_ZONE:
                return val -> new TimeColumn((Time) val);
            case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                // BUGFIX: TIMESTAMP_WITH_TIME_ZONE is backed by ZonedTimestampType, not
                // TimestampType, so the previous unconditional cast threw
                // ClassCastException for that root. Fall back to the maximum DATETIME
                // precision Sybase uses when the type is not a plain TimestampType.
                final int precision =
                        type instanceof TimestampType
                                ? ((TimestampType) type).getPrecision()
                                : 6;
                return (IDeserializationConverter<Object, AbstractBaseColumn>)
                        val -> new TimestampColumn((Timestamp) val, precision);
            case BINARY:
            case VARBINARY:
                return val -> new BytesColumn((byte[]) val);
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.chunjun.connector.sybase.converter;

import com.dtstack.chunjun.throwable.UnsupportedTypeException;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Locale;

/**
 * Maps Sybase column type names to Flink {@link DataType}s for the Sybase reader plugin.
 *
 * @author OT
 */
public class SybaseRawTypeConverter {

    /** Utility class; not meant to be instantiated. */
    private SybaseRawTypeConverter() {}

    /**
     * Resolves a Sybase type name (case-insensitive) to the corresponding Flink data type.
     *
     * @param type the raw Sybase type name, e.g. {@code "VARCHAR"} or {@code "unsigned int"}
     * @return the matching Flink {@link DataType}
     * @throws UnsupportedTypeException if the type name is not recognized
     */
    public static DataType apply(String type) {
        switch (type.toUpperCase(Locale.ENGLISH)) {
            case "BIGINT":
            case "UNSIGNED INT": // unsigned 32-bit needs a 64-bit container
                return DataTypes.BIGINT();
            case "INT":
            case "INTEGER":
            case "UNSIGNED SMALLINT": // unsigned 16-bit needs a 32-bit container
                return DataTypes.INT();
            case "SMALLINT":
                return DataTypes.SMALLINT();
            case "TINYINT":
                return DataTypes.TINYINT();
            case "UNSIGNED BIGINT":
                // unsigned 64-bit can exceed Long.MAX_VALUE; hold it as DECIMAL(20, 0)
                return DataTypes.DECIMAL(20, 0);
            case "NUMERIC":
            case "DECIMAL":
                return DataTypes.DECIMAL(38, 18);
            case "NUMERIC IDENTITY":
                return DataTypes.DECIMAL(38, 0);
            case "FLOAT":
            case "REAL":
                return DataTypes.FLOAT();
            case "DOUBLE":
                return DataTypes.DOUBLE();
            case "SMALLMONEY":
                return DataTypes.DECIMAL(10, 4);
            case "MONEY":
                return DataTypes.DECIMAL(19, 4);
            case "SMALLDATETIME":
                return DataTypes.TIMESTAMP(0);
            case "DATETIME":
            case "BIGDATETIME":
                // NOTE(review): BIGDATETIME carries microsecond precision in Sybase;
                // TIMESTAMP(3) truncates it to milliseconds — confirm whether
                // TIMESTAMP(6) is intended here.
                return DataTypes.TIMESTAMP(3);
            case "DATE":
                return DataTypes.DATE();
            case "TIME":
            case "BIGTIME":
                return DataTypes.TIME();
            case "CHAR":
            case "VARCHAR":
            case "UNICHAR":
            case "UNIVARCHAR":
            case "NCHAR":
            case "NVARCHAR":
            case "TEXT":
            case "UNITEXT":
            case "LONGSYSNAME":
            case "STRING":
                return DataTypes.STRING();
            case "BINARY":
            case "TIMESTAMP": // Sybase TIMESTAMP is a binary row-version, not a datetime
            case "VARBINARY":
            case "IMAGE":
                return DataTypes.BYTES();
            case "BIT":
                return DataTypes.BOOLEAN();
            default:
                throw new UnsupportedTypeException(type);
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.chunjun.connector.sybase.dialect;

import com.dtstack.chunjun.conf.ChunJunCommonConf;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputSplit;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
import com.dtstack.chunjun.connector.sybase.converter.SybaseColumnConverter;
import com.dtstack.chunjun.connector.sybase.converter.SybaseRawTypeConverter;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.RawTypeConverter;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import io.vertx.core.json.JsonArray;

import java.sql.ResultSet;
import java.util.Optional;

/**
 * JDBC dialect for Sybase, reached through the jTDS driver.
 *
 * @author OT
 */
public class SybaseDialect implements JdbcDialect {

    private static final String DIALECT_NAME = "Sybase";
    private static final String DRIVER_NAME = "net.sourceforge.jtds.jdbc.Driver";

    /** Human-readable name of this dialect. */
    @Override
    public String dialectName() {
        return DIALECT_NAME;
    }

    /** Accepts only jTDS-style Sybase connection URLs. */
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:jtds:sybase:");
    }

    /** Type-name to Flink type mapping, delegated to {@code SybaseRawTypeConverter}. */
    @Override
    public RawTypeConverter getRawTypeConverter() {
        return SybaseRawTypeConverter::apply;
    }

    /** Fully-qualified class name of the JDBC driver to load. */
    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of(DRIVER_NAME);
    }

    /** Wraps an identifier in double quotes for use in generated SQL. */
    @Override
    public String quoteIdentifier(String identifier) {
        return String.format("\"%s\"", identifier);
    }

    /** Row converter used when reading in column (sync) mode. */
    @Override
    public AbstractRowConverter<ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType>
            getColumnConverter(RowType rowType, ChunJunCommonConf commonConf) {
        return new SybaseColumnConverter(rowType, commonConf);
    }

    /**
     * WHERE-clause fragment that assigns rows to this split by taking the split key
     * modulo the total split count ({@code %%} renders a literal {@code %}).
     */
    @Override
    public String getSplitModFilter(JdbcInputSplit split, String splitPkName) {
        String quotedKey = quoteIdentifier(splitPkName);
        return String.format(
                "%s %% %s = %s",
                quotedKey, split.getTotalNumberOfSplits(), split.getMod());
    }
}

0 commit comments

Comments
 (0)