diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 48d2f3adc6..e5651e5c2e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -7664,6 +7664,26 @@ public final Observable sample(Observable sampler) { return lift(new OperatorSampleWithObservable(sampler)); } + /** + * Allow the an external signal control the amount of data being set through this Observable chain. + * When the control Observable emits false (closes the valve) requests upstream are stopped and any + * requests from downstream for more data are buffered until the control Observable emits a true + * (opens the valve). Should the control Observable error or complete while closed (last control + * emition was a false) an error is sent down the data stream. The granularity breaks up large requests + * from downstream to limit the number of onNexts that are possible after the control valve has closed. + * The smaller the number the tighter the control on the flow but the more overhead there will be in + * managing the requests. + * + * @param control + * an Observable that dictates if request signals propagate upstream + * @param granularity + * the maximum number of outstanding requests. + * @returns an Observable that mostly stops emiting after the control Observable emits a false. + */ + public final Observable pressureValve(Observable control, long granularity) { + return lift(new OperatorValve(control, granularity)); + } + /** * Returns an Observable that applies a specified accumulator function to the first item emitted by a source * Observable, then feeds the result of that function along with the second item emitted by the source diff --git a/src/main/java/rx/internal/operators/OperatorValve.java b/src/main/java/rx/internal/operators/OperatorValve.java new file mode 100644 index 0000000000..07de8041cf --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorValve.java @@ -0,0 +1,204 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicBoolean; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; + +/** + * An {@code Observable} that emits the first {@code num} items emitted by the source {@code Observable}. + *

+ * + *

+ * You can choose to pay attention only to the first {@code num} items emitted by an {@code Observable} by using + * the {@code take} operator. This operator returns an {@code Observable} that will invoke a subscriber's + * {@link Subscriber#onNext onNext} function a maximum of {@code num} times before invoking + * {@link Subscriber#onCompleted onCompleted}. + */ +public final class OperatorValve implements Operator { + private final Observable onByDefault; + private final long _granularity; + + public OperatorValve(Observable onByDefault, long granularity) { + this.onByDefault = onByDefault; + this._granularity = granularity; + } + + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber(child) { + private final long granularity = _granularity; + private Producer p; + private long backlog;// synchronized access on Producer p + private long outstanding;// synchronized access on Producer p + private boolean isOpen = true;// synchronized access on Producer p + private AtomicBoolean terminated = new AtomicBoolean(); + + @Override + public void onCompleted() { + if (terminated.compareAndSet(false, true)) + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (terminated.compareAndSet(false, true)) + child.onError(e); + } + + @Override + public void onNext(T t) { + child.onNext(t); + final long requestUp; + synchronized (this) { + if (--outstanding == 0 && isOpen) { + // all out and still open; check to see if there is a backlog. + if (backlog > granularity) { + // don't request too much at once + requestUp = granularity; + } else if (backlog > 0) { + // the backlog isn't too big + requestUp = backlog; + } else { + // no backlog + requestUp = 0; + } + } else { + // expecting more or closed + requestUp = 0; + } + if (requestUp > 0) { + // do the last of the accounting inside the synchronized block + backlog -= requestUp; + outstanding += requestUp; + } + } + // do the request work outside the synchronized block + if (requestUp != 0) + p.request(requestUp); + } + + @Override + public void setProducer(final Producer p) { + this.p = p; + + onByDefault.unsafeSubscribe(new Subscriber() { + @Override + public void onCompleted() { + boolean _isOpen; + synchronized (this) { + // make sure to get the latest value of isOpen + _isOpen = isOpen; + } + if (!_isOpen) { + if (terminated.compareAndSet(false, true)) { + child.onError(new IllegalStateException("control signal terminated while valve was closed")); + } + } + unsubscribe(); + } + + @Override + public void onError(Throwable e) { + if (terminated.compareAndSet(false, true)) + child.onError(e); + unsubscribe(); + } + + @Override + public void onNext(Boolean open) { + if (open) { + final long requestUp; + synchronized (this) { + if (!isOpen) { + // opening, check backlog. + if (backlog > granularity) { + // don't request too much at once + requestUp = granularity; + } else if (backlog > 0) { + // the backlog isn't too big + requestUp = backlog; + } else { + // no backlog + requestUp = 0; + } + isOpen = true; + } else { + // was already open + requestUp = 0; + } + if (requestUp > 0) { + // do the last of the accounting inside the synchronized block + backlog -= requestUp; + outstanding += requestUp; + } + } + // do the request work outside the synchronized block + if (requestUp > 0) + p.request(requestUp); + } else { + synchronized (this) { + // closing + isOpen = false; + } + } + } + }); + + super.setProducer(new Producer() { + @Override + public void request(long n) { + if (n < 0) + throw new IllegalArgumentException("n >= 0 required but it was " + n); + final long requestUp; + synchronized (this) { + // increase backlog + backlog += n; + // now figure out if what is going to happen to it. + if (!isOpen) { + // closed; don't send + requestUp = 0; + } else { + if (backlog > granularity) { + // don't request too much at once + requestUp = granularity; + } else if (backlog > 0) { + // the backlog isn't too big + requestUp = backlog; + } else { + // no backlog + requestUp = 0; + } + } + if (requestUp > 0) { + // do the last of the accounting inside the synchronized block + backlog -= requestUp; + outstanding += requestUp; + } + } + // do the request work outside the synchronized block + if (requestUp != 0) + p.request(requestUp); + } + }); + } + }; + } +} diff --git a/src/test/java/rx/internal/operators/OperatorValveTest.java b/src/test/java/rx/internal/operators/OperatorValveTest.java new file mode 100644 index 0000000000..ed983d2669 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorValveTest.java @@ -0,0 +1,205 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Producer; +import rx.Scheduler; +import rx.Subscriber; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class OperatorValveTest { + @Test + public void test() { + // setup + PublishSubject control = PublishSubject.create(); + OperatorValve op = new OperatorValve(control, 2); + Subscriber output = mock(Subscriber.class); + Producer inputProducer = mock(Producer.class); + InOrder order = inOrder(output, inputProducer); + + // invoke + Subscriber input = op.call(output); + + // verify + // send in the input's producer + input.setProducer(inputProducer); + // capture the output's producer + ArgumentCaptor outputProducerCaptor = ArgumentCaptor.forClass(Producer.class); + order.verify(output).setProducer(outputProducerCaptor.capture()); + Producer outputProducer = outputProducerCaptor.getValue(); + assertNotNull(outputProducer); + + // start sending request's and data + int i = 0; + + // small request + outputProducer.request(0); + outputProducer.request(1); + order.verify(inputProducer).request(1); + input.onNext(++i); + order.verify(output).onNext(i); + + // larger + outputProducer.request(2); + order.verify(inputProducer).request(2); + input.onNext(++i); + order.verify(output).onNext(i); + input.onNext(++i); + order.verify(output).onNext(i); + + // way too large + outputProducer.request(5); + order.verify(inputProducer).request(2);// request limited called by producer + input.onNext(++i); + order.verify(output).onNext(i); + input.onNext(++i); + order.verify(output).onNext(i); + // next batch + order.verify(inputProducer).request(2);// request limited called from onNext + input.onNext(++i); + order.verify(output).onNext(i); + input.onNext(++i); + order.verify(output).onNext(i); + // next batch + order.verify(inputProducer).request(1); + control.onNext(false); + input.onNext(++i); + order.verify(output).onNext(i); + + // no request while closed + control.onNext(true); + control.onNext(true); + + // small request while closed + control.onNext(false); + outputProducer.request(1); + control.onNext(true); + order.verify(inputProducer).request(1); + input.onNext(++i); + order.verify(output).onNext(i); + + // larger request while closed + control.onNext(false); + outputProducer.request(2); + control.onNext(true); + order.verify(inputProducer).request(2); + input.onNext(++i); + order.verify(output).onNext(i); + input.onNext(++i); + order.verify(output).onNext(i); + + // too large of a request while closed + control.onNext(false); + outputProducer.request(3); + control.onNext(true); + order.verify(inputProducer).request(2); + input.onNext(++i); + order.verify(output).onNext(i); + input.onNext(++i); + order.verify(output).onNext(i); + order.verify(inputProducer).request(1); + input.onNext(++i); + order.verify(output).onNext(i); + + input.onCompleted(); + order.verify(output).onCompleted(); + + // all done + order.verifyNoMoreInteractions(); + } + + @Test + public void testRequestError() { + TestSubscriber tSub = TestSubscriber.create(-1); + Exception e = new Exception(); + Observable. error(e).pressureValve(Observable. never(), 10).subscribe(tSub); + tSub.assertError(e); + } + + @Test + public void testDataError() { + TestSubscriber tSub = TestSubscriber.create(); + Exception e = new Exception(); + Observable. error(e).pressureValve(Observable. never(), 10).subscribe(tSub); + tSub.assertError(e); + } + + @Test + public void testControlError() { + TestSubscriber tSub = TestSubscriber.create(); + Exception e = new Exception(); + Observable. never().pressureValve(Observable. error(e), 10).subscribe(tSub); + tSub.assertError(e); + } + + @Test + public void testControlCompleteOpen() { + TestSubscriber tSub = TestSubscriber.create(); + Observable.just(1, 2, 3).pressureValve(Observable. empty(), 10).subscribe(tSub); + tSub.assertValues(1, 2, 3); + } + + @Test + public void testControlCompleteClosed() { + TestSubscriber tSub = TestSubscriber.create(); + Observable. never().pressureValve(Observable.just(false), 10).subscribe(tSub); + tSub.assertError(IllegalStateException.class); + } + + /* + @Test + public void testObserveOn() { + final AtomicLong counter = new AtomicLong(); + Observable range = Observable.range(0, Integer.MAX_VALUE); + Observable control = Observable.interval(1, TimeUnit.SECONDS).map(new Func1() { + @Override + public Boolean call(Long i) { + System.out.println(); + counter.set(0); + return i % 2 == 1; + } + }); + long granularity = 10; + TestSubscriber tSub = new TestSubscriber(); + range.pressureValve(control, granularity).observeOn(Schedulers.computation()).toBlocking().forEach(new Action1() { + @Override + public void call(Integer t) { + System.out.print(counter.incrementAndGet()+ " \r"); + } + }); + } + + public static void main(String[] args) { + new OperatorValveTest().testObserveOn(); + } + */ +}