diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index b258519fdf..06830122ff 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -41,7 +41,7 @@ import rx.internal.producers.ProducerArbiter; import rx.observers.Subscribers; import rx.schedulers.Schedulers; -import rx.subjects.BehaviorSubject; +import rx.subjects.*; import rx.subscriptions.SerialSubscription; public final class OnSubscribeRedo implements OnSubscribe { @@ -202,7 +202,7 @@ public void call(final Subscriber child) { // the source observable. We use a BehaviorSubject because subscribeToSource // may emit a terminal before the restarts observable (transformed terminals) // is subscribed - final BehaviorSubject> terminals = BehaviorSubject.create(); + final Subject, Notification> terminals = BehaviorSubject.>create().toSerialized(); final Subscriber> dummySubscriber = Subscribers.empty(); // subscribe immediately so the last emission will be replayed to the next // subscriber (which is the one we care about)