diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 045eae96af..a89ac32bd7 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -1239,9 +1239,18 @@ Node getInitialHead() { Node prev = get(); Node next = prev.get(); - while (next != null && ((Timestamped)next.value).getTimestampMillis() <= timeLimit) { - prev = next; - next = next.get(); + while (next != null) { + Object o = next.value; + Object v = leaveTransform(o); + if (NotificationLite.isCompleted(v) || NotificationLite.isError(v)) { + break; + } + if (((Timestamped)o).getTimestampMillis() <= timeLimit) { + prev = next; + next = next.get(); + } else { + break; + } } return prev; diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index e4734c30a6..999e343b8e 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -1577,4 +1577,20 @@ public ConnectableObservable call(Observable o) { }); } + @Test + public void noOldEntries() { + TestScheduler scheduler = new TestScheduler(); + + Observable source = Observable.just(1) + .replay(2, TimeUnit.SECONDS, scheduler) + .autoConnect(); + + source.test().assertResult(1); + + source.test().assertResult(1); + + scheduler.advanceTimeBy(3, TimeUnit.SECONDS); + + source.test().assertResult(); + } } \ No newline at end of file diff --git a/src/test/java/rx/subjects/ReplaySubjectTest.java b/src/test/java/rx/subjects/ReplaySubjectTest.java index 8464cf1b3f..dc624367f3 100644 --- a/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -1175,4 +1175,22 @@ public Boolean call(Integer v) { ts2.assertValues(1, 2, 3, 6, 7); } + @Test + public void noOldEntries() { + TestScheduler scheduler = new TestScheduler(); + + ReplaySubject source = ReplaySubject.createWithTime(2, TimeUnit.SECONDS, scheduler); + + source.onNext(1); + source.onCompleted(); + + source.test().assertResult(1); + + source.test().assertResult(1); + + scheduler.advanceTimeBy(3, TimeUnit.SECONDS); + + source.test().assertResult(); + } + }