diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 1aeed4c6a9..d2f187bef3 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -11158,7 +11158,7 @@ public final Observable> toList() { * @see ReactiveX operators documentation: To */ public final Observable> toMap(Func1 keySelector) { - return lift(new OperatorToMap(keySelector, UtilityFunctions.identity())); + return create(new OnSubscribeToMap(this, keySelector, UtilityFunctions.identity())); } /** @@ -11188,7 +11188,7 @@ public final Observable> toMap(Func1 keySe * @see ReactiveX operators documentation: To */ public final Observable> toMap(Func1 keySelector, Func1 valueSelector) { - return lift(new OperatorToMap(keySelector, valueSelector)); + return create(new OnSubscribeToMap(this, keySelector, valueSelector)); } /** @@ -11217,7 +11217,7 @@ public final Observable> toMap(Func1 ke * @see ReactiveX operators documentation: To */ public final Observable> toMap(Func1 keySelector, Func1 valueSelector, Func0> mapFactory) { - return lift(new OperatorToMap(keySelector, valueSelector, mapFactory)); + return create(new OnSubscribeToMap(this, keySelector, valueSelector, mapFactory)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorToMap.java b/src/main/java/rx/internal/operators/OnSubscribeToMap.java similarity index 52% rename from src/main/java/rx/internal/operators/OperatorToMap.java rename to src/main/java/rx/internal/operators/OnSubscribeToMap.java index c346d7476b..a15014e1f3 100644 --- a/src/main/java/rx/internal/operators/OperatorToMap.java +++ b/src/main/java/rx/internal/operators/OnSubscribeToMap.java @@ -16,13 +16,15 @@ package rx.internal.operators; -import java.util.*; +import java.util.HashMap; +import java.util.Map; -import rx.Observable.Operator; +import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Subscriber; import rx.exceptions.Exceptions; -import rx.functions.*; -import rx.observers.Subscribers; +import rx.functions.Func0; +import rx.functions.Func1; /** * Maps the elements of the source observable into a java.util.Map instance and @@ -33,35 +35,25 @@ * @param the map-key type * @param the map-value type */ -public final class OperatorToMap implements Operator, T> { +public final class OnSubscribeToMap implements OnSubscribe>, Func0> { + + final Observable source; final Func1 keySelector; final Func1 valueSelector; - private final Func0> mapFactory; - - /** - * The default map factory. - * @param the key type - * @param the value type - */ - public static final class DefaultToMapFactory implements Func0> { - @Override - public Map call() { - return new HashMap(); - } - } + final Func0> mapFactory; /** * ToMap with key selector, value selector and default HashMap factory. * @param keySelector the function extracting the map-key from the main value * @param valueSelector the function extracting the map-value from the main value */ - public OperatorToMap( + public OnSubscribeToMap(Observable source, Func1 keySelector, Func1 valueSelector) { - this(keySelector, valueSelector, new DefaultToMapFactory()); + this(source, keySelector, valueSelector, null); } @@ -71,70 +63,72 @@ public OperatorToMap( * @param valueSelector the function extracting the map-value from the main value * @param mapFactory function that returns a Map instance to store keys and values into */ - public OperatorToMap( + public OnSubscribeToMap(Observable source, Func1 keySelector, Func1 valueSelector, Func0> mapFactory) { + this.source = source; this.keySelector = keySelector; this.valueSelector = valueSelector; - this.mapFactory = mapFactory; - + if (mapFactory == null) { + this.mapFactory = this; + } else { + this.mapFactory = mapFactory; + } } @Override - public Subscriber call(final Subscriber> subscriber) { - - Map localMap; - + public Map call() { + return new HashMap(); + } + + @Override + public void call(final Subscriber> subscriber) { + Map map; try { - localMap = mapFactory.call(); + map = mapFactory.call(); } catch (Throwable ex) { Exceptions.throwOrReport(ex, subscriber); - Subscriber parent = Subscribers.empty(); - parent.unsubscribe(); - return parent; + return; + } + new ToMapSubscriber(subscriber, map, keySelector, valueSelector) + .subscribeTo(source);; + } + + static final class ToMapSubscriber extends DeferredScalarSubscriberSafe> { + + final Func1 keySelector; + final Func1 valueSelector; + + ToMapSubscriber(Subscriber> actual, Map map, Func1 keySelector, + Func1 valueSelector) { + super(actual); + this.value = map; + this.hasValue = true; + this.keySelector = keySelector; + this.valueSelector = valueSelector; } - - final Map fLocalMap = localMap; - - return new Subscriber(subscriber) { - - private Map map = fLocalMap; - - @Override - public void onStart() { - request(Long.MAX_VALUE); - } - - @Override - public void onNext(T v) { - K key; - V value; - - try { - key = keySelector.call(v); - value = valueSelector.call(v); - } catch (Throwable ex) { - Exceptions.throwOrReport(ex, subscriber); - return; - } - - map.put(key, value); - } - @Override - public void onError(Throwable e) { - map = null; - subscriber.onError(e); + @Override + public void onStart() { + request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + if (done) { + return; } - - @Override - public void onCompleted() { - Map map0 = map; - map = null; - subscriber.onNext(map0); - subscriber.onCompleted(); + try { + K key = keySelector.call(t); + V val = valueSelector.call(t); + value.put(key, val); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + unsubscribe(); + onError(ex); } - }; + } } + } diff --git a/src/test/java/rx/internal/operators/OperatorToMapTest.java b/src/test/java/rx/internal/operators/OnSubscribeToMapTest.java similarity index 66% rename from src/test/java/rx/internal/operators/OperatorToMapTest.java rename to src/test/java/rx/internal/operators/OnSubscribeToMapTest.java index 466cff0df8..0cdba86951 100644 --- a/src/test/java/rx/internal/operators/OperatorToMapTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeToMapTest.java @@ -15,22 +15,37 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.verify; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; -import org.junit.*; -import org.mockito.*; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Producer; +import rx.Subscriber; import rx.exceptions.TestException; -import rx.functions.*; +import rx.functions.Action1; +import rx.functions.Func0; +import rx.functions.Func1; import rx.internal.util.UtilityFunctions; import rx.observers.TestSubscriber; +import rx.plugins.RxJavaHooks; -public class OperatorToMapTest { +public class OnSubscribeToMapTest { @Mock Observer objectObserver; @@ -281,4 +296,127 @@ public Map call() { ts.assertNoValues(); ts.assertNotCompleted(); } + + @Test + public void testFactoryFailureDoesNotAllowErrorAndCompletedEmissions() { + TestSubscriber> ts = TestSubscriber.create(0); + final RuntimeException e = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onCompleted(); + } + } + }); + } + }).toMap(new Func1() { + + @Override + public Integer call(Integer t) { + throw e; + } + }).unsafeSubscribe(ts); + ts.assertNoValues(); + ts.assertError(e); + ts.assertNotCompleted(); + } + + @Test + public void testFactoryFailureDoesNotAllowTwoErrorEmissions() { + try { + final List list = new CopyOnWriteArrayList(); + RxJavaHooks.setOnError(new Action1() { + + @Override + public void call(Throwable t) { + list.add(t); + } + }); + TestSubscriber> ts = TestSubscriber.create(0); + final RuntimeException e1 = new RuntimeException(); + final RuntimeException e2 = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onError(e2); + } + } + }); + } + }).toMap(new Func1() { + + @Override + public Integer call(Integer t) { + throw e1; + } + }).unsafeSubscribe(ts); + ts.assertNoValues(); + assertEquals(Arrays.asList(e1), ts.getOnErrorEvents()); + assertEquals(Arrays.asList(e2), list); + ts.assertNotCompleted(); + } finally { + RxJavaHooks.reset(); + } + } + + @Test + public void testFactoryFailureDoesNotAllowErrorThenOnNextEmissions() { + TestSubscriber> ts = TestSubscriber.create(0); + final RuntimeException e = new RuntimeException(); + Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber sub) { + sub.setProducer(new Producer() { + + @Override + public void request(long n) { + if (n > 1) { + sub.onNext(1); + sub.onNext(2); + } + } + }); + } + }).toMap(new Func1() { + + @Override + public Integer call(Integer t) { + throw e; + } + }).unsafeSubscribe(ts); + ts.assertNoValues(); + ts.assertError(e); + ts.assertNotCompleted(); + } + + @Test + public void testBackpressure() { + TestSubscriber ts = TestSubscriber.create(0); + Observable + .just("a", "bb", "ccc", "dddd") + .toMap(lengthFunc) + .subscribe(ts); + ts.assertNoErrors(); + ts.assertNotCompleted(); + ts.assertNoValues(); + ts.requestMore(1); + ts.assertValueCount(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } }