diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java index b87a49fdd1..0ec59d71b2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java @@ -55,10 +55,9 @@ public ToObservableList(Observable that) { public Subscription onSubscribe(final Observer> observer) { return that.subscribe(new Observer() { - final ConcurrentLinkedQueue list = new ConcurrentLinkedQueue(); + final List list = new ArrayList(); public void onNext(T value) { - // onNext can be concurrently executed so list must be thread-safe list.add(value); } @@ -68,16 +67,10 @@ public void onError(Throwable ex) { public void onCompleted() { try { - // copy from LinkedQueue to List since ConcurrentLinkedQueue does not implement the List interface - ArrayList l = new ArrayList(list.size()); - for (T t : list) { - l.add(t); - } - // benjchristensen => I want to make this list immutable but some clients are sorting this // instead of using toSortedList() and this change breaks them until we migrate their code. // observer.onNext(Collections.unmodifiableList(l)); - observer.onNext(l); + observer.onNext(new ArrayList(list)); observer.onCompleted(); } catch (Throwable e) { onError(e); diff --git a/rxjava-core/src/test/java/rx/operators/OperationToObservableListTest.java b/rxjava-core/src/test/java/rx/operators/OperationToObservableListTest.java index 1124ea6211..767c05acfb 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationToObservableListTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationToObservableListTest.java @@ -66,4 +66,17 @@ public void testListMultipleObservers() { verify(o2, Mockito.never()).onError(any(Throwable.class)); verify(o2, times(1)).onCompleted(); } + + @Test + public void testListWithNullValue() { + Observable w = Observable.from("one", null, "three"); + Observable> observable = Observable.create(toObservableList(w)); + + @SuppressWarnings("unchecked") + Observer> aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext(Arrays.asList("one", null, "three")); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } }