File tree Expand file tree Collapse file tree
main/java/io/reactivex/rxjava3/internal/operators/observable
test/java/io/reactivex/rxjava3/internal/operators Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -349,9 +349,10 @@ public void onSubscribe(Disposable d) {
349349
350350 @ Override
351351 public void onNext (R t ) {
352- if (index == parent .unique ) {
352+ SimpleQueue <R > q = queue ;
353+ if (index == parent .unique && q != null ) {
353354 if (t != null ) {
354- queue .offer (t );
355+ q .offer (t );
355356 }
356357 parent .drain ();
357358 }
Original file line number Diff line number Diff line change @@ -1377,4 +1377,19 @@ Flowable<Integer> createFlowable(AtomicInteger inner) {
13771377 inner .incrementAndGet ();
13781378 });
13791379 }
1380+
1381+ @ Test
1382+ public void innerOnSubscribeOuterCancelRace () {
1383+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
1384+
1385+ Flowable .just (1 )
1386+ .hide ()
1387+ .switchMap (v -> Flowable .just (1 )
1388+ .doOnSubscribe (d -> ts .cancel ())
1389+ .scan (1 , (a , b ) -> a )
1390+ )
1391+ .subscribe (ts );
1392+
1393+ ts .assertEmpty ();
1394+ }
13801395}
Original file line number Diff line number Diff line change @@ -1438,4 +1438,19 @@ Observable<Integer> createObservable(AtomicInteger inner) {
14381438 inner .incrementAndGet ();
14391439 });
14401440 }
1441+
1442+ @ Test
1443+ public void innerOnSubscribeOuterCancelRace () {
1444+ TestObserver <Integer > to = new TestObserver <Integer >();
1445+
1446+ Observable .just (1 )
1447+ .hide ()
1448+ .switchMap (v -> Observable .just (1 )
1449+ .doOnSubscribe (d -> to .dispose ())
1450+ .scan (1 , (a , b ) -> a )
1451+ )
1452+ .subscribe (to );
1453+
1454+ to .assertEmpty ();
1455+ }
14411456}
You can’t perform that action at this time.
0 commit comments