diff --git a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java new file mode 100644 index 0000000000..5ab703c320 --- /dev/null +++ b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java @@ -0,0 +1,50 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.CompletableObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable. + */ +public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable { + final AtomicReference s = new AtomicReference(); + + @Override + public final void onSubscribe(Disposable s) { + if (DisposableHelper.setOnce(this.s, s)) { + onStart(); + } + } + + /** + * Called once the single upstream Disposable is set via onSubscribe. + */ + protected void onStart() { + } + + @Override + public final boolean isDisposed() { + return s.get() == DisposableHelper.DISPOSED; + } + + @Override + public final void dispose() { + DisposableHelper.dispose(s); + } +} diff --git a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java new file mode 100644 index 0000000000..70de2a2346 --- /dev/null +++ b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java @@ -0,0 +1,52 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.MaybeObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable. + * + * @param the received value type + */ +public abstract class DisposableMaybeObserver implements MaybeObserver, Disposable { + final AtomicReference s = new AtomicReference(); + + @Override + public final void onSubscribe(Disposable s) { + if (DisposableHelper.setOnce(this.s, s)) { + onStart(); + } + } + + /** + * Called once the single upstream Disposable is set via onSubscribe. + */ + protected void onStart() { + } + + @Override + public final boolean isDisposed() { + return s.get() == DisposableHelper.DISPOSED; + } + + @Override + public final void dispose() { + DisposableHelper.dispose(s); + } +} diff --git a/src/main/java/io/reactivex/observers/DisposableObserver.java b/src/main/java/io/reactivex/observers/DisposableObserver.java index dc1b3acf21..1729093e70 100644 --- a/src/main/java/io/reactivex/observers/DisposableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableObserver.java @@ -20,7 +20,7 @@ import io.reactivex.internal.disposables.*; /** - * An abstract Observer that allows asynchronous cancellation by implementing Disposable. + * An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable. * * @param the received value type */ diff --git a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java new file mode 100644 index 0000000000..ce2a2a498a --- /dev/null +++ b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java @@ -0,0 +1,52 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.observers; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.SingleObserver; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable. + * + * @param the received value type + */ +public abstract class DisposableSingleObserver implements SingleObserver, Disposable { + final AtomicReference s = new AtomicReference(); + + @Override + public final void onSubscribe(Disposable s) { + if (DisposableHelper.setOnce(this.s, s)) { + onStart(); + } + } + + /** + * Called once the single upstream Disposable is set via onSubscribe. + */ + protected void onStart() { + } + + @Override + public final boolean isDisposed() { + return s.get() == DisposableHelper.DISPOSED; + } + + @Override + public final void dispose() { + DisposableHelper.dispose(s); + } +}