diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 8ed9ef7951..b6234d4c1d 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -7353,6 +7353,32 @@ public final Flowable doFinally(Action onFinally) { return RxJavaPlugins.onAssembly(new FlowableDoFinally(this, onFinally)); } + /** + * Calls the specified consumer with the current item after this item has been emitted to the downstream. + *

Note that the {@code onAfterNext} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure + * behavior.
+ *
Scheduler:
+ *
{@code doAfterNext} does not operate by default on a particular {@link Scheduler}.
+ * Operator-fusion: + *
This operator supports normal and conditional Subscribers as well as boundary-limited + * synchronous or asynchronous queue-fusion.
+ *
+ * @param onAfterNext the Consumer that will be called after emitting an item from upstream to the downstream + * @return the new Flowable instance + * @since 2.0.1 - experimental + */ + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable doAfterNext(Consumer onAfterNext) { + ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); + return RxJavaPlugins.onAssembly(new FlowableDoAfterNext(this, onAfterNext)); + } + /** * Registers an {@link Action} to be called when this Publisher invokes either * {@link Subscriber#onComplete onComplete} or {@link Subscriber#onError onError}. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNext.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNext.java new file mode 100644 index 0000000000..c847954050 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNext.java @@ -0,0 +1,131 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import org.reactivestreams.*; + +import io.reactivex.annotations.Experimental; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.subscribers.*; + +/** + * Calls a consumer after pushing the current item to the downstream. + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class FlowableDoAfterNext extends AbstractFlowableWithUpstream { + + final Consumer onAfterNext; + + public FlowableDoAfterNext(Publisher source, Consumer onAfterNext) { + super(source); + this.onAfterNext = onAfterNext; + } + + @Override + protected void subscribeActual(Subscriber s) { + if (s instanceof ConditionalSubscriber) { + source.subscribe(new DoAfterConditionalSubscriber((ConditionalSubscriber)s, onAfterNext)); + } else { + source.subscribe(new DoAfterSubscriber(s, onAfterNext)); + } + } + + static final class DoAfterSubscriber extends BasicFuseableSubscriber { + + final Consumer onAfterNext; + + DoAfterSubscriber(Subscriber actual, Consumer onAfterNext) { + super(actual); + this.onAfterNext = onAfterNext; + } + + @Override + public void onNext(T t) { + actual.onNext(t); + + if (sourceMode == NONE) { + try { + onAfterNext.accept(t); + } catch (Throwable ex) { + fail(ex); + } + } + } + + @Override + public int requestFusion(int mode) { + return transitiveBoundaryFusion(mode); + } + + @Override + public T poll() throws Exception { + T v = qs.poll(); + if (v != null) { + onAfterNext.accept(v); + } + return v; + } + } + + static final class DoAfterConditionalSubscriber extends BasicFuseableConditionalSubscriber { + + final Consumer onAfterNext; + + DoAfterConditionalSubscriber(ConditionalSubscriber actual, Consumer onAfterNext) { + super(actual); + this.onAfterNext = onAfterNext; + } + + @Override + public void onNext(T t) { + actual.onNext(t); + + if (sourceMode == NONE) { + try { + onAfterNext.accept(t); + } catch (Throwable ex) { + fail(ex); + } + } + } + + @Override + public boolean tryOnNext(T t) { + boolean b = actual.tryOnNext(t); + try { + onAfterNext.accept(t); + } catch (Throwable ex) { + fail(ex); + } + return b; + } + + @Override + public int requestFusion(int mode) { + return transitiveBoundaryFusion(mode); + } + + @Override + public T poll() throws Exception { + T v = qs.poll(); + if (v != null) { + onAfterNext.accept(v); + } + return v; + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNextTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNextTest.java new file mode 100644 index 0000000000..7b6f1aea55 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNextTest.java @@ -0,0 +1,273 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.processors.UnicastProcessor; +import io.reactivex.subscribers.*; + +public class FlowableDoAfterNextTest { + + final List values = new ArrayList(); + + final Consumer afterNext = new Consumer() { + @Override + public void accept(Integer e) throws Exception { + values.add(-e); + } + }; + + final TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + FlowableDoAfterNextTest.this.values.add(t); + } + }; + + @Test + public void just() { + Flowable.just(1) + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void range() { + Flowable.range(1, 5) + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test + public void empty() { + Flowable.empty() + .doAfterNext(afterNext) + .subscribeWith(ts) + .assertResult(); + + assertTrue(values.isEmpty()); + } + + @Test + public void syncFused() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.range(1, 5) + .doAfterNext(afterNext) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFusedRejected() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + Flowable.range(1, 5) + .doAfterNext(afterNext) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFused() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + UnicastProcessor up = UnicastProcessor.create(); + + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doAfterNext(afterNext) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test(expected = NullPointerException.class) + public void consumerNull() { + Flowable.just(1).doAfterNext(null); + } + + @Test + public void justConditional() { + Flowable.just(1) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(1); + + assertEquals(Arrays.asList(1, -1), values); + } + + @Test + public void rangeConditional() { + Flowable.range(1, 5) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(1, -1, 2, -2, 3, -3, 4, -4, 5, -5), values); + } + + @Test + public void errorConditional() { + Flowable.error(new TestException()) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertFailure(TestException.class); + + assertTrue(values.isEmpty()); + } + + @Test + public void emptyConditional() { + Flowable.empty() + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribeWith(ts) + .assertResult(); + + assertTrue(values.isEmpty()); + } + + @Test + public void syncFusedConditional() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.range(1, 5) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFusedRejectedConditional() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + Flowable.range(1, 5) + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void asyncFusedConditional() { + TestSubscriber ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC); + + UnicastProcessor up = UnicastProcessor.create(); + + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doAfterNext(afterNext) + .filter(Functions.alwaysTrue()) + .subscribe(ts0); + + SubscriberFusion.assertFusion(ts0, QueueSubscription.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(Arrays.asList(-1, -2, -3, -4, -5), values); + } + + @Test + public void consumerThrows() { + Flowable.just(1, 2) + .doAfterNext(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void consumerThrowsConditional() { + Flowable.just(1, 2) + .doAfterNext(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void consumerThrowsConditional2() { + Flowable.just(1, 2).hide() + .doAfterNext(new Consumer() { + @Override + public void accept(Integer e) throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class, 1); + } +} diff --git a/src/test/java/io/reactivex/tck/DoAfterNextTckTest.java b/src/test/java/io/reactivex/tck/DoAfterNextTckTest.java new file mode 100644 index 0000000000..127dde188b --- /dev/null +++ b/src/test/java/io/reactivex/tck/DoAfterNextTckTest.java @@ -0,0 +1,31 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.Flowable; +import io.reactivex.internal.functions.Functions; + +@Test +public class DoAfterNextTckTest extends BaseTck { + + @Override + public Publisher createPublisher(long elements) { + return FlowableTck.wrap( + Flowable.range(0, (int)elements).doAfterNext(Functions.emptyConsumer()) + ); + } +} diff --git a/src/test/java/io/reactivex/tck/DoFinallyTckTest.java b/src/test/java/io/reactivex/tck/DoFinallyTckTest.java new file mode 100644 index 0000000000..68dd94ff5f --- /dev/null +++ b/src/test/java/io/reactivex/tck/DoFinallyTckTest.java @@ -0,0 +1,31 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.Flowable; +import io.reactivex.internal.functions.Functions; + +@Test +public class DoFinallyTckTest extends BaseTck { + + @Override + public Publisher createPublisher(long elements) { + return FlowableTck.wrap( + Flowable.range(0, (int)elements).doFinally(Functions.EMPTY_ACTION) + ); + } +} diff --git a/src/test/java/io/reactivex/tck/DoOnNextTckTest.java b/src/test/java/io/reactivex/tck/DoOnNextTckTest.java new file mode 100644 index 0000000000..699a57cf81 --- /dev/null +++ b/src/test/java/io/reactivex/tck/DoOnNextTckTest.java @@ -0,0 +1,31 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.tck; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.Flowable; +import io.reactivex.internal.functions.Functions; + +@Test +public class DoOnNextTckTest extends BaseTck { + + @Override + public Publisher createPublisher(long elements) { + return FlowableTck.wrap( + Flowable.range(0, (int)elements).doOnNext(Functions.emptyConsumer()) + ); + } +}