-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGroupBy.h
More file actions
65 lines (58 loc) · 1.96 KB
/
GroupBy.h
File metadata and controls
65 lines (58 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#pragma once
#include <memory>
#include <functional>
#include <unordered_set>
template<typename DataT, typename KeyT>
class GroupByEmitter : public Emitter<std::pair<KeyT, DataStream<DataT> > > {
public:
GroupByEmitter(const Emitter<DataT>* emitter,
std::function<const KeyT( const DataT )> filter):
emitter(std::move(emitter->Copy())),
filter(filter),
ended(false)
{
}
GroupByEmitter(const GroupByEmitter& other):
emitter(std::move(other.emitter->CopyState())),
filter(other.filter),
ended(other.ended)
{
}
virtual const std::pair<KeyT, DataStream<DataT> > EmitNext() override {
KeyT key = filter(emitter->EmitNext());
for(;foundKeys.find(key) != foundKeys.end();
key = filter(emitter->EmitNext()))
{
}
foundKeys.insert({key});
return {key, std::make_unique<const WhereEmitter<DataT>>(emitter.get(),
[f=filter, key](const DataT t){ return f(t) == key; }).get()};
}
virtual bool Ended() override {
if(ended) {
return true;
}
while(!emitter->Ended()) {
if(foundKeys.find(filter(emitter->EmitNext())) == foundKeys.end()) {
emitter->Reset();
return false;
}
}
emitter->Reset();
ended = true;
return true;
}
virtual void Reset() override {
emitter->Reset();
foundKeys.clear();
}
virtual std::unique_ptr<Emitter<std::pair<KeyT, DataStream<DataT> > >> Copy() const
{ return std::make_unique<GroupByEmitter>(emitter.get(), filter); }
virtual std::unique_ptr<Emitter<std::pair<KeyT, DataStream<DataT> > >> CopyState() const
{ return std::make_unique<GroupByEmitter>(*this); }
private:
std::unique_ptr< Emitter<DataT> > emitter;
std::function<const KeyT( const DataT )> filter;
std::unordered_set<KeyT > foundKeys;
bool ended;
};