Skip to content

Commit 0deed20

Browse files
committed
[feat][stream] Add Stream Connector, print rowData RowKind in first Column.
1 parent 1cf0860 commit 0deed20

2 files changed

Lines changed: 13 additions & 5 deletions

File tree

flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/sink/StreamOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
5555
RowData row =
5656
(RowData)
5757
rowConverter.toExternal(
58-
rowData, new GenericRowData(rowData.getArity()));
58+
rowData,
59+
new GenericRowData(rowData.getRowKind(), rowData.getArity()));
5960
if (streamConf.getPrint()) {
6061
TablePrintUtil.printTable(row, getFieldNames(rowData));
6162
}

flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/util/TablePrintUtil.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,18 +167,25 @@ public static void printTable(RowData row, String[] fieldNames) {
167167
}
168168

169169
List<String[]> data = new ArrayList<>(2);
170+
String[] recordStr = new String[genericRowData.getArity() + 1];
171+
recordStr[0] = row.getRowKind().toString();
170172
boolean emptyFieldNames = false;
171173
if (fieldNames == null) {
172-
fieldNames = new String[genericRowData.getArity()];
174+
fieldNames = new String[genericRowData.getArity() + 1];
173175
emptyFieldNames = true;
176+
} else {
177+
String[] newFieldNames = new String[genericRowData.getArity() + 1];
178+
System.arraycopy(fieldNames, 0, newFieldNames, 1, genericRowData.getArity());
179+
fieldNames = newFieldNames;
174180
}
175-
String[] recordStr = new String[genericRowData.getArity()];
181+
fieldNames[0] = "rowKind";
176182
for (int i = 0; i < genericRowData.getArity(); i++) {
177183
if (emptyFieldNames) {
178-
fieldNames[i] = "col" + i;
184+
fieldNames[i + 1] = "col" + i;
179185
}
180-
recordStr[i] = String.valueOf(genericRowData.getField(i));
186+
recordStr[i + 1] = String.valueOf(genericRowData.getField(i));
181187
}
188+
182189
data.add(fieldNames);
183190
data.add(recordStr);
184191
TablePrintUtil.build(data).print();

0 commit comments

Comments
 (0)