Skip to content

Commit 74fc7b1

Browse files
committed
TopK optimization to not keep all values, instead have max of k values per window
1 parent f409a32 commit 74fc7b1

9 files changed

Lines changed: 597 additions & 15 deletions

File tree

Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ protected SortedMultisetAggregateBase(IComparerExpression<T> comparer, QueryCont
1717
Expression<Func<Func<SortedDictionary<T, long>>, SortedMultiSet<T>>> template
1818
= (g) => new SortedMultiSet<T>(g);
1919
var replaced = template.ReplaceParametersInBody(generator);
20-
initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
20+
this.initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
2121
}
2222

2323
private readonly Expression<Func<SortedMultiSet<T>>> initialState;

Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,37 @@
66
using System.Collections.Generic;
77
using System.Diagnostics.Contracts;
88
using System.Linq.Expressions;
9+
using Microsoft.StreamProcessing.Internal;
910

1011
namespace Microsoft.StreamProcessing.Aggregates
1112
{
12-
internal sealed class TopKAggregate<T> : SortedMultisetAggregateBase<T, List<RankedEvent<T>>>
13+
internal sealed class TopKAggregate<T> : IAggregate<T, ITopKState<T>, List<RankedEvent<T>>>
1314
{
1415
private readonly Comparison<T> compiledRankComparer;
1516
private readonly int k;
1617

17-
public TopKAggregate(int k, QueryContainer container) : this(k, ComparerExpression<T>.Default, container) { }
18+
public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container, long hoppingWindowSize)
19+
: this(k, rankComparer, ComparerExpression<T>.Default, container, hoppingWindowSize) { }
1820

19-
public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container)
20-
: this(k, rankComparer, ComparerExpression<T>.Default, container) { }
21-
22-
public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer, QueryContainer container)
23-
: base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
21+
public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer,
22+
QueryContainer container, long hoppingWindowSize)
2423
{
2524
Contract.Requires(rankComparer != null);
2625
Contract.Requires(overallComparer != null);
2726
Contract.Requires(k > 0);
2827
this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
2928
this.k = k;
29+
30+
Expression<Func<Func<SortedDictionary<T, long>>, ITopKState<T>>> template;
31+
if (hoppingWindowSize > 0 && hoppingWindowSize < 1000000)
32+
template = (g) => new HoppingTopKState<T>(k, compiledRankComparer, (int)hoppingWindowSize, g);
33+
else
34+
template = (g) => new SimpleTopKState<T>(g);
35+
36+
var combinedComparer = ThenOrderBy(Reverse(rankComparer), overallComparer);
37+
var generator = combinedComparer.CreateSortedDictionaryGenerator<T, long>(container);
38+
var replaced = template.ReplaceParametersInBody(generator);
39+
this.initialState = Expression.Lambda<Func<ITopKState<T>>>(replaced);
3040
}
3141

3242
private static IComparerExpression<T> Reverse(IComparerExpression<T> comparer)
@@ -53,10 +63,11 @@ private static IComparerExpression<T> ThenOrderBy(IComparerExpression<T> compare
5363
return new ComparerExpression<T>(newExpression);
5464
}
5565

56-
public override Expression<Func<SortedMultiSet<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);
66+
public Expression<Func<ITopKState<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);
5767

58-
private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)
68+
private List<RankedEvent<T>> GetTopK(ITopKState<T> state)
5969
{
70+
var set = state.GetSortedValues();
6071
int count = (int)Math.Min(this.k, set.TotalCount);
6172
var result = new List<RankedEvent<T>>(count);
6273
int nextRank = 1;
@@ -82,5 +93,20 @@ private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)
8293

