-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFlattenTuple.java
More file actions
73 lines (59 loc) · 2.07 KB
/
FlattenTuple.java
File metadata and controls
73 lines (59 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.linkedin.beam.transform;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import java.util.ArrayList;
import java.util.List;
/**
 * Placeholder {@link PTransform} meant to turn a {@link PCollectionTuple} into a single
 * {@code PCollection<CoGbkResult>}. Kept in case we need this in the future.
 *
 * <p>The transform is not finished. {@link #expand(PCollectionTuple)} currently only builds a
 * {@link UnionCoder} from the coders of the input collections and an empty
 * {@link PCollectionList}, and then fails with {@link UnsupportedOperationException}. The
 * {@code makeUnionTable} helper and {@code ConstructUnionTableFn} exist for the eventual
 * implementation but are not yet wired into {@code expand}.
 */
public class FlattenTuple extends PTransform<PCollectionTuple, PCollection<CoGbkResult>> {

  /**
   * Creates a new {@code FlattenTuple} transform.
   *
   * @return a fresh transform instance
   */
  public static FlattenTuple create() {
    return new FlattenTuple();
  }

  /** Instances are obtained through {@link #create()}. */
  private FlattenTuple() { }

  /**
   * Builds the scaffolding for the union of all collections in {@code input}.
   *
   * <p>The conversion of the union into {@link CoGbkResult} values is not implemented, so this
   * method never returns normally.
   *
   * @param input the tuple of collections to flatten
   * @return never returns normally in the current implementation
   * @throws UnsupportedOperationException always, because the transform is unfinished
   */
  @Override
  public PCollection<CoGbkResult> expand(PCollectionTuple input) {
    // First build the union coder.
    // TODO: Look at better integration of union types with the
    // schema specified in the input.
    List<Coder<?>> codersList = new ArrayList<>();
    for (PCollection<?> pCollection : input.getAll().values()) {
      codersList.add(pCollection.getCoder());
    }
    UnionCoder unionCoder = UnionCoder.of(codersList);

    PCollectionList<RawUnionValue> unionTables =
        PCollectionList.empty(input.getPipeline());

    // TODO: add one makeUnionTable(...) output per input collection to unionTables, flatten
    // them, and convert the flattened RawUnionValue elements into CoGbkResult values.
    //
    // This used to "return null". A null output is not a usable result for a PTransform, so
    // fail here with an explicit message rather than letting the caller trip over the null
    // somewhere less obvious.
    throw new UnsupportedOperationException(
        "FlattenTuple is not implemented yet: expand() cannot produce a PCollection<CoGbkResult>");
  }

  /**
   * Converts {@code pCollection} into a collection of {@link RawUnionValue}s tagged with
   * {@code index}.
   *
   * @param index union tag recorded in every produced {@link RawUnionValue}; also appended to
   *     the applied step name ({@code "MakeUnionTable" + index})
   * @param pCollection the collection whose elements are wrapped
   * @param unionTableEncoder coder set on the resulting collection
   * @param <V> element type of {@code pCollection}
   * @return the collection of tagged union values, with {@code unionTableEncoder} as its coder
   */
  private <V> PCollection<RawUnionValue> makeUnionTable(
      final int index,
      PCollection<V> pCollection,
      Coder<RawUnionValue> unionTableEncoder) {
    return pCollection
        .apply("MakeUnionTable" + index, ParDo.of(new ConstructUnionTableFn<>(index)))
        .setCoder(unionTableEncoder);
  }

  /**
   * A DoFn to construct a UnionTable: wraps every input element in a {@link RawUnionValue}
   * carrying a fixed union tag.
   *
   * @param <V> type of the input elements
   */
  private static class ConstructUnionTableFn<V> extends
      DoFn<V, RawUnionValue> {

    /** Union tag attached to every element emitted by this function. */
    private final int index;

    /**
     * @param index union tag to attach to every emitted {@link RawUnionValue}
     */
    public ConstructUnionTableFn(int index) {
      this.index = index;
    }

    /**
     * Emits the current element wrapped in a {@link RawUnionValue} tagged with {@code index}.
     *
     * @param c context providing the input element and receiving the output
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      V v = c.element();
      c.output(new RawUnionValue(index, v));
    }
  }
}