Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.dtstack.chunjun.element.ColumnRowData;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.dtstack.chunjun.element.column.BooleanColumn;
import com.dtstack.chunjun.element.column.ByteColumn;
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;
Expand Down Expand Up @@ -132,8 +133,7 @@ protected IDeserializationConverter createInternalConverter(String type) {
case "BOOLEAN":
return (IDeserializationConverter<Boolean, AbstractBaseColumn>) BooleanColumn::new;
case "TINYINT":
return (IDeserializationConverter<Byte, AbstractBaseColumn>)
val -> new BigDecimalColumn(val.toString());
return (IDeserializationConverter<Byte, AbstractBaseColumn>) ByteColumn::new;
case "SMALLINT":
return (IDeserializationConverter<Short, AbstractBaseColumn>)
val -> new BigDecimalColumn(val.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.dtstack.chunjun.element.ColumnRowData;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.dtstack.chunjun.element.column.BooleanColumn;
import com.dtstack.chunjun.element.column.ByteColumn;
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;
Expand Down Expand Up @@ -135,6 +136,7 @@ protected IDeserializationConverter createInternalConverter(String type) {
case "BOOLEAN":
return (IDeserializationConverter<Boolean, AbstractBaseColumn>) BooleanColumn::new;
case "TINYINT":
return (IDeserializationConverter<Byte, AbstractBaseColumn>) ByteColumn::new;
case "SMALLINT":
case "INT":
return (IDeserializationConverter<Integer, AbstractBaseColumn>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.dtstack.chunjun.element.ColumnRowData;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.dtstack.chunjun.element.column.BooleanColumn;
import com.dtstack.chunjun.element.column.ByteColumn;
import com.dtstack.chunjun.element.column.SqlDateColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;
Expand Down Expand Up @@ -122,6 +123,7 @@ protected IDeserializationConverter createInternalConverter(String type) {
return (IDeserializationConverter<String, AbstractBaseColumn>)
val -> new BooleanColumn(Boolean.parseBoolean(val));
case "TINYINT":
return (IDeserializationConverter<Byte, AbstractBaseColumn>) ByteColumn::new;
case "SMALLINT":
case "INT":
case "BIGINT":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@ public HttpSourceFactory(SyncConf config, StreamExecutionEnvironment env) {
|| org.apache.commons.lang3.StringUtils.isBlank(
syncConf.getTransformer().getTransformSql())) {
typeInformation =
TableUtil.getTypeInformation(Collections.emptyList(), getRawTypeConverter());
TableUtil.getTypeInformation(
Collections.emptyList(), getRawTypeConverter(), true);
} else {
typeInformation =
TableUtil.getTypeInformation(
subColumns(httpRestConfig.getColumn()), getRawTypeConverter());
subColumns(httpRestConfig.getColumn()), getRawTypeConverter(), false);
useAbstractBaseColumn = false;
}
super.initCommonConf(httpRestConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
inceptorFileConf.getColumn(),
getRawTypeConverter());

builder.setRowConverter(rowConverter);
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
return createOutput(dataSet, builder.finish());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
TableUtil.createRowType(jdbcConf.getColumn(), getRawTypeConverter());
rowConverter = jdbcDialect.getRowConverter(rowType);
}
builder.setRowConverter(rowConverter);
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
if (builder instanceof InceptorHdfsOutputFormatBuilder) {
// InceptorHdfsOutputFormatBuilder 只有实时任务或者离线任务的事务表才会调用 所以此处设置为true,其余的orc text
// parquet通过文件方式写入
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public DataStream<RowData> createSource() {
inceptorFileConf.getColumn(),
getRawTypeConverter());

builder.setRowConverter(rowConverter);
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
return createInput(builder.finish());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ protected JdbcInputFormatBuilder getInputFormatBuilder() {
return null;
}

@Override
protected void validateConfigOptions(ReadableConfig config) {
String jdbcUrl = config.get(URL);
checkState(true, "Cannot handle such jdbc url: " + jdbcUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,6 @@ protected List<JdbcInputSplit> createRangeSplits(
protected void executeQuery(String startLocation) throws SQLException {
if (currentJdbcInputSplit.isPolling()) {
if (StringUtils.isBlank(startLocation)) {
// Get the startLocation from the database
queryPollingWithOutStartLocation();
// Concatenated sql statement for next polling query
StringBuilder builder = new StringBuilder(128);
Expand Down Expand Up @@ -889,7 +888,7 @@ public void initPrepareStatement(String querySql) throws SQLException {
ps.setQueryTimeout(jdbcConf.getQueryTimeOut());
}
/**
* 间隔轮询查询起始位置
* polling mode first query when startLocation is not set
*
* @throws SQLException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public static String buildSplitFilterSql(
sql = jdbcDialect.getSplitModFilter(jdbcInputSplit, splitColumn);
}
if (jdbcInputSplit.getSplitNumber() == 0) {
sql += " OR " + jdbcDialect.quoteIdentifier(splitColumn) + " IS NULL";
sql = "(" + sql + " OR " + jdbcDialect.quoteIdentifier(splitColumn) + " IS NULL)";
}
return sql;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import com.dtstack.chunjun.converter.ISerializationConverter;
import com.dtstack.chunjun.element.AbstractBaseColumn;
import com.dtstack.chunjun.element.ColumnRowData;
import com.dtstack.chunjun.element.column.ArrayColumn;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.dtstack.chunjun.element.column.BooleanColumn;
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.SqlArrayColumn;
import com.dtstack.chunjun.element.column.SqlDateColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimeColumn;
Expand Down Expand Up @@ -89,7 +89,7 @@ public FieldNamedPreparedStatement toExternal(
String field = ((ColumnRowData) rowData).getField(index).asString();
Array array =
new PgArray(connection, arrayType.get(fieldTypeList.get(index)), field);
AbstractBaseColumn arrayColumn = new ArrayColumn(array);
AbstractBaseColumn arrayColumn = new SqlArrayColumn(array);
((ColumnRowData) rowData).setField(index, arrayColumn);
}
toExternalConverters.get(index).serialize(rowData, index, statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package com.dtstack.chunjun.connector.postgresql.source;

import com.dtstack.chunjun.conf.SyncConf;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
import com.dtstack.chunjun.connector.postgresql.dialect.PostgresqlDialect;

Expand All @@ -37,9 +36,4 @@ public class PostgresqlSourceFactory extends JdbcSourceFactory {
public PostgresqlSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) {
super(syncConf, env, new PostgresqlDialect());
}

@Override
protected JdbcInputFormatBuilder getBuilder() {
return new JdbcInputFormatBuilder(new PostgresqlInputFormat());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package com.dtstack.chunjun.connector.postgresql.table;

import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory;
import com.dtstack.chunjun.connector.postgresql.dialect.PostgresqlDialect;
import com.dtstack.chunjun.connector.postgresql.source.PostgresqlInputFormat;

/**
* @program chunjun
Expand All @@ -42,9 +40,4 @@ public String factoryIdentifier() {
protected JdbcDialect getDialect() {
return new PostgresqlDialect();
}

@Override
protected JdbcInputFormatBuilder getInputFormatBuilder() {
return new JdbcInputFormatBuilder(new PostgresqlInputFormat());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
TableUtil.createRowType(jdbcConf.getColumn(), getRawTypeConverter());
rowConverter = jdbcDialect.getRowConverter(rowType);
typeInformation =
TableUtil.getTypeInformation(Collections.emptyList(), getRawTypeConverter());
TableUtil.getTypeInformation(
Collections.emptyList(), getRawTypeConverter(), false);
} else {
List<FieldConf> fieldList = syncConf.getWriter().getFieldList();
typeInformation = TableUtil.getTypeInformation(fieldList, getRawTypeConverter());
typeInformation = TableUtil.getTypeInformation(fieldList, getRawTypeConverter(), true);
}
builder.setRowConverter(rowConverter, useAbstractBaseColumn);

Expand Down
2 changes: 1 addition & 1 deletion chunjun-core/src/main/java/com/dtstack/chunjun/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private static DataStream<RowData> syncStreamToTable(

DataType[] tableDataTypes = adaptTable.getSchema().getFieldDataTypes();
String[] tableFieldNames = adaptTable.getSchema().getFieldNames();
TypeInformation<RowData> typeInformation =
TypeInformation<? extends RowData> typeInformation =
TableUtil.getTypeInformation(tableDataTypes, tableFieldNames);
DataStream<RowData> dataStream =
tableEnv.toRetractStream(adaptTable, typeInformation).map(f -> f.f1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public ColumnRowData(RowKind kind, int arity) {
byteSize = 1;
}

public ColumnRowData(RowKind kind, int arity, int byteSize) {
this.columnList = new ArrayList<>(arity);
this.kind = kind;
this.byteSize = byteSize;
}

public ColumnRowData(int arity) {
this(RowKind.INSERT, arity);
}
Expand All @@ -78,6 +84,10 @@ public void addHeader(String name) {
byteSize += getStringSize(name);
}

public void setHeader(Map<String, Integer> header) {
this.header = header;
}

public void replaceHeader(String original, String another) {
if (this.header == null || !this.header.containsKey(original)) {
addHeader(another);
Expand Down Expand Up @@ -143,6 +153,10 @@ public void addField(AbstractBaseColumn value) {
}
}

public void addFieldWithOutByteSize(AbstractBaseColumn value) {
this.columnList.add(value);
}

public void addAllField(List<AbstractBaseColumn> list) {
for (AbstractBaseColumn column : list) {
addField(column);
Expand Down Expand Up @@ -274,7 +288,7 @@ public RowData getRow(int pos, int numFields) {
return null;
}

public long getByteSize() {
public int getByteSize() {
return byteSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ public BigDecimalColumn(BigDecimal bigDecimal, String data) {
super(bigDecimal, getStringSize(data));
}

public BigDecimalColumn(BigDecimal data, int byteSize) {
super(data, byteSize);
}

public static BigDecimalColumn from(BigDecimal data) {
return new BigDecimalColumn(data, 0);
}

@Override
public String asString() {
if (null == data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public BooleanColumn(boolean data) {
super(data, 1);
}

public BooleanColumn(boolean data, int byteSize) {
super(data, byteSize);
}

public static BooleanColumn from(boolean data) {
return new BooleanColumn(data, 0);
}

@Override
public Boolean asBoolean() {
if (null == data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ public ByteColumn(byte data) {
super(data, 1);
}

public ByteColumn(byte data, int byteSize) {
super(data, byteSize);
}

public static ByteColumn from(byte data) {
return new ByteColumn(data, 0);
}

public ByteColumn(char data) {
super(data, 1);
}
Expand All @@ -56,7 +64,7 @@ public String asString() {

@Override
public BigDecimal asBigDecimal() {
throw new CastException("byte", "BigDecimal", String.valueOf(data));
return new BigDecimal((byte) data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ public BytesColumn(byte[] data, String encoding) {
this.encoding = encoding;
}

public BytesColumn(byte[] data, int byteSize) {
super(data, byteSize);
}

public static BytesColumn from(byte[] data) {
return new BytesColumn(data, 0);
}

@Override
public Boolean asBoolean() {
if (null == data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public MapColumn(Map<String, Object> data) {
}
}

public MapColumn(Map<String, Object> data, int byteSize) {
super(data, byteSize);
}

public static MapColumn from(Map<String, Object> data) {
return new MapColumn(data, 0);
}

@Override
public String asString() {
if (null == data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
* @author shitou
* @date 2022/4/15
*/
public class ArrayColumn extends AbstractBaseColumn {
public class SqlArrayColumn extends AbstractBaseColumn {

private static final long serialVersionUID = 1L;
protected Array data;

public ArrayColumn(final Array data) {
public SqlArrayColumn(final Array data) {
super(data, data.toString().length());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ public SqlDateColumn(Date data) {
super(data, 8);
}

public SqlDateColumn(Date data, int byteSize) {
super(data, 0);
}

public SqlDateColumn(long data) {
super(Date.valueOf(LocalDate.ofEpochDay(data)), 8);
}

public static SqlDateColumn from(Date date) {
return new SqlDateColumn(date, 0);
}

@Override
public Boolean asBoolean() {
if (null == data) {
Expand Down
Loading