Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7231,6 +7231,97 @@ public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> gro
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
}

/**
* Return an Observable which groups the items emitted by this Observable according to a specified key
* selector function until the duration Observable expires for the key or
* the total number of active groups exceeds the maxGroups value.
*
* @param <TKey> the group key type
* @param <TDuration> the duration element type
* @param keySelector a function to extract the key for each item
* @param durationSelector a function to signal the expiration of a group
* @param maxGroups the maximum allowed concurrent groups
* @return
*/
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(
Func1<? super T, ? extends TKey> keySelector,
Func1<? super GroupedObservable<TKey, T>, ? extends Observable<TDuration>> durationSelector,
int maxGroups) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't match the Rx.Net signature: http://msdn.microsoft.com/en-us/library/hh211932(v=vs.103).aspx

public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector
)

Why is maxGroups being added to the signature?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this added in Rx 2.2.2: http://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs

I thought it was about limiting the groups, but apparently the ConcurrentDictionary uses the capacity to specify the concurrency level, not the group number.

I guess this PR can be ignored.

return groupByUntil(keySelector, Functions.<T>identity(), durationSelector, maxGroups);
}

/**
* Return an Observable which groups the items emitted by this Observable according to specified key and
* value selector functions until the duration Observable expires for the
* key.
* @param <TKey> the group key type
* @param <TValue> the value type within the groups
* @param <TDuration> the duration element type
* @param keySelector a function to extract the key for each item
* @param valueSelector a function to map each source item to an item
* emitted by an Observable group
* @param durationSelector a function to signal the expiration of a group
* @param maxGroups the maximum allowed concurrent groups
* @return
*/
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(
Func1<? super T, ? extends TKey> keySelector,
Func1<? super T, ? extends TValue> valueSelector,
Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector,
int maxGroups) {
if (maxGroups < 0) {
throw new IllegalArgumentException("maxGroups >= 0 required");
}
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector, maxGroups));
}

/**
* Groups the items emitted by an Observable according to a specified
* criterion, and emits these grouped items as {@link GroupedObservable}s,
* one GroupedObservable per group and limits the number of active groups.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png">
*
* @param keySelector a function that extracts the key from an item
* @param elementSelector a function to map a source item to an item in a
* {@link GroupedObservable}
* @param maxGroups the maximum number of active groups.
* @param <K> the key type
* @param <R> the type of items emitted by the resulting
* {@link GroupedObservable}s
* @return an Observable that emits {@link GroupedObservable}s, each of
* which corresponds to a unique key value and emits items
* representing items from the source Observable that share that key
* value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
*/
public <K, R> Observable<GroupedObservable<K, R>> groupBy(
final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector,
int maxGroups) {
return groupByUntil(keySelector, elementSelector, Functions.just1(never()), maxGroups);
}

/**
* Groups the items emitted by an Observable according to a specified
* criterion, and emits these grouped items as {@link GroupedObservable}s,
* one GroupedObservable per group and limits the number of active groups.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png">
*
* @param keySelector a function that extracts the key for each item
* @param maxGroups the maximum number of active groups.
* @param <K> the key type
* @return an Observable that emits {@link GroupedObservable}s, each of
* which corresponds to a unique key value and emits items
* representing items from the source Observable that share that key
* value
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
*/
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector, int maxGroups) {
return groupByUntil(keySelector, Functions.just1(never()), maxGroups);
}


/**
* Invokes the specified function asynchronously and returns an Observable
* that emits the result.
Expand Down
50 changes: 30 additions & 20 deletions rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -29,7 +30,6 @@
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
Expand All @@ -43,14 +43,19 @@ public class OperationGroupByUntil<TSource, TKey, TResult, TDuration> implements
final Func1<? super TSource, ? extends TKey> keySelector;
final Func1<? super TSource, ? extends TResult> valueSelector;
final Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector;
/** Number of active groups at once. */
final int capacity;
public OperationGroupByUntil(Observable<TSource> source,
Func1<? super TSource, ? extends TKey> keySelector,
Func1<? super TSource, ? extends TResult> valueSelector,
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector) {
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector,
int capacity
) {
this.source = source;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.durationSelector = durationSelector;
this.capacity = capacity;
}

