Implement SyncOnSubscribe and AsyncOnSubscribe#220
Conversation
|
I didn't have time yet for a full review, but maybe let's first discuss some desirable properties of an (A)SyncOnSubscribe solution for RxScala. I have the following in mind:
This PR does well on 1) and 2), but not so well on 3) and 4). Regarding 4), I write I don't have a solution for 3) and 4) yet, but will keep thinking about it... @dhoepelman @zsxwing and others, please let me know what you think, and if you have any ideas. |
|
4 is a good point, I'll fix that. I would add a requirement 5: "It should feel natural so use from a Scala context". For 3 We could add the two (currently misnomered) We then have to think about whether we still want to provide the * We could assume a Scala programmer is sufficiently familiar with similar state concepts through things like state monads, |
|
I agree with 5). A note regarding wildcards in type parameters: All RxScala types have declaration site variance annotations ( Here's an idea for 3):
Like this, discoverability should be good: Users browse for Observable.create, then ask "how can I get an (A)SyncOnSubscribe", go to (A)SyncOnSubscribe, and find the constructors. WDYT? |
rvanheest
left a comment
There was a problem hiding this comment.
Found some small things for you to consider.
| */ | ||
| object AsyncOnSubscribe { | ||
| /** | ||
| * Generates an `Observable` that synchronously calls the provided `next` function |
There was a problem hiding this comment.
shouldn't this be 'asynchronously'? (also in other scaladoc sections in this object)
There was a problem hiding this comment.
The next is still synchronously called (see RxJava javadoc). The async in the name comes from next's resultant Observable[T], which can asynchronously deliver up to requested items.
I'll see if I can improve the docs to make this more clear.
| * @see rx.observables.AsyncOnSubscribe.createStateful | ||
| */ | ||
| @Experimental | ||
| def createStateful[S, T](generator: () => S)(next: (S, Long, Observer[Observable[_ <: T]]) => S, onUnsubscribe: S => Unit = (_:S) => ()): Observable[T] = |
There was a problem hiding this comment.
Have you considered a call-by-name parameter for generator? Would look better in using it, I think... (also in other functions in this object)
There was a problem hiding this comment.
I considered this. However the spec clearly states that generator will be called each time an observer subscribes. If we use call-by-name it will only be executed once very early into the program, this would be confusing if the generator causes a side effect.
It also doesn't fit with the intended use-case, e.g. new subscriber means set up a HTTP connection, for each onNext do a request on that connection.
It's only identical if the generator is a pure function which produces an immutable value.
Maybe worth an overload.
There was a problem hiding this comment.
To make sure the generator is called each time an observer subscribes, you just have to put it into a () => generator, as we did in doOnCompleted, so that's not a problem.
However, I'm not sure if we should use a by name argument here, because if people pass a () => expr instead of a by name argument, the errors (at compile time or sometimes even only at run time) can be confusing, see eg here.
And please be careful with overloads: If you want to use overloading, please add tests ensuring that the overloads don't conflict and don't harm type parameter inference, we've had bad experiences with overloads before...
There was a problem hiding this comment.
Ah thanks! Didn't know/think of that semantic.
Errors should generally less confusing, as generally the state won't be of type Unit so there's no automatic conversion from () => Unit to Unit as in the issue you linked. The stateless variants don't have a generator.
An advantage of () => T is that it makes it very explicit that it is a function (thus it will be executed, possible multiple times). I would not expect bar() to be run multiple times in foo(bar()), even if the signature is foo(x: => T), knowledge of the inner working of foo is necessary to know that.
I'll let you/the maintainers decide.
There was a problem hiding this comment.
Yes, it's not as bad as in the issue I linked, but there still might be confusing compile-time errors, and the readability issue you mention. So I'd prefer () => T over of by-name, even though this is a bit less Scala-idiomatic.
| apply[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe()) | ||
|
|
||
| private[scala] class AsyncOnSubscribeImpl[S,T](val generatorF: () => S, val nextF: (S, Long, Observer[Observable[_ <: T]]) => S, val onUnsubscribeF: S => Unit) { | ||
| import rx.lang.scala.JavaConversions._ |
There was a problem hiding this comment.
Since there were plans to deprecate JavaConversions (see comment by @zsxwing), maybe you should pro-actively use JavaConverters instead. (not sure what the status of this plan currently is, though!)
|
@samuelgruetter Missed your last comment, that sounds like a good solution. I'll change the PR to fit it somewhere in the following days and incorporate the other comments. |
0fcf8e9 to
dee5513
Compare
dee5513 to
5b20d2e
Compare
|
@samuelgruetter Updated the PR. Also added I'm not entirely sure I got the variance right, you talked about Only way I could make it work was with: |
samuelgruetter
left a comment
There was a problem hiding this comment.
I added some minor comments, but otherwise it's great 😃
|
|
||
| def apply(observer: Observer[T]): Unit = accept(observer) | ||
|
|
||
| // TODO: Should this be public or private to rx.lang.scala? |
There was a problem hiding this comment.
I think it's nice to have this public, and remove the TODO.
| def map[U](f: T => U): Notification[U] = { | ||
| this match { | ||
| case Notification.OnNext(value) => Notification.OnNext(f(value)) | ||
| // TODO: Or do we want to cast here? Notification.OnError[T] is not a Notification[U] |
There was a problem hiding this comment.
No cast, because it would be a cast that cannot be checked by the JVM, which might create a compile time warning, which would have to be suppressed etc, to complicated... (Unless someone presents benchmarks suggesting that a cast performs better).
| * Semantics: | ||
| * * `generator` is called to provide an initial state on each new subscription | ||
| * * `next` is called with the last state to provide a data item and a new state for the next `next` call | ||
| * * `onUnsubscribe` is called with the state provides by the last next when the observer unsubscribes |
There was a problem hiding this comment.
`onUnsubscribe` is called with the state provided by the last `next` call when the observer unsubscribes
(also in AsyncOnSubscribe)
| package object observables {} | ||
| package object observables { | ||
| type SyncOnSubscribe[S, +T] = rx.observables.SyncOnSubscribe[S, _ <: T] | ||
| type AsyncOnSubscribe[S, +T] = rx.observables.AsyncOnSubscribe[S, _ <: T] |
There was a problem hiding this comment.
Yes, the variance is correct here.
An OnSubscribe[T] is a function Subscribe[T] => Unit, and the left side of the => is a negative position, but being the argument of Subscriber is also a negative position, and minus times minus = plus 😉
| "create(SyncOnSubscribe[S, T])" -> "[TODO]", | ||
| "create(AsyncOnSubscribe[S, T])" -> "[TODO]", | ||
| "create(SyncOnSubscribe[S, T])" -> "create(SyncOnSubscribe[S, T])", | ||
| "create(AsyncOnSubscribe[S, T])" -> "create(AsyncOnSubscribe[S, T])", |
There was a problem hiding this comment.
Can't you just delete these two lines? This list should only mention correspondences where Scala differs from Java, but here the signatures are the same.
| @Test | ||
| def testStateful(): Unit = { | ||
| val last = 2000L | ||
| val sut = Observable.create(AsyncOnSubscribe(() => 0L)((count,demand) => |
There was a problem hiding this comment.
Why sut? Could you please use a generally understandable name or abbreviation?
There was a problem hiding this comment.
I thought this was a generally well known abbreviation for subject under test in unit tests, but apparently that's only the case in my local bubble. Will change.
| } | ||
| )) | ||
| assertEquals((0L to last).toList, sut.toBlocking.toList) | ||
| } |
There was a problem hiding this comment.
This is a good example, could you please also add something like this to RxScalaDemo? Also, adding (A)SyncOnSubscribe requires some information in RxScalaDemo to be updated: createExampleGood is not good any more, but only "medium good" 😉 It respects unsubscription, but not backpressure, and using (A)SyncOnSubscribe would be better. Could you please review all examples related to Observable.create, and add some comments, and examples for (A)SyncOnSubscribe, so that this is all up to date?
fc55140 to
b515226
Compare
|
@samuelgruetter I incorporated your comments and altered the RxDemo file I rewrote all of the I'm not entirely sure about |
samuelgruetter
left a comment
There was a problem hiding this comment.
LGTM 👍
@zsxwing, could you please have a look at this as well before merging, since this is quite an important change, and also because you originally wrote createExampleFromInputStream.
|
LGTM2. |
|
Thanks! |
See #219 for some discussion.
I implemented both option 2 (as
createX) and 3 (asapplyX), as 2 has some major annoyances when used from scala: type interferencing can't determine theTand the user requires at least 2 statements because you need to call the observer (you can't use a single expression).The effects of this can be seen next to each other in the tests.
However, this causes duplicate very-similar functionality. If we want to avoid this I suggest we drop the java style
createXmethods and rename theapplyXmethods tocreateX.