diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 68f7530dec..9f0e1134d6 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1818,6 +1818,65 @@ trait Observable[+T] new WithFilter[T](p, asJavaObservable) } + /** + * Returns an Observable that applies the given function to each item emitted by an + * Observable. + * + * @param observer the observer + * + * @return an Observable with the side-effecting behavior applied. + */ + def doOnEach(observer: Observer[T]): Observable[T] = { + Observable[T](asJavaObservable.doOnEach(observer.asJavaObserver)) + } + + /** + * Returns an Observable that applies the given function to each item emitted by an + * Observable. + * + * @param onNext this function will be called whenever the Observable emits an item + * + * @return an Observable with the side-effecting behavior applied. + */ + def doOnEach(onNext: T => Unit): Observable[T] = { + Observable[T](asJavaObservable.doOnEach( + onNext + )) + } + + /** + * Returns an Observable that applies the given function to each item emitted by an + * Observable. + * + * @param onNext this function will be called whenever the Observable emits an item + * @param onError this function will be called if an error occurs + * + * @return an Observable with the side-effecting behavior applied. + */ + def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = { + Observable[T](asJavaObservable.doOnEach( + onNext, + onError + )) + } + + /** + * Returns an Observable that applies the given function to each item emitted by an + * Observable. + * + * @param onNext this function will be called whenever the Observable emits an item + * @param onError this function will be called if an error occurs + * @param onCompleted the action to invoke when the source Observable calls + * + * @return an Observable with the side-effecting behavior applied. + */ + def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = { + Observable[T](asJavaObservable.doOnEach( + onNext, + onError, + onCompleted + )) + } } /**