@Override
Expand All @@ -61,17 +66,35 @@ public Subscription onSubscribe(Observer<? super GroupedObservable<TKey, TResult
return cancel;
}
/** The source value sink and group manager. */
class ResultSink implements Observer<TSource> {
final class ResultSink implements Observer<TSource> {
/** Guarded by gate. */
protected final Observer<? super GroupedObservable<TKey, TResult>> observer;
protected final Subscription cancel;
protected final CompositeSubscription group = new CompositeSubscription();
protected final Object gate = new Object();
/** Guarded by gate. */
protected final Map<TKey, GroupSubject<TKey, TResult>> map = new HashMap<TKey, GroupSubject<TKey, TResult>>();
protected final Map<TKey, GroupSubject<TKey, TResult>> map;
public ResultSink(Observer<? super GroupedObservable<TKey, TResult>> observer, Subscription cancel) {
this.observer = observer;
this.cancel = cancel;
Map<TKey, GroupSubject<TKey, TResult>> map0;
if (capacity < 0) {
map0 = new HashMap<TKey, GroupSubject<TKey, TResult>>();
} else {
map0 = new LinkedHashMap<TKey, GroupSubject<TKey, TResult>>() {

@Override
protected boolean removeEldestEntry(Map.Entry<TKey, GroupSubject<TKey, TResult>> eldest) {
if (size() > capacity) {
eldest.getValue().onCompleted();
return true;
}
return false;
}

};
}
this.map = map0;
}
/** Prepare the subscription tree. */
public Subscription run() {
Expand Down Expand Up @@ -173,7 +196,7 @@ public void expire(TKey key, Subscription handle) {
handle.unsubscribe();
}
/** Observe the completion of a group. */
class DurationObserver implements Observer<TDuration> {
final class DurationObserver implements Observer<TDuration> {
final TKey key;
final Subscription handle;
public DurationObserver(TKey key, Subscription handle) {
Expand All @@ -197,27 +220,14 @@ public void onCompleted() {

}
}
protected static <T> OnSubscribeFunc<T> neverSubscribe() {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return Subscriptions.empty();
}
};
}
/** A grouped observable with subject-like behavior. */
public static class GroupSubject<K, V> extends GroupedObservable<K, V> implements Observer<V> {
public static final class GroupSubject<K, V> extends GroupedObservable<K, V> implements Observer<V> {
protected final Subject<V, V> publish;
public GroupSubject(K key, Subject<V, V> publish) {
super(key, OperationGroupByUntil.<V>neverSubscribe());
super(key, OperationReplay.subscriberOf(publish));
this.publish = publish;
}

@Override
public Subscription subscribe(Observer<? super V> observer) {
return publish.subscribe(observer);
}

@Override
public void onNext(V args) {
publish.onNext(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package rx.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.Assert;
Expand All @@ -29,6 +31,7 @@
import rx.Observable;
import rx.Observer;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Functions;
Expand Down Expand Up @@ -299,4 +302,68 @@ public void onCompleted() {
verify(observer, never()).onCompleted();
verify(observer, never()).onNext(any());
}
}

@Test
@SuppressWarnings("unchecked")
public void testAsGroupByMaxGroups() {
PublishSubject<Integer> source = PublishSubject.create();

int n = 10;
int k = 3;

final List<Observer<Object>> observers = new ArrayList<Observer<Object>>();
// capture group events
final Observer<Object> main = mock(Observer.class);
observers.add(main);

for (int i = 1; i <= n; i++) {
Observer<Object> o = mock(Observer.class);
observers.add(o);
}

InOrder inOrder = inOrder(observers.toArray());

Observable<GroupedObservable<Integer, Integer>> result = source.groupBy(
Functions.<Integer>identity(), k);

result.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
@Override
public void onNext(GroupedObservable<Integer, Integer> args) {
main.onNext(args.getKey());
args.subscribe(observers.get(args.getKey()));
}

@Override
public void onError(Throwable e) {
main.onError(e);
}

@Override
public void onCompleted() {
main.onCompleted();
}
});

for (int i = 1; i <= n; i++) {
source.onNext(i);
}
source.onCompleted();

for (int i = 1; i <= n; i++) {
if (i > k) {
inOrder.verify(observers.get(i - k)).onCompleted();
}
inOrder.verify(main).onNext(i);
inOrder.verify(observers.get(i)).onNext(i);
}

for (int i = n - k + 1; i <= n; i++) {
inOrder.verify(observers.get(i)).onCompleted();
}
inOrder.verify(main).onCompleted();

for (Observer<Object> o : observers) {
verify(o, never()).onError(any(Throwable.class));
}
}
}