diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 0031f53f4d..55d4c79cc4 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -3455,6 +3455,50 @@ public static Flowable range(int start, int count) { return RxJavaPlugins.onAssembly(new FlowableRange(start, count)); } + /** + * Returns a Flowable that emits a sequence of Longs within a specified range. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and signals values on-demand (i.e., when requested).
+ *
Scheduler:
+ *
{@code rangeLong} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param start + * the value of the first Long in the sequence + * @param count + * the number of sequential Longs to generate + * @return a Flowable that emits a range of sequential Longs + * @throws IllegalArgumentException + * if {@code count} is less than zero, or if {@code start} + {@code count} − 1 exceeds + * {@code Long.MAX_VALUE} + * @see ReactiveX operators documentation: Range + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable rangeLong(long start, long count) { + if (count < 0) { + throw new IllegalArgumentException("count >= 0 required but it was " + count); + } + + if (count == 0) { + return empty(); + } + + if (count == 1) { + return just(start); + } + + long end = start + (count - 1); + if (start > 0 && end < 0) { + throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE"); + } + + return RxJavaPlugins.onAssembly(new FlowableRangeLong(start, count)); + } + /** * Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the * same by comparing the items emitted by each Publisher pairwise. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 2ed857205f..fecbeefae5 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -2992,6 +2992,47 @@ public static Observable range(final int start, final int count) { return RxJavaPlugins.onAssembly(new ObservableRange(start, count)); } + /** + * Returns an Observable that emits a sequence of Longs within a specified range. + *

+ * + *

+ *
Scheduler:
+ *
{@code rangeLong} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param start + * the value of the first Long in the sequence + * @param count + * the number of sequential Longs to generate + * @return an Observable that emits a range of sequential Longs + * @throws IllegalArgumentException + * if {@code count} is less than zero, or if {@code start} + {@code count} − 1 exceeds + * {@code Long.MAX_VALUE} + * @see ReactiveX operators documentation: Range + */ + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable rangeLong(long start, long count) { + if (count < 0) { + throw new IllegalArgumentException("count >= 0 required but it was " + count); + } + + if (count == 0) { + return empty(); + } + + if (count == 1) { + return just(start); + } + + long end = start + (count - 1); + if (start > 0 && end < 0) { + throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE"); + } + + return RxJavaPlugins.onAssembly(new ObservableRangeLong(start, count)); + } + /** * Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the * same by comparing the items emitted by each ObservableSource pairwise. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRangeLong.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRangeLong.java new file mode 100644 index 0000000000..f9efa4330f --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRangeLong.java @@ -0,0 +1,246 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import io.reactivex.Flowable; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.subscriptions.BasicQueueSubscription; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; +import org.reactivestreams.Subscriber; + +/** + * Emits a range of long values. + */ +public final class FlowableRangeLong extends Flowable { + final long start; + final long end; + + public FlowableRangeLong(long start, long count) { + this.start = start; + this.end = start + count; + } + + @Override + public void subscribeActual(Subscriber s) { + if (s instanceof ConditionalSubscriber) { + s.onSubscribe(new RangeConditionalSubscription( + (ConditionalSubscriber)s, start, end)); + } else { + s.onSubscribe(new RangeSubscription(s, start, end)); + } + } + + abstract static class BaseRangeSubscription extends BasicQueueSubscription { + + private static final long serialVersionUID = -2252972430506210021L; + + final long end; + + long index; + + volatile boolean cancelled; + + BaseRangeSubscription(long index, long end) { + this.index = index; + this.end = end; + } + + @Override + public final int requestFusion(int mode) { + return mode & SYNC; + } + + @Override + public final Long poll() { + long i = index; + if (i == end) { + return null; + } + index = i + 1; + return i; + } + + @Override + public final boolean isEmpty() { + return index == end; + } + + @Override + public final void clear() { + index = end; + } + + @Override + public final void request(long n) { + if (SubscriptionHelper.validate(n)) { + if (BackpressureHelper.add(this, n) == 0L) { + if (n == Long.MAX_VALUE) { + fastPath(); + } else { + slowPath(n); + } + } + } + } + + @Override + public final void cancel() { + cancelled = true; + } + + + abstract void fastPath(); + + abstract void slowPath(long r); + } + + static final class RangeSubscription extends BaseRangeSubscription { + + private static final long serialVersionUID = 2587302975077663557L; + + final Subscriber actual; + + RangeSubscription(Subscriber actual, long index, long end) { + super(index, end); + this.actual = actual; + } + + @Override + void fastPath() { + long f = end; + Subscriber a = actual; + + for (long i = index; i != f; i++) { + if (cancelled) { + return; + } + a.onNext(i); + } + if (cancelled) { + return; + } + a.onComplete(); + } + + @Override + void slowPath(long r) { + long e = 0; + long f = end; + long i = index; + Subscriber a = actual; + + for (;;) { + + while (e != r && i != f) { + if (cancelled) { + return; + } + + a.onNext(i); + + e++; + i++; + } + + if (i == f) { + if (!cancelled) { + a.onComplete(); + } + return; + } + + r = get(); + if (e == r) { + index = i; + r = addAndGet(-e); + if (r == 0L) { + return; + } + e = 0L; + } + } + } + } + + static final class RangeConditionalSubscription extends BaseRangeSubscription { + + + private static final long serialVersionUID = 2587302975077663557L; + + final ConditionalSubscriber actual; + + RangeConditionalSubscription(ConditionalSubscriber actual, long index, long end) { + super(index, end); + this.actual = actual; + } + + @Override + void fastPath() { + long f = end; + ConditionalSubscriber a = actual; + + for (long i = index; i != f; i++) { + if (cancelled) { + return; + } + a.tryOnNext(i); + } + if (cancelled) { + return; + } + a.onComplete(); + } + + @Override + void slowPath(long r) { + long e = 0; + long f = end; + long i = index; + ConditionalSubscriber a = actual; + + for (;;) { + + while (e != r && i != f) { + if (cancelled) { + return; + } + + if (a.tryOnNext(i)) { + e++; + } + + i++; + } + + if (i == f) { + if (!cancelled) { + a.onComplete(); + } + return; + } + + r = get(); + if (e == r) { + index = i; + r = addAndGet(-e); + if (r == 0) { + return; + } + e = 0; + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java new file mode 100644 index 0000000000..c26e5fe903 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java @@ -0,0 +1,122 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.observable; + +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.internal.fuseable.QueueDisposable; +import java.util.concurrent.atomic.AtomicLong; + +public final class ObservableRangeLong extends Observable { + private final long start; + private final long count; + + public ObservableRangeLong(long start, long count) { + this.start = start; + this.count = count; + } + + @Override + protected void subscribeActual(Observer o) { + RangeDisposable parent = new RangeDisposable(o, start, start + count); + o.onSubscribe(parent); + parent.run(); + } + + static final class RangeDisposable + extends AtomicLong + implements QueueDisposable { + + private static final long serialVersionUID = 396518478098735504L; + + final Observer actual; + + final long end; + + long index; + + boolean fused; + + RangeDisposable(Observer actual, long start, long end) { + this.actual = actual; + this.index = start; + this.end = end; + } + + void run() { + if (fused) { + return; + } + Observer actual = this.actual; + long e = end; + for (long i = index; i != e && get() == 0; i++) { + actual.onNext(i); + } + if (get() == 0) { + lazySet(1); + actual.onComplete(); + } + } + + @Override + public boolean offer(Long value) { + throw new UnsupportedOperationException("Should not be called!"); + } + + @Override + public boolean offer(Long v1, Long v2) { + throw new UnsupportedOperationException("Should not be called!"); + } + + @Override + public Long poll() throws Exception { + long i = index; + if (i != end) { + index = i + 1; + return i; + } + lazySet(1); + return null; + } + + @Override + public boolean isEmpty() { + return index == end; + } + + @Override + public void clear() { + index = end; + lazySet(1); + } + + @Override + public void dispose() { + set(1); + } + + @Override + public boolean isDisposed() { + return get() != 0; + } + + @Override + public int requestFusion(int mode) { + if ((mode & SYNC) != 0) { + fused = true; + return SYNC; + } + return NONE; + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java new file mode 100644 index 0000000000..6267d08730 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeLongTest.java @@ -0,0 +1,295 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import io.reactivex.Flowable; +import io.reactivex.TestHelper; +import io.reactivex.functions.Consumer; +import io.reactivex.subscribers.DefaultSubscriber; +import io.reactivex.subscribers.TestSubscriber; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FlowableRangeLongTest { + + @Test + public void testRangeStartAt2Count3() { + Subscriber observer = TestHelper.mockSubscriber(); + + Flowable.rangeLong(2, 3).subscribe(observer); + + verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onNext(3L); + verify(observer, times(1)).onNext(4L); + verify(observer, never()).onNext(5L); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onComplete(); + } + + @Test + public void testRangeUnsubscribe() { + Subscriber observer = TestHelper.mockSubscriber(); + + final AtomicInteger count = new AtomicInteger(); + + Flowable.rangeLong(1, 1000).doOnNext(new Consumer() { + @Override + public void accept(Long t1) { + count.incrementAndGet(); + } + }) + .take(3).subscribe(observer); + + verify(observer, times(1)).onNext(1L); + verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onNext(3L); + verify(observer, never()).onNext(4L); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onComplete(); + assertEquals(3, count.get()); + } + + @Test + public void testRangeWithZero() { + Flowable.rangeLong(1, 0); + } + + @Test + public void testRangeWithOverflow2() { + Flowable.rangeLong(Long.MAX_VALUE, 0); + } + + @Test + public void testRangeWithOverflow3() { + Flowable.rangeLong(1, Long.MAX_VALUE); + } + + @Test(expected = IllegalArgumentException.class) + public void testRangeWithOverflow4() { + Flowable.rangeLong(2, Long.MAX_VALUE); + } + + @Test + public void testRangeWithOverflow5() { + assertFalse(Flowable.rangeLong(Long.MIN_VALUE, 0).blockingIterable().iterator().hasNext()); + } + + @Test + public void testBackpressureViaRequest() { + Flowable o = Flowable.rangeLong(1, Flowable.bufferSize()); + + TestSubscriber ts = new TestSubscriber(0L); + + ts.assertNoValues(); + ts.request(1); + + o.subscribe(ts); + + ts.assertValue(1L); + + ts.request(2); + ts.assertValues(1L, 2L, 3L); + + ts.request(3); + ts.assertValues(1L, 2L, 3L, 4L, 5L, 6L); + + ts.request(Flowable.bufferSize()); + ts.assertTerminated(); + } + + @Test + public void testNoBackpressure() { + ArrayList list = new ArrayList(Flowable.bufferSize() * 2); + for (long i = 1; i <= Flowable.bufferSize() * 2 + 1; i++) { + list.add(i); + } + + Flowable o = Flowable.rangeLong(1, list.size()); + + TestSubscriber ts = new TestSubscriber(0L); + + ts.assertNoValues(); + ts.request(Long.MAX_VALUE); // infinite + + o.subscribe(ts); + + ts.assertValueSequence(list); + ts.assertTerminated(); + } + void testWithBackpressureOneByOne(long start) { + Flowable source = Flowable.rangeLong(start, 100); + + TestSubscriber ts = new TestSubscriber(0L); + ts.request(1); + source.subscribe(ts); + + List list = new ArrayList(100); + for (long i = 0; i < 100; i++) { + list.add(i + start); + ts.request(1); + } + ts.assertValueSequence(list); + ts.assertTerminated(); + } + void testWithBackpressureAllAtOnce(long start) { + Flowable source = Flowable.rangeLong(start, 100); + + TestSubscriber ts = new TestSubscriber(0L); + ts.request(100); + source.subscribe(ts); + + List list = new ArrayList(100); + for (long i = 0; i < 100; i++) { + list.add(i + start); + } + ts.assertValueSequence(list); + ts.assertTerminated(); + } + @Test + public void testWithBackpressure1() { + for (long i = 0; i < 100; i++) { + testWithBackpressureOneByOne(i); + } + } + @Test + public void testWithBackpressureAllAtOnce() { + for (long i = 0; i < 100; i++) { + testWithBackpressureAllAtOnce(i); + } + } + @Test + public void testWithBackpressureRequestWayMore() { + Flowable source = Flowable.rangeLong(50, 100); + + TestSubscriber ts = new TestSubscriber(0L); + ts.request(150); + source.subscribe(ts); + + List list = new ArrayList(100); + for (long i = 0; i < 100; i++) { + list.add(i + 50); + } + + ts.request(50); // and then some + + ts.assertValueSequence(list); + ts.assertTerminated(); + } + + @Test + public void testRequestOverflow() { + final AtomicInteger count = new AtomicInteger(); + int n = 10; + Flowable.rangeLong(1, n).subscribe(new DefaultSubscriber() { + + @Override + public void onStart() { + request(2); + } + + @Override + public void onComplete() { + //do nothing + } + + @Override + public void onError(Throwable e) { + throw new RuntimeException(e); + } + + @Override + public void onNext(Long t) { + count.incrementAndGet(); + request(Long.MAX_VALUE - 1); + }}); + assertEquals(n, count.get()); + } + + @Test + public void testEmptyRangeSendsOnCompleteEagerlyWithRequestZero() { + final AtomicBoolean completed = new AtomicBoolean(false); + Flowable.rangeLong(1, 0).subscribe(new DefaultSubscriber() { + + @Override + public void onStart() { +// request(0); + } + + @Override + public void onComplete() { + completed.set(true); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Long t) { + + }}); + assertTrue(completed.get()); + } + + @Test(timeout = 1000) + public void testNearMaxValueWithoutBackpressure() { + TestSubscriber ts = new TestSubscriber(); + Flowable.rangeLong(Long.MAX_VALUE - 1L, 2L).subscribe(ts); + + ts.assertComplete(); + ts.assertNoErrors(); + ts.assertValues(Long.MAX_VALUE - 1L, Long.MAX_VALUE); + } + + @Test(timeout = 1000) + public void testNearMaxValueWithBackpressure() { + TestSubscriber ts = new TestSubscriber(3L); + Flowable.rangeLong(Long.MAX_VALUE - 1L, 2L).subscribe(ts); + + ts.assertComplete(); + ts.assertNoErrors(); + ts.assertValues(Long.MAX_VALUE - 1L, Long.MAX_VALUE); + } + + @Test + public void negativeCount() { + try { + Flowable.rangeLong(1L, -1L); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException ex) { + assertEquals("count >= 0 required but it was -1", ex.getMessage()); + } + } + + @Test + public void countOne() { + Flowable.rangeLong(5495454L, 1L) + .test() + .assertResult(5495454L); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRangeLongTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRangeLongTest.java new file mode 100644 index 0000000000..688b308526 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRangeLongTest.java @@ -0,0 +1,170 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import io.reactivex.Flowable; +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.TestHelper; +import io.reactivex.functions.Consumer; +import io.reactivex.observers.DefaultObserver; +import io.reactivex.observers.TestObserver; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ObservableRangeLongTest { + @Test + public void testRangeStartAt2Count3() { + Observer observer = TestHelper.mockObserver(); + + Observable.rangeLong(2, 3).subscribe(observer); + + verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onNext(3L); + verify(observer, times(1)).onNext(4L); + verify(observer, never()).onNext(5L); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onComplete(); + } + + @Test + public void testRangeUnsubscribe() { + Observer observer = TestHelper.mockObserver(); + + final AtomicInteger count = new AtomicInteger(); + + Observable.rangeLong(1, 1000).doOnNext(new Consumer() { + @Override + public void accept(Long t1) { + count.incrementAndGet(); + } + }) + .take(3).subscribe(observer); + + verify(observer, times(1)).onNext(1L); + verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onNext(3L); + verify(observer, never()).onNext(4L); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onComplete(); + assertEquals(3, count.get()); + } + + @Test + public void testRangeWithZero() { + Observable.rangeLong(1L, 0L); + } + + @Test + public void testRangeWithOverflow2() { + Observable.rangeLong(Long.MAX_VALUE, 0L); + } + + @Test + public void testRangeWithOverflow3() { + Observable.rangeLong(1L, Long.MAX_VALUE); + } + + @Test(expected = IllegalArgumentException.class) + public void testRangeWithOverflow4() { + Observable.rangeLong(2L, Long.MAX_VALUE); + } + + @Test + public void testRangeWithOverflow5() { + assertFalse(Observable.rangeLong(Long.MIN_VALUE, 0).blockingIterable().iterator().hasNext()); + } + + @Test + public void testNoBackpressure() { + ArrayList list = new ArrayList(Flowable.bufferSize() * 2); + for (long i = 1; i <= Flowable.bufferSize() * 2 + 1; i++) { + list.add(i); + } + + Observable o = Observable.rangeLong(1, list.size()); + + TestObserver ts = new TestObserver(); + + o.subscribe(ts); + + ts.assertValueSequence(list); + ts.assertTerminated(); + } + + @Test + public void testEmptyRangeSendsOnCompleteEagerlyWithRequestZero() { + final AtomicBoolean completed = new AtomicBoolean(false); + Observable.rangeLong(1L, 0L).subscribe(new DefaultObserver() { + + @Override + public void onStart() { +// request(0); + } + + @Override + public void onComplete() { + completed.set(true); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Long t) { + + }}); + assertTrue(completed.get()); + } + + @Test(timeout = 1000) + public void testNearMaxValueWithoutBackpressure() { + TestObserver ts = new TestObserver(); + Observable.rangeLong(Long.MAX_VALUE - 1L, 2L).subscribe(ts); + + ts.assertComplete(); + ts.assertNoErrors(); + ts.assertValues(Long.MAX_VALUE - 1, Long.MAX_VALUE); + } + + @Test + public void negativeCount() { + try { + Observable.rangeLong(1L, -1L); + fail("Should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException ex) { + assertEquals("count >= 0 required but it was -1", ex.getMessage()); + } + } + + @Test + public void countOne() { + Observable.rangeLong(5495454L, 1L) + .test() + .assertResult(5495454L); + } +}