8394
return result;
8495
}
96+
97+
private readonly Expression<Func<ITopKState<T>>> initialState;
98+
public Expression<Func<ITopKState<T>>> InitialState() => initialState;
99+
100+
private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> acc
101+
= (state, timestamp, input) => state.Add(input, timestamp);
102+
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Accumulate() => acc;
103+
104+
private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> dec
105+
= (state, timestamp, input) => state.Remove(input, timestamp);
106+
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Deaccumulate() => dec;
107+
108+
private static readonly Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> diff
109+
= (leftState, rightState) => leftState.RemoveAll(rightState);
110+
public Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> Difference() => diff;
85111
}
86112
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
5+
namespace Microsoft.StreamProcessing.Aggregates
6+
{
7+
/// <summary>
8+
/// State used by TopK Aggregate
9+
/// </summary>
10+
/// <typeparam name="T"></typeparam>
11+
public interface ITopKState<T>
12+
{
13+
/// <summary>
14+
/// Add a single entry
15+
/// </summary>
16+
/// <param name="input"></param>
17+
/// <param name="timestamp"></param>
18+
ITopKState<T> Add(T input, long timestamp);
19+
20+
/// <summary>
21+
/// Removes the specified entry
22+
/// </summary>
23+
/// <param name="input"></param>
24+
/// <param name="timestamp"></param>
25+
ITopKState<T> Remove(T input, long timestamp);
26+
27+
/// <summary>
28+
/// Removes the entries contained in <paramref name="other"/> from this state
29+
/// </summary>
30+
/// <param name="other"></param>
31+
ITopKState<T> RemoveAll(ITopKState<T> other);
32+
33+
/// <summary>
34+
/// Gets the values as a sorted multiset
35+
/// </summary>
36+
/// <returns></returns>
37+
SortedMultiSet<T> GetSortedValues();
38+
39+
/// <summary>
40+
/// Returns total number of values in the set
41+
/// </summary>
42+
long Count { get; }
43+
}
44+
45+
internal class SimpleTopKState<T> : ITopKState<T>
46+
{
47+
private SortedMultiSet<T> values;
48+
49+
public SimpleTopKState(Func<SortedDictionary<T, long>> generator)
50+
{
51+
this.values = new SortedMultiSet<T>(generator);
52+
}
53+
54+
public long Count => this.values.TotalCount;
55+
56+
public ITopKState<T> Add(T input, long timestamp)
57+
{
58+
this.values.Add(input);
59+
return this;
60+
}
61+
62+
public SortedMultiSet<T> GetSortedValues() => this.values;
63+
64+
public ITopKState<T> Remove(T input, long timestamp)
65+
{
66+
this.values.Remove(input);
67+
return this;
68+
}
69+
70+
public ITopKState<T> RemoveAll(ITopKState<T> other)
71+
{
72+
this.values.RemoveAll(other.GetSortedValues());
73+
return this;
74+
}
75+
}
76+
77+
internal class HoppingTopKState<T> : ITopKState<T>
78+
{
79+
public long currentTimestamp;
80+
81+
public CircularBuffer<ValueTuple<long, SortedMultiSet<T>>> previousValues;
82+
public SortedMultiSet<T> currentValues;
83+
84+
public int k;
85+
86+
public Comparison<T> rankComparer;
87+
private Func<SortedDictionary<T, long>> generator;
88+
private ItemAndCount<T> minValue; // The minimum threshold value in TopK
89+
90+
public HoppingTopKState(int k, Comparison<T> rankComparer, int hoppingWindowSize, Func<SortedDictionary<T, long>> generator)
91+
{
92+
this.k = k;
93+
this.rankComparer = rankComparer;
94+
this.currentValues = new SortedMultiSet<T>(generator);
95+
this.previousValues = new CircularBuffer<ValueTuple<long, SortedMultiSet<T>>>(hoppingWindowSize);
96+
this.generator = generator;
97+
}
98+
99+
public ITopKState<T> Add(T input, long timestamp)
100+
{
101+
// Verify that input is added in non-decreasing order
102+
if (timestamp < this.currentTimestamp)
103+
{
104+
throw new ArgumentException("Invalid timestamp");
105+
}
106+
107+
// First entry in new hop window, just add the value
108+
if (timestamp > this.currentTimestamp)
109+
{
110+
MergeCurrentToPrevious();
111+
this.currentTimestamp = timestamp;
112+
this.currentValues.Add(input);
113+
this.minValue = new ItemAndCount<T>(input, 1);
114+
return this;
115+
}
116+
117+
// these are subsequent entries
118+
int compare = rankComparer(input, this.minValue.Item);
119+
120+
if (this.currentValues.TotalCount < this.k) // if we have not reached k yet, add it
121+
{
122+
if (compare > 0)
123+
this.minValue = new ItemAndCount<T>(input, 1);
124+
else if (compare == 0)
125+
this.minValue.Count++;
126+
127+
this.currentValues.Add(input);
128+
return this;
129+
}
130+
else if (compare > 0) // We have reached k and new input is smaller than minimum
131+
{
132+
return this;
133+
}
134+
else if (compare == 0) // We have reached k and new input is equal to the minimum
135+
{
136+
this.currentValues.Add(input); // add to get ties
137+
this.minValue.Count++;
138+
return this;
139+
}
140+
else // The new item is less than minValue, so we need to remove some entries to make place for the new entry
141+
{
142+
this.currentValues.Add(input);
143+
var toRemove = this.currentValues.TotalCount - this.k;
144+
if (toRemove >= minValue.Count)
145+
{
146+
this.currentValues.RemoveAll(this.minValue.Item);
147+
this.minValue = this.currentValues.GetMinItem();
148+
}
149+
return this;
150+
}
151+
}
152+
153+
public ITopKState<T> Remove(T input, long timestamp)
154+
{
155+
throw new NotImplementedException("Cannot remove single elements from this state");
156+
}
157+
158+
public ITopKState<T> RemoveAll(ITopKState<T> other)
159+
{
160+
if (other.Count != 0)
161+
{
162+
if (other is HoppingTopKState<T> otherTopK)
163+
{
164+
if (otherTopK.currentTimestamp > this.currentTimestamp)
165+
{
166+
throw new ArgumentException("Cannot remove entries with current or future timestamp");
167+
}
168+
else if (otherTopK.currentTimestamp == this.currentTimestamp)
169+
{
170+
if (this.currentValues.TotalCount != otherTopK.currentValues.TotalCount)
171+
throw new InvalidOperationException("Invalid removal");
172+
173+
this.currentValues.Clear();
174+
this.previousValues.Clear();
175+
}
176+
else
177+
{
178+
while (this.previousValues.Count > 0)
179+
{
180+
var first = this.previousValues.PeekFirst();
181+
182+
if (first.Item1 > otherTopK.currentTimestamp)
183+
{
184+
break;
185+
}
186+
187+
if (first.Item1 == otherTopK.currentTimestamp &&
188+
first.Item2.TotalCount != otherTopK.currentValues.TotalCount)
189+
throw new InvalidOperationException("Invalid removal");
190+
191+
this.previousValues.Dequeue();
192+
}
193+
}
194+
}
195+
else
196+
{
197+
throw new InvalidOperationException("Cannot remove non-HoppingTopKState from HoppingTopKState");
198+
}
199+
}
200+
return this;
201+
}
202+
203+
// This function combines the previous and current values into a new sorted multiset and is expensive
204+
// Currently it is only called by ComputeResult
205+
public SortedMultiSet<T> GetSortedValues()
206+
{
207+
var sortedMultiset = new SortedMultiSet<T>(generator);
208+
209+
foreach (var dictItem in this.previousValues.Iterate())
210+
{
211+
sortedMultiset.AddAll(dictItem.Item2);
212+
}
213+
sortedMultiset.AddAll(this.currentValues);
214+
215+
return sortedMultiset;
216+
}
217+
218+
private void MergeCurrentToPrevious()
219+
{
220+
if (!this.currentValues.IsEmpty)
221+
{
222+
var newEntry = ValueTuple.Create(this.currentTimestamp, this.currentValues);
223+
this.previousValues.Enqueue(ref newEntry);
224+
this.currentValues = new SortedMultiSet<T>(generator);
225+
}
226+
}
227+
228+
public long Count => this.currentValues.TotalCount + this.previousValues.Iterate().Sum(e => e.Item2.TotalCount);
229+
}
230+
}

Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ public T Dequeue()
105105
[EditorBrowsable(EditorBrowsableState.Never)]
106106
public bool IsEmpty() => this.head == this.tail;
107107

108+
/// <summary>
109+
/// Removes all elements from the list - do not use directly.
110+
/// </summary>
111+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
112+
[EditorBrowsable(EditorBrowsableState.Never)]
113+
public void Clear() => this.head = this.tail = 0;
114+
108115
/// <summary>
109116
/// Currently for internal use only - do not use directly.
110117
/// </summary>
@@ -260,6 +267,17 @@ public IEnumerator<T> GetEnumerator()
260267
}
261268
}
262269

270+
/// <summary>
271+
/// Currently for internal use only - do not use directly.
272+
/// </summary>
273+
[EditorBrowsable(EditorBrowsableState.Never)]
274+
public void Clear()
275+
{
276+
this.tail = this.head = this.buffers.First;
277+
this.head.Value.Clear();
278+
this.Count = 0;
279+
}
280+
263281
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
264282
}
265283
}

0 commit comments

Comments
 (0)