Conversation
ZacSweers
left a comment
There was a problem hiding this comment.
Looking good so far! Thanks for doing this. Few comments around implementation and some minor nits
| SingleConverter<T, SingleSubscribeProxy<T>>, | ||
| CompletableConverter<CompletableSubscribeProxy> { | ||
| CompletableConverter<CompletableSubscribeProxy>, | ||
| ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>{ |
There was a problem hiding this comment.
nit: let's move this next to FlowableConverter in the declaration
| import io.reactivex.parallel.ParallelFlowable; | ||
|
|
||
| class ParallelFlowableScoper<T> extends Scoper | ||
| implements Function<ParallelFlowable<? extends T>, ParallelFlowableSubscribeProxy<T>> { |
There was a problem hiding this comment.
Since this is never going to be used with to(), let's make this implement the ParallelFlowableConverter directly and skip the indirection of the others. I mostly haven't migrated the others to this pattern yet to give users time to migrate, but they'll all be moved to this for 1.0. This one can start with it since it's new
There was a problem hiding this comment.
Alright, I'll make the change. You want me to open an issue for this and maybe we can work on it as a part of 0.7.0 release?
There was a problem hiding this comment.
Don't worry about it for now. I'll migrate the rest en masse for the 1.0 release when the time comes
|
|
||
| @Override public void subscribe(Subscriber<? super T>[] subscribers) { | ||
| Subscriber<? super T>[] newSubscribers = new Subscriber[subscribers.length]; | ||
| for (int i = 0; i < subscribers.length; i++) { |
There was a problem hiding this comment.
Interesting... I'm not super familiar with ParallelFlowable, but are there any thread safety concerns here? Is this how other subscribe() calls are handled in its operators?
There was a problem hiding this comment.
Yep, that's what is happening in the subscribe() method of its operators. It iterates over the passed in array of subscribers, creates a new array of subscribers and passes it to the source Flowable.
One very important thing which I forgot to do was checking if the number of subscribers is equal to the parallelism of the flowable (subscribe()'s contract says that they should be equal.) I'll push another commit tonight.
|
|
||
| private static final int DEFAULT_PARALLELISM = 2; | ||
|
|
||
| @Rule public RxErrorsRule rule = new RxErrorsRule(); |
| source.onNext(1); | ||
| source.onNext(2); | ||
| firstSubscriber.assertValue(1); | ||
| secondSubscriber.assertValue(2); |
There was a problem hiding this comment.
Can you explain this part a bit? Not quite sure I followed how this works (probably my lack of familiarity with ParallelFlowable).
There was a problem hiding this comment.
So the ParallelFlowable emits the items to the subscribers in a round robin fashion, and here we are emitting 1 & 2 to two subscribers. This means that the firstSubscriber will only receive 1 and the secondSubscriber would only receive 2 and nothing else.
Summary: Added the missing check in subscribe method which checks if parallelism == subscribers.count(). Also replaced ParallelFlowableScoper with AutoDisposeParallelFlowableConverter.
|
Sidenote - it helps me with reviewing if you address each comment (or logical set of comments) with a single commit and then link its sha as a response to the comment |
|
|
||
| Subscriber<? super T>[] newSubscribers = new Subscriber[subscribers.length]; | ||
| for (int i = 0; i < subscribers.length; i++) { | ||
| AutoDisposingSubscriberImpl<? super T> subscriber = |
There was a problem hiding this comment.
I kind of wonder if we do need a separate subscriber that manages all the subscribers here. @akarnokd penny for your thoughts?
There was a problem hiding this comment.
Parallel rails are largely independent and act as their own Flowables. That's why many parallel operators just delegate to their sequential versions.
There was a problem hiding this comment.
cool, in that case we can leave this as-is 👍
|
@VisheshVadhera do you have any other changes planned for this? (just checking) |
|
@hzsweers No other changes as of now. We can land this if everything looks fine. |
ZacSweers
left a comment
There was a problem hiding this comment.
Awesome, thanks for this!!
Closes #142