README.md (3 additions, 3 deletions)
@@ -21,13 +21,13 @@ Official website of ChunJun: https://dtstack.github.io/chunjun/
ChunJun abstracts different databases into reader/source plugins, writer/sink plugins and lookup plugins, and it has the following features:

- Based on the real-time computing engine--Flink, and supports JSON template and SQL script configuration tasks. The SQL script is compatible with Flink SQL syntax;
-- Support distributed operation, support flink-standalone, yarn-session, yarn-per job and other submission methods;
-- Support Docker one-click deployment, support deploy and run on k8s;
+- Supports distributed operation, support flink-standalone, yarn-session, yarn-per job and other submission methods;
+- Supports Docker one-click deployment, support deploy and run on k8s;
- Supports a variety of heterogeneous data sources, and supports synchronization and calculation of more than 20 data sources such as MySQL, Oracle, SQLServer, Hive, Kudu, etc.
- Easy to expand, highly flexible, newly expanded data source plugins can integrate with existing data source plugins instantly, plugin developers do not need to care about the code logic of other plugins;
- Not only supports full synchronization, but also supports incremental synchronization and interval polling;
- Not only supports offline synchronization and calculation, but also compatible with real-time scenarios;
-- Support dirty data storage, and provide indicator monitoring, etc.;
+- Supports dirty data storage, and provide indicator monitoring, etc.;
- Cooperate with the flink checkpoint mechanism to achieve breakpoint resuming, task disaster recovery;
- Not only supports synchronizing DML data, but also supports DDL synchronization, like 'CREATE TABLE', 'ALTER COLUMN', etc.;

ElasticsearchRowConverter.java
@@ -21,15 +21,22 @@
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.converter.ISerializationConverter;
+import com.dtstack.chunjun.util.ExternalDataUtil;
import com.dtstack.chunjun.util.GsonUtil;

+import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

@@ -45,6 +52,7 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -68,15 +76,15 @@ public class ElasticsearchRowConverter

public ElasticsearchRowConverter(RowType rowType) {
super(rowType);
-List<String> fieldNames = rowType.getFieldNames();
-for (int i = 0; i < rowType.getFieldCount(); i++) {
+List<RowType.RowField> fields = rowType.getFields();
+
+for (int i = 0; i < fields.size(); i++) {
toInternalConverters.add(
-wrapIntoNullableInternalConverter(
-createInternalConverter(rowType.getTypeAt(i))));
+wrapIntoNullableInternalConverter(createInternalConverter(fields.get(i))));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
-createExternalConverter(fieldTypes[i]), fieldTypes[i]));
-typeIndexList.add(new Tuple3<>(fieldNames.get(i), i, rowType.getTypeAt(i)));
+createExternalConverter(fields.get(i)), fields.get(i).getType()));
+typeIndexList.add(new Tuple3<>(fields.get(i).getName(), i, fields.get(i).getType()));
}
}

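The constructor refactor above is the pivot of this change: converter creation now receives the whole RowType.RowField (field name plus LogicalType) instead of a bare LogicalType, which is what lets the new ROW, ARRAY and MAP branches further down recurse into nested fields by name. A minimal standalone sketch of the pattern, using a simplified FieldConverter interface as a stand-in for ChunJun's IDeserializationConverter (illustrative only, not the project's API):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class ConverterPatternSketch {

    /** Simplified stand-in for ChunJun's IDeserializationConverter. */
    interface FieldConverter {
        Object deserialize(Object raw) throws Exception;
    }

    /** Null pass-through wrapper, mirroring wrapIntoNullableInternalConverter. */
    static FieldConverter nullable(FieldConverter inner) {
        return raw -> raw == null ? null : inner.deserialize(raw);
    }

    public static void main(String[] args) throws Exception {
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new IntType(), new VarCharType(20)},
                        new String[] {"id", "name"});

        // One converter per field, built from the RowField so the field name
        // travels together with its type; nested lookups by name need both.
        List<FieldConverter> converters = new ArrayList<>();
        for (RowType.RowField field : rowType.getFields()) {
            converters.add(nullable(raw -> field.getName() + "=" + raw));
        }

        System.out.println(converters.get(0).deserialize(42));   // prints id=42
        System.out.println(converters.get(1).deserialize(null)); // prints null, no NPE
    }
}
```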
@@ -135,9 +143,9 @@ public Map<String, Object> toExternal(RowData rowData, Map<String, Object> output
return output;
}

@Override
protected ISerializationConverter<Map<String, Object>> createExternalConverter(
-LogicalType type) {
+RowType.RowField rowField) {
+LogicalType type = rowField.getType();
switch (type.getTypeRoot()) {
case TINYINT:
return (val, index, output) ->
@@ -203,69 +211,183 @@ protected ISerializationConverter<Map<String, Object>> createExternalConverter(
throw new RuntimeException("Converter error.", e);
}
};
+case ROW:
+return (val, index, output) -> {
+List<RowType.RowField> fields = ((RowType) type).getFields();
+HashMap<String, Object> map = new HashMap<>();
+for (int i = 0; i < fields.size(); i++) {
+ExternalDataUtil.rowDataToExternal(
+val.getRow(index, fields.size()),
+i,
+fields.get(i).getType(),
+map,
+fields.get(i).getName());
+}
+output.put(typeIndexList.get(index)._1(), map);
+};
+
+case ARRAY:
+return (val, index, output) -> {
+ArrayData array = val.getArray(index);
+Object[] obj = new Object[array.size()];
+ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, array);
+output.put(typeIndexList.get(index)._1(), obj);
+};
+case MAP:
+return (val, index, output) -> {
+MapData map = val.getMap(index);
+Map<Object, Object> resultMap = new HashMap<>();
+ExternalDataUtil.mapDataToExternal(
+map,
+((MapType) type).getKeyType(),
+((MapType) type).getValueType(),
+resultMap);
+output.put(typeIndexList.get(index)._1(), resultMap);
+};
+case MULTISET:
+return (val, index, output) -> {
+MapData map = val.getMap(index);
+ArrayData arrayData = map.keyArray();
+Object[] obj = new Object[arrayData.size()];
+ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, arrayData);
+output.put(typeIndexList.get(index)._1(), obj);
+};
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}

@Override
-protected IDeserializationConverter createInternalConverter(LogicalType type) {
-
+protected IDeserializationConverter createInternalConverter(RowType.RowField rowField) {
+LogicalType type = rowField.getType();
switch (type.getTypeRoot()) {
case NULL:
return val -> null;
case BOOLEAN:
-return val -> new Boolean(String.valueOf(val));
+return val -> val == null ? null : new Boolean(String.valueOf(val));
case FLOAT:
-return val -> new Float(String.valueOf(val));
+return val -> val == null ? null : new Float(String.valueOf(val));
case DOUBLE:
-return val -> new Double(String.valueOf(val));
+return val -> val == null ? null : new Double(String.valueOf(val));
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
-return val -> Time.valueOf(String.valueOf(val));
+return val -> val == null ? null : Time.valueOf(String.valueOf(val));
case INTEGER:
-return val -> new Integer(String.valueOf(val));
+return val -> val == null ? null : new Integer(String.valueOf(val));
case BIGINT:
-return val -> new Long(String.valueOf(val));
+return val -> val == null ? null : new Long(String.valueOf(val));
case TINYINT:
-return val -> new Integer(String.valueOf(val)).byteValue();
+return val -> val == null ? null : new Integer(String.valueOf(val)).byteValue();
case SMALLINT:
// Converter for small type that casts value to int and then return short value, since JDBC 1.0 use int type for small values.
-return val -> new Integer(String.valueOf(val)).shortValue();
+return val -> val == null ? null : new Integer(String.valueOf(val)).shortValue();
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
// using decimal(20, 0) to support db type bigint unsigned, user should define
// decimal(20, 0) in SQL,
// but other precision like decimal(30, 0) can work too from lenient consideration.
return val ->
-val instanceof BigInteger
-? DecimalData.fromBigDecimal(
-new BigDecimal((BigInteger) val, 0), precision, scale)
-: DecimalData.fromBigDecimal(
-new BigDecimal(String.valueOf(val)), precision, scale);
+val == null
+? null
+: val instanceof BigInteger
+? DecimalData.fromBigDecimal(
+new BigDecimal((BigInteger) val, 0),
+precision,
+scale)
+: DecimalData.fromBigDecimal(
+new BigDecimal(String.valueOf(val)),
+precision,
+scale);
case DATE:
return val ->
-(int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay());
+val == null
+? null
+: (int)
+((Date.valueOf(String.valueOf(val)))
+.toLocalDate()
+.toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return val ->
-(int)
-((Time.valueOf(String.valueOf(val))).toLocalTime().toNanoOfDay()
-/ 1_000_000L);
+val == null
+? null
+: (int)
+((Time.valueOf(String.valueOf(val)))
+.toLocalTime()
+.toNanoOfDay()
+/ 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
-return val -> TimestampData.fromTimestamp(Timestamp.valueOf(String.valueOf(val)));
+return val ->
+val == null
+? null
+: TimestampData.fromTimestamp(
+Timestamp.valueOf(String.valueOf(val)));
case CHAR:
case VARCHAR:
-return val -> StringData.fromString(val.toString());
+return val -> val == null ? null : StringData.fromString(val.toString());
case BINARY:
case VARBINARY:
-return val -> (byte[]) val;
-case ARRAY:
+return val -> val == null ? null : (byte[]) val;
+case ROW:
+return val -> {
+List<RowType.RowField> childrenFields = ((RowType) type).getFields();
+Map<String, Object> val1 = (Map<String, Object>) val;
+GenericRowData genericRowData = new GenericRowData(childrenFields.size());
+for (int i = 0; i < childrenFields.size(); i++) {
+if (val1.get(childrenFields.get(i).getName()) == null) {
+genericRowData.setField(i, null);
+} else {
+Object value =
+createInternalConverter(childrenFields.get(i))
+.deserialize(val1.get(childrenFields.get(i).getName()));
+genericRowData.setField(i, value);
+}
+}
+return genericRowData;
+};
+case ARRAY:
+return (val) -> {
+ArrayList<Object> list = (ArrayList<Object>) val;
+Object[] result = new Object[list.size()];
+LogicalType logicalType = type.getChildren().get(0);
+RowType.RowField rowField1 = new RowType.RowField("", logicalType, "");
+IDeserializationConverter internalConverter =
+createInternalConverter(rowField1);
+for (int i = 0; i < list.size(); i++) {
+if (list.get(i) == null) {
+result[i] = null;
+} else {
+Object value = internalConverter.deserialize(list.get(i));
+result[i] = value;
+}
+}
+return new GenericArrayData(result);
+};
case MAP:
+return val -> {
+if (val == null) {
+return null;
+}
+HashMap<Object, Object> resultMap = new HashMap<>();
+Map map = GsonUtil.GSON.fromJson(val.toString(), Map.class);
+LogicalType keyType = ((MapType) type).getKeyType();
+LogicalType valueType = ((MapType) type).getValueType();
+RowType.RowField keyRowField = new RowType.RowField("", keyType, "");
+RowType.RowField valueRowField = new RowType.RowField("", valueType, "");
+IDeserializationConverter keyInternalConverter =
+createInternalConverter(keyRowField);
+IDeserializationConverter valueInternalConverter =
+createInternalConverter(valueRowField);
+for (Object key : map.keySet()) {
+resultMap.put(
+keyInternalConverter.deserialize(key),
+valueInternalConverter.deserialize(map.get(key)));
+}
+
+return new GenericMapData(resultMap);
+};
case MULTISET:
case RAW:
default:
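The ROW branch of createInternalConverter above is the heart of the nested-type support: an Elasticsearch document arrives as a Map<String, Object>, and nested objects are turned into GenericRowData by recursing over the child RowFields by name, with explicit null guards at every level. A self-contained sketch of that recursion, with string conversion standing in for the full per-type dispatch:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class NestedRowSketch {

    /** Recursively converts a Map-shaped document into GenericRowData, field by field. */
    @SuppressWarnings("unchecked")
    static GenericRowData toRowData(Map<String, Object> doc, RowType rowType) {
        GenericRowData row = new GenericRowData(rowType.getFieldCount());
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            RowType.RowField field = rowType.getFields().get(i);
            Object raw = doc.get(field.getName());
            if (raw == null) {
                row.setField(i, null); // null-safe, like the val == null guards in the PR
            } else if (field.getType() instanceof RowType) {
                // Nested object: recurse with the child RowType, as the ROW case does.
                row.setField(i, toRowData((Map<String, Object>) raw, (RowType) field.getType()));
            } else {
                // Leaf value: the real converter dispatches on LogicalTypeRoot here;
                // string conversion stands in for brevity.
                row.setField(i, StringData.fromString(String.valueOf(raw)));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        RowType address =
                RowType.of(new LogicalType[] {new VarCharType(32)}, new String[] {"city"});
        RowType person =
                RowType.of(
                        new LogicalType[] {new VarCharType(32), address},
                        new String[] {"name", "address"});

        Map<String, Object> nested = new HashMap<>();
        nested.put("city", "hangzhou");
        Map<String, Object> doc = new HashMap<>();
        doc.put("name", "alice");
        doc.put("address", nested);

        System.out.println(toRowData(doc, person)); // nested row, e.g. +I(alice,+I(hangzhou))
    }
}
```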
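One caveat for readers of this excerpt: ExternalDataUtil is introduced by this PR, but its source file is not shown above, so its exact contract is not visible here. Judging purely from the call sites, arrayDataToExternal plausibly copies a Flink ArrayData into a plain Object[] while dispatching on the element type; a hypothetical reconstruction under that assumption (the real signature and supported types may differ):

```java
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.types.logical.LogicalType;

public class ArrayToExternalSketch {

    /**
     * Hypothetical shape of ExternalDataUtil.arrayDataToExternal: copy a Flink ArrayData
     * into a plain Object[] element by element, dispatching on the element type.
     */
    static void arrayDataToExternal(LogicalType elementType, Object[] out, ArrayData array) {
        for (int i = 0; i < array.size(); i++) {
            if (array.isNullAt(i)) {
                out[i] = null;
                continue;
            }
            switch (elementType.getTypeRoot()) {
                case INTEGER:
                    out[i] = array.getInt(i);
                    break;
                case BIGINT:
                    out[i] = array.getLong(i);
                    break;
                case DOUBLE:
                    out[i] = array.getDouble(i);
                    break;
                case CHAR:
                case VARCHAR:
                    // StringData -> plain Java String for the external document.
                    out[i] = array.getString(i).toString();
                    break;
                default:
                    throw new UnsupportedOperationException(
                            "Unsupported element type: " + elementType);
            }
        }
    }
}
```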