diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index f9dd5b9428..ea465379a9 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -7231,6 +7231,97 @@ public Observable> gro return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector)); } + /** + * Return an Observable which groups the items emitted by this Observable according to a specified key + * selector function until the duration Observable expires for the key or + * the total number of active groups exceeds the maxGroups value. + * + * @param the group key type + * @param the duration element type + * @param keySelector a function to extract the key for each item + * @param durationSelector a function to signal the expiration of a group + * @param maxGroups the maximum allowed concurrent groups + * @return + */ + public Observable> groupByUntil( + Func1 keySelector, + Func1, ? extends Observable> durationSelector, + int maxGroups) { + return groupByUntil(keySelector, Functions.identity(), durationSelector, maxGroups); + } + + /** + * Return an Observable which groups the items emitted by this Observable according to specified key and + * value selector functions until the duration Observable expires for the + * key. + * @param the group key type + * @param the value type within the groups + * @param the duration element type + * @param keySelector a function to extract the key for each item + * @param valueSelector a function to map each source item to an item + * emitted by an Observable group + * @param durationSelector a function to signal the expiration of a group + * @param maxGroups the maximum allowed concurrent groups + * @return + */ + public Observable> groupByUntil( + Func1 keySelector, + Func1 valueSelector, + Func1, ? extends Observable> durationSelector, + int maxGroups) { + if (maxGroups < 0) { + throw new IllegalArgumentException("maxGroups >= 0 required"); + } + return create(new OperationGroupByUntil(this, keySelector, valueSelector, durationSelector, maxGroups)); + } + + /** + * Groups the items emitted by an Observable according to a specified + * criterion, and emits these grouped items as {@link GroupedObservable}s, + * one GroupedObservable per group and limits the number of active groups. + *

+ * + * + * @param keySelector a function that extracts the key from an item + * @param elementSelector a function to map a source item to an item in a + * {@link GroupedObservable} + * @param maxGroups the maximum number of active groups. + * @param the key type + * @param the type of items emitted by the resulting + * {@link GroupedObservable}s + * @return an Observable that emits {@link GroupedObservable}s, each of + * which corresponds to a unique key value and emits items + * representing items from the source Observable that share that key + * value + * @see RxJava Wiki: groupBy + */ + public Observable> groupBy( + final Func1 keySelector, final Func1 elementSelector, + int maxGroups) { + return groupByUntil(keySelector, elementSelector, Functions.just1(never()), maxGroups); + } + + /** + * Groups the items emitted by an Observable according to a specified + * criterion, and emits these grouped items as {@link GroupedObservable}s, + * one GroupedObservable per group and limits the number of active groups. + *

+ * + * + * @param keySelector a function that extracts the key for each item + * @param maxGroups the maximum number of active groups. + * @param the key type + * @return an Observable that emits {@link GroupedObservable}s, each of + * which corresponds to a unique key value and emits items + * representing items from the source Observable that share that key + * value + * @see RxJava Wiki: groupBy + */ + public Observable> groupBy(final Func1 keySelector, int maxGroups) { + return groupByUntil(keySelector, Functions.just1(never()), maxGroups); + } + + /** * Invokes the specified function asynchronously and returns an Observable * that emits the result. diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java index 63a271a688..9bf3eff415 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupByUntil.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -29,7 +30,6 @@ import rx.subjects.Subject; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.SerialSubscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; /** @@ -43,14 +43,19 @@ public class OperationGroupByUntil implements final Func1 keySelector; final Func1 valueSelector; final Func1, ? extends Observable> durationSelector; + /** Number of active groups at once. */ + final int capacity; public OperationGroupByUntil(Observable source, Func1 keySelector, Func1 valueSelector, - Func1, ? extends Observable> durationSelector) { + Func1, ? extends Observable> durationSelector, + int capacity + ) { this.source = source; this.keySelector = keySelector; this.valueSelector = valueSelector; this.durationSelector = durationSelector; + this.capacity = capacity; } @Override @@ -61,17 +66,35 @@ public Subscription onSubscribe(Observer { + final class ResultSink implements Observer { /** Guarded by gate. */ protected final Observer> observer; protected final Subscription cancel; protected final CompositeSubscription group = new CompositeSubscription(); protected final Object gate = new Object(); /** Guarded by gate. */ - protected final Map> map = new HashMap>(); + protected final Map> map; public ResultSink(Observer> observer, Subscription cancel) { this.observer = observer; this.cancel = cancel; + Map> map0; + if (capacity < 0) { + map0 = new HashMap>(); + } else { + map0 = new LinkedHashMap>() { + + @Override + protected boolean removeEldestEntry(Map.Entry> eldest) { + if (size() > capacity) { + eldest.getValue().onCompleted(); + return true; + } + return false; + } + + }; + } + this.map = map0; } /** Prepare the subscription tree. */ public Subscription run() { @@ -173,7 +196,7 @@ public void expire(TKey key, Subscription handle) { handle.unsubscribe(); } /** Observe the completion of a group. */ - class DurationObserver implements Observer { + final class DurationObserver implements Observer { final TKey key; final Subscription handle; public DurationObserver(TKey key, Subscription handle) { @@ -197,27 +220,14 @@ public void onCompleted() { } } - protected static OnSubscribeFunc neverSubscribe() { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer t1) { - return Subscriptions.empty(); - } - }; - } /** A grouped observable with subject-like behavior. */ - public static class GroupSubject extends GroupedObservable implements Observer { + public static final class GroupSubject extends GroupedObservable implements Observer { protected final Subject publish; public GroupSubject(K key, Subject publish) { - super(key, OperationGroupByUntil.neverSubscribe()); + super(key, OperationReplay.subscriberOf(publish)); this.publish = publish; } - @Override - public Subscription subscribe(Observer observer) { - return publish.subscribe(observer); - } - @Override public void onNext(V args) { publish.onNext(args); diff --git a/rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java b/rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java index ca04ba70cd..1833902d9c 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationGroupByUntilTest.java @@ -15,8 +15,10 @@ */ package rx.operators; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import junit.framework.Assert; @@ -29,6 +31,7 @@ import rx.Observable; import rx.Observer; import rx.observables.GroupedObservable; +import rx.subjects.PublishSubject; import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Functions; @@ -299,4 +302,68 @@ public void onCompleted() { verify(observer, never()).onCompleted(); verify(observer, never()).onNext(any()); } -} \ No newline at end of file + + @Test + @SuppressWarnings("unchecked") + public void testAsGroupByMaxGroups() { + PublishSubject source = PublishSubject.create(); + + int n = 10; + int k = 3; + + final List> observers = new ArrayList>(); + // capture group events + final Observer main = mock(Observer.class); + observers.add(main); + + for (int i = 1; i <= n; i++) { + Observer o = mock(Observer.class); + observers.add(o); + } + + InOrder inOrder = inOrder(observers.toArray()); + + Observable> result = source.groupBy( + Functions.identity(), k); + + result.subscribe(new Observer>() { + @Override + public void onNext(GroupedObservable args) { + main.onNext(args.getKey()); + args.subscribe(observers.get(args.getKey())); + } + + @Override + public void onError(Throwable e) { + main.onError(e); + } + + @Override + public void onCompleted() { + main.onCompleted(); + } + }); + + for (int i = 1; i <= n; i++) { + source.onNext(i); + } + source.onCompleted(); + + for (int i = 1; i <= n; i++) { + if (i > k) { + inOrder.verify(observers.get(i - k)).onCompleted(); + } + inOrder.verify(main).onNext(i); + inOrder.verify(observers.get(i)).onNext(i); + } + + for (int i = n - k + 1; i <= n; i++) { + inOrder.verify(observers.get(i)).onCompleted(); + } + inOrder.verify(main).onCompleted(); + + for (Observer o : observers) { + verify(o, never()).onError(any(Throwable.class)); + } + } +}