diff --git a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java index 001058763b..c2799df758 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java +++ b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java @@ -220,7 +220,7 @@ void drain() { final int delayErrorMode = this.delayErrorMode; - do { + for (;;) { if (actual.isUnsubscribed()) { return; } @@ -273,6 +273,8 @@ void drain() { if (source instanceof ScalarSynchronousObservable) { ScalarSynchronousObservable scalarSource = (ScalarSynchronousObservable) source; + active = true; + arbiter.setProducer(new ConcatMapInnerScalarProducer(scalarSource.get(), this)); } else { ConcatMapInnerSubscriber innerSubscriber = new ConcatMapInnerSubscriber(this); @@ -286,11 +288,17 @@ void drain() { return; } } + request(1); + } else { + request(1); + continue; } - request(1); } } - } while (wip.decrementAndGet() != 0); + if (wip.decrementAndGet() == 0) { + break; + } + } } void drainError(Throwable mapperError) { @@ -352,7 +360,7 @@ public ConcatMapInnerScalarProducer(R value, ConcatMapSubscriber parent) { @Override public void request(long n) { - if (!once) { + if (!once && n > 0L) { once = true; ConcatMapSubscriber p = parent; p.innerNext(value); diff --git a/src/test/java/rx/internal/operators/OperatorConcatTest.java b/src/test/java/rx/internal/operators/OperatorConcatTest.java index 65aa1f5307..a824374659 100644 --- a/src/test/java/rx/internal/operators/OperatorConcatTest.java +++ b/src/test/java/rx/internal/operators/OperatorConcatTest.java @@ -850,4 +850,64 @@ public Observable call(Integer t) { } } + @Test + public void scalarAndRangeBackpressured() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1).concatWith(Observable.range(2, 3)).subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(5); + + ts.assertValues(1, 2, 3, 4); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void scalarAndEmptyBackpressured() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.just(1).concatWith(Observable.empty()).subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(5); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void rangeAndEmptyBackpressured() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.range(1, 2).concatWith(Observable.empty()).subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(5); + + ts.assertValues(1, 2); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void emptyAndScalarBackpressured() { + TestSubscriber ts = TestSubscriber.create(0); + + Observable.empty().concatWith(Observable.just(1)).subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(5); + + ts.assertValue(1); + ts.assertCompleted(); + ts.assertNoErrors(); + } + } \ No newline at end of file