Skip to content

Commit 7375fba

Browse files
author
Nikita Sokolov
committed
GeneratorFrontType#nodePartition
1 parent 5f152a3 commit 7375fba

2 files changed

Lines changed: 9 additions & 10 deletions

File tree

examples/src/main/java/com/spbsu/flamestream/example/nexmark/GeneratorFrontType.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import com.github.nexmark.flink.generator.GeneratorConfig;
55
import com.github.nexmark.flink.generator.NexmarkGenerator;
66
import com.spbsu.flamestream.core.data.PayloadDataItem;
7-
import com.spbsu.flamestream.core.data.meta.EdgeId;
87
import com.spbsu.flamestream.core.data.meta.GlobalTime;
98
import com.spbsu.flamestream.core.data.meta.Meta;
109
import com.spbsu.flamestream.runtime.FlameRuntime;
@@ -20,18 +19,18 @@
2019

2120
public class GeneratorFrontType implements FlameRuntime.FrontType<GeneratorFrontType.Front, Void> {
2221
private final NexmarkConfiguration nexmarkConfiguration;
23-
private final Map<EdgeId, Integer> edgePartition;
22+
private final Map<String, Integer> nodePartition;
2423
private final long baseTime;
2524
private final long maxEvents;
2625

2726
public GeneratorFrontType(
2827
NexmarkConfiguration nexmarkConfiguration,
29-
Map<EdgeId, Integer> edgePartition,
28+
Map<String, Integer> nodePartition,
3029
long baseTime,
3130
long maxEvents
3231
) {
3332
this.nexmarkConfiguration = nexmarkConfiguration;
34-
this.edgePartition = edgePartition;
33+
this.nodePartition = nodePartition;
3534
this.baseTime = baseTime;
3635
this.maxEvents = maxEvents;
3736
}
@@ -57,7 +56,7 @@ public static class Front implements com.spbsu.flamestream.runtime.edge.Front {
5756
public Front(EdgeContext edgeContext, GeneratorFrontType type) {
5857
this.edgeContext = edgeContext;
5958
this.type = type;
60-
partition = type.edgePartition.get(edgeContext.edgeId());
59+
partition = type.nodePartition.get(edgeContext.edgeId().nodeId());
6160
}
6261

6362
@Override
@@ -73,7 +72,7 @@ public void onStart(Consumer<Object> consumer, GlobalTime from) {
7372
1,
7473
type.maxEvents,
7574
1
76-
).split(type.edgePartition.size()).get(partition);
75+
).split(type.nodePartition.size()).get(partition);
7776
final var executor =
7877
Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, edgeContext.edgeId().toString()));
7978
executor.submit(() -> {

examples/src/test/java/com/spbsu/flamestream/example/nexmark/Query8Test.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ public void test() throws Exception {
3939
flame.attachRear("rear", new TimingsRearType<>(new SimpleRearType(), 10)).collect(Collectors.toList());
4040

4141
flame.attachFront("front", new GeneratorFrontType(nexmarkConfiguration, Map.ofEntries(
42-
Map.entry(new EdgeId("front", "node-0"), 0),
43-
Map.entry(new EdgeId("front", "node-1"), 1),
44-
Map.entry(new EdgeId("front", "node-2"), 2),
45-
Map.entry(new EdgeId("front", "node-3"), 3)
42+
Map.entry("node-0", 0),
43+
Map.entry("node-1", 1),
44+
Map.entry("node-2", 2),
45+
Map.entry("node-3", 3)
4646
), currentTimeMillis, 10000));
4747
for (final var rear : rears) {
4848
rear.await();

0 commit comments

Comments
 (0)