Closed
Conversation
|
RxJava-pull-requests #582 SUCCESS |
Member
|
This seems like it's a bug inside current Schedulers as recursion should work without memory leaks. Here is the histogram showing the leak: I will dig in to where this leak is occurring. We should not need new subscription or scheduler types to solve this, otherwise anything using schedulers is broken. Code in Java 6 for proving the leak as modified from the original Java 8 bug report: /**
* Generates an observable sequence by iterating a state from an initial
* state until the condition returns false.
*/
public static <TState, R> OnSubscribeFunc<R> generate(
final TState initialState,
final Func1<TState, Boolean> condition,
final Func1<TState, TState> iterate,
final Func1<TState, R> resultSelector,
final Scheduler scheduler) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
return scheduler.schedule(initialState, new Func2<Scheduler, TState, Subscription>() {
@Override
public Subscription call(Scheduler s, TState state) {
boolean hasNext;
try {
hasNext = condition.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
if (hasNext) {
R result;
try {
result = resultSelector.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
observer.onNext(result);
TState nextState;
try {
nextState = iterate.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
return s.schedule(nextState, this);
}
observer.onCompleted();
return Subscriptions.empty();
}
});
}
};
}
public static void main(String[] args) throws Exception {
// Thread.sleep(10000);
Observable<Integer> source = Observable.create(generate(
0, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer t1) {
return true;
}
},
new Func1<Integer, Integer>() {
@Override
public Integer call(Integer t) {
return t + 1;
}
},
new Func1<Integer, Integer>() {
@Override
public Integer call(Integer t) {
return t;
}
}, Schedulers.newThread()));
final CountDownLatch latch = new CountDownLatch(1);
Subscription s = source.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
latch.countDown();
}
@Override
public void onNext(Integer v) {
if (v % 100000 == 0) {
System.out.println(v);
}
if (v >= 10000000) {
latch.countDown();
}
}
});
latch.await();
System.out.println("Wait done.");
s.unsubscribe();
System.out.println("Unsubscribe done.");
}@headinthebox will try in .Net |
Member
|
This memory leak is fixed in #712 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Revised version of PR #643.
scheduleRunnable()overloads toSchedulerdirectly to avoid constant wrapping betweenRunnableandAction0.ReentrantSchedulerto work with a parent scheduler directly.ForwardSubscriptionwithIncrementalSubscriptionas the first one didn't correctly managed the orderly nature of swapping subscriptions: an unfortunate thread scheduling could have swapped in an older subscription before a new subscription.In my opinion, the
Schedulerand its implementations should useRunnableas the internal unit of work instead ofAction0. Since theExecutors requireRunnableanyway, less wrapping means less memory and better performance. TheSchedulerinterface can retain theAction0overloads but no other implementation should need to deal with them.