diff --git a/language-adaptors/rxjava-scala/Rationale.md b/language-adaptors/rxjava-scala/Rationale.md index 5afb2392cc..a8cd47d954 100644 --- a/language-adaptors/rxjava-scala/Rationale.md +++ b/language-adaptors/rxjava-scala/Rationale.md @@ -91,7 +91,7 @@ and consumption of Rx values from Java but not for Scala as a producer. If we take that approach, we can make bindings that feels like a completely native Scala library, without needing any complications of the Scala side. ```scala -object Observer { …} +object Observable { …} trait Observable[+T] { def asJavaObservable: rx.Observable[_ <: T] } diff --git a/language-adaptors/rxjava-scala/ReleaseNotes.md b/language-adaptors/rxjava-scala/ReleaseNotes.md new file mode 100644 index 0000000000..4fdb3a06fd --- /dev/null +++ b/language-adaptors/rxjava-scala/ReleaseNotes.md @@ -0,0 +1,228 @@ +RxScala Release Notes +===================== + +This release of the RxScala bindings builds on the previous 0.15 release to make the Rx bindings for Scala +include all Rx types. In particular this release focuses on fleshing out the bindings for the `Subject` and `Scheduler` +types, as well as aligning the constructor functions for `Observable` with those in the RxJava. + +Expect to see ongoing additions to make the Scala binding match the equivalent underlying Java API, +as well as minor changes in the existing API as we keep fine-tuning the experience on our way to a V1.0 release. + +Observer +-------- + +In this release we have made the `asJavaObserver` property in `Observable[T]`as well the the factory method in the +companion object that takes an `rx.Observer` private to the Scala bindings package, thus properly hiding irrelevant +implementation details from the user-facing API. The `Observer[T]` trait now looks like a clean, native Scala type: + +```scala +trait Observer[-T] { + def onNext(value: T): Unit + def onError(error: Throwable): Unit + def onCompleted(): Unit +} + +object Observer {...} +``` + +To create an instance of a specific `Observer`, say `Observer[SensorEvent]` in user code, you can create a new instance +of the `Observer` trait by implementing any of the methods that you care about: +```scala + val printObserver = new Observer[SensorEvent] { + override def onNext(value: SensorEvent): Unit = {...value.toString...} + } +``` + or you can use one of the overloads of the companion `Observer` object by passing in implementations of the `onNext`, + `onError` or `onCompleted` methods. + +Note that typically you do not need to create an `Observer` since all of the methods that accept an `Observer[T]` +(for instance `subscribe`) usually come with overloads that accept the individual methods +`onNext`, `onError`, and `onCompleted` and will automatically create an `Observer` for you under the covers. + +While *technically* it is a breaking change make the `asJavaObserver` property private, you should probably not have +touched `asJavaObserver` in the first place. If you really feel you need to access the underlying `rx.Observer` +call `toJava`. + +Observable +---------- + +Just like for `Observer`, the `Observable` trait now also hides its `asJavaObservable` property and makes the constructor +function in the companion object that takes an `rx.Observable` private (but leaves the companion object itself public). +Again, while *technically* this is a breaking change, this should not have any influence on user code. + +```scala +trait Observable[+T] { + def subscribe(observer: Observer[T]): Subscription = {...} + def apply(observer: Observer[T]): Subscription = {...} + ... +} +object Observable { + def create[T](func: Observer[T] => Subscription): Observable[T] = {...} + ... +} +``` + +The major changes in `Observable` are wrt to the factory methods where too libral use of overloading of the `apply` +method hindered type inference and made Scala code look unnecessarily different than that in other language bindings. +All factory methods now have their own name corresponding to the Java and .NET operators +(plus overloads that take a `Scheduler`). + +* `def from[T](future: Future[T]): Observable[T]`, +* `def from[T](iterable: Iterable[T]): Observable[T]`, +* `def error[T](exception: Throwable): Observable[T]`, +* `def empty[T]: Observable[T]`, +* `def items[T](items: T*): Observable[T], +* Extension method on `toObservable: Observable[T]` on `List[T]`. + +In the *pre-release* of this version, we expose both `apply` and `create` for the mother of all creation functions. +We would like to solicit feedback which of these two names is preferred +(or both, but there is a high probability that only one will be chosen). + +* `def apply[T](subscribe: Observer[T]=>Subscription): Observable[T]` +* `def create[T](subscribe: Observer[T] => Subscription): Observable[T]` + +Subject +------- + +The `Subject` trait now also hides the underlying Java `asJavaSubject: rx.subjects.Subject[_ >: T, _<: T]` +and takes only a single *invariant* type parameter `T`. all existing implementations of `Subject` are parametrized +by a single type, and this reflects that reality. + +```scala +trait Subject[T] extends Observable[T] with Observer[T] {} +object Subject { + def apply(): Subject[T] = {...} +} +``` +For each kind of subject, there is a class with a private constructor and a companion object that you should use +to create a new kind of subject. The subjects that are available are: + +* `AsyncSubject[T]()`, +* `BehaviorSubject[T](value)`, +* `Subject[T]()`, +* `ReplaySubject[T]()`. + +The latter is still missing various overloads http://msdn.microsoft.com/en-us/library/hh211810(v=vs.103).aspx which +you can expect to appear once they are added to the underlying RxJava implementation. + +Compared with release 0.15.1, the breaking changes in `Subject` for this release are +making `asJavaSubject` private, and collapsing its type parameters, neither of these should cause trouble, +and renaming `PublishSubject` to `Subject`. + +Schedulers +---------- + +The biggest breaking change compared to the 0.15.1 release is giving `Scheduler` the same structure as the other types. +The trait itself remains unchanged, except that we made the underlying Java representation hidden as above. +as part of this reshuffling, the scheduler package has been renamed from `rx.lang.scala.concurrency` +to `rx.lang.scala.schedulers`. There is a high probability that this package renaming will also happen in RxJava. + +```scala +trait Scheduler {...} +``` + +In the previous release, you created schedulers by selecting them from the `Schedulers` object, +as in `Schedulers.immediate` or `Schedulers.newThread` where each would return an instance of the `Scheduler` trait. +However, several of the scheduler implementations have additional methods, such as the `TestScheduler`, +which already deviated from the pattern. + +In this release, we changed this to make scheduler more like `Subject` and provide a family of schedulers +that you create using their factory function: + +* `CurrentThreadScheduler()`, +* `ExecutorScheduler(executor)`, +* `ImmediateScheduler()`, +* `NewThreadScheduler()`, +* `ScheduledExecutorServiceScheduler(scheduledExecutorService)`, +* `TestScheduler()`, +* `ThreadPoolForComputationScheduler()`, +* `ThreadPoolForIOScheduler()`. + +In the future we expect that this list will grow further with new schedulers as they are imported from .NET +(http://msdn.microsoft.com/en-us/library/system.reactive.concurrency(v=vs.103).aspx). + +To make your code compile in the new release you will have to change all occurrences of `Schedulers.xxx` +into `XxxScheduler()`, and import `rx.lang.scala.schedulers` instead of `rx.lang.scala.schedulers`. + +Subscriptions +------------- + +The `Subscription` trait in Scala now has `isUnsubscribed` as a member, effectively collapsing the old `Subscription` +and `BooleanSubscription`, and the latter has been removed from the public surface. Pending a bug fix in RxJava, +`SerialSubscription` implements its own `isUnsubscribed`. + + +```scala +trait Subscription { + def unsubscribe(): Unit = { ... } + def isUnsubscribed: Boolean = ... +} + +object Subscription {...} + ``` + + To create a `Subscription` use one of the following factory methods: + + * `Subscription{...}`, `Subscription()`, + * `CompositeSubscription(subscriptions)`, + * `MultipleAssignmentSubscription()`, + * `SerialSubscription()`. + + In case you do feel tempted to call `new Subscription{...}` directly make sure you wire up `isUnsubscribed` + and `unsubscribe()` properly, but for all practical purposes you should just use one of the factory methods. + +Notifications +------------- + +All underlying wrapped `Java` types in the `Notification` trait are made private like all previous types. The companion +objects of `Notification` now have both constructor (`apply`) and extractor (`unapply`) functions: + +```scala +object Notification {...} +trait Notification[+T] { + override def equals(that: Any): Boolean = {...} + override def hashCode(): Int = {...} + def apply[R](onNext: T=>R, onError: Throwable=>R, onCompleted: ()=>R): R = {...} +} +``` +The nested companion objects of `Notification` now have both constructor (`apply`) and extractor (`unapply`) functions: +```scala +object Notification { + object OnNext { def apply(...){}; def unapply(...){...} } + object OnError { def apply(...){}; def unapply(...){...} } + object OnCompleted { def apply(...){}; def unapply(...){...} } +} +``` +To construct a `Notification`, you import `rx.lang.scala.Notification._` and use `OnNext("hello")`, +or `OnError(new Exception("Oops!"))`, or `OnCompleted()`. + +To pattern match on a notification you create a partial function like so: `case Notification.OnNext(v) => { ... v ... }`, +or you use the `apply` function to pass in functions for each possibility. + +There are no breaking changes for notifications. + +Java Interop Helpers +-------------------- + +Since the Scala traits *wrap* the underlying Java types, yoo may occasionally will have to wrap an unwrap +between the two representations. The `JavaConversion` object provides helper functions of the form `toJavaXXX` and +`toScalaXXX` for this purpose, properly hiding how precisely the wrapped types are stored. +Note the (un)wrap conversions are defined as implicits in Scala, but in the unlikely event that you do need them +be kind to the reader of your code and call them explicitly. + +```scala +object JavaConversions { + import language.implicitConversions + + implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = {...} + implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = {...} + implicit def toJavaSubscription(s: Subscription): rx.Subscription = {...} + implicit def toScalaSubscription(s: rx.Subscription): Subscription = {...} + implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = {...} + implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = {...} + implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = {...} + implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = {...} + implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = {...} + implicit def toScalaObservable[T](observable: rx.Observable[_ <: T]): Observable[T] = {...} +} +``` \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala index 7a11bdf539..ceea9c3b8e 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala @@ -21,8 +21,8 @@ import scala.concurrent.duration._ object Olympics { case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String) - def mountainBikeMedals: Observable[Medal] = Observable( - Observable( + def mountainBikeMedals: Observable[Medal] = Observable.items( + Observable.items( Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"), Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"), Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"), @@ -31,7 +31,7 @@ object Olympics { Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America") ), fourYearsEmpty, - Observable( + Observable.items( Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"), Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"), Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"), @@ -40,7 +40,7 @@ object Olympics { Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland") ), fourYearsEmpty, - Observable( + Observable.items( Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"), Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"), Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"), @@ -49,7 +49,7 @@ object Olympics { Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany") ), fourYearsEmpty, - Observable( + Observable.items( Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"), Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"), Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"), @@ -58,7 +58,7 @@ object Olympics { Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland") ), fourYearsEmpty, - Observable( + Observable.items( Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"), Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"), Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"), @@ -80,7 +80,7 @@ object Olympics { // So we don't use this: // Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false) // But we just return empty, which completes immediately - Observable() + Observable.empty[Medal] } } \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 3dbd9461fd..7bf832f58e 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -30,7 +30,7 @@ import org.junit.Test import org.scalatest.junit.JUnitSuite import rx.lang.scala._ -import rx.lang.scala.concurrency._ +import rx.lang.scala.schedulers._ @Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily class RxScalaDemo extends JUnitSuite { @@ -69,14 +69,14 @@ class RxScalaDemo extends JUnitSuite { } @Test def testSwitchOnObservableOfInt() { - // Correctly rejected with error + // Correctly rejected with error // "Cannot prove that Observable[Int] <:< Observable[Observable[U]]" // val o = Observable(1, 2).switch } @Test def testObservableComparison() { - val first = Observable(10, 11, 12) - val second = Observable(10, 11, 12) + val first = Observable.from(List(10, 11, 12)) + val second = Observable.from(List(10, 11, 12)) val b1 = (first zip second) map (p => p._1 == p._2) forall (b => b) @@ -88,8 +88,8 @@ class RxScalaDemo extends JUnitSuite { } @Test def testObservableComparisonWithForComprehension() { - val first = Observable(10, 11, 12) - val second = Observable(10, 11, 12) + val first = Observable.from(List(10, 11, 12)) + val second = Observable.from(List(10, 11, 12)) val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2) @@ -99,8 +99,8 @@ class RxScalaDemo extends JUnitSuite { } @Test def testStartWithIsUnnecessary() { - val before = Observable(-2, -1, 0) - val source = Observable(1, 2, 3) + val before = List(-2, -1, 0).toObservable + val source = List(1, 2, 3).toObservable println((before ++ source).toBlockingObservable.toList) } @@ -124,11 +124,11 @@ class RxScalaDemo extends JUnitSuite { @Test def fattenSomeExample() { // To merge some observables which are all known already: - Observable( + List( Observable.interval(200 millis), Observable.interval(400 millis), Observable.interval(800 millis) - ).flatten.take(12).toBlockingObservable.foreach(println(_)) + ).toObservable.flatten.take(12).toBlockingObservable.foreach(println(_)) } @Test def rangeAndBufferExample() { @@ -143,7 +143,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def testReduce() { - assertEquals(10, Observable(1, 2, 3, 4).reduce(_ + _).toBlockingObservable.single) + assertEquals(10, List(1, 2, 3, 4).toObservable.reduce(_ + _).toBlockingObservable.single) } @Test def testForeach() { @@ -157,7 +157,7 @@ class RxScalaDemo extends JUnitSuite { } @Test def testForComprehension() { - val observables = Observable(Observable(1, 2, 3), Observable(10, 20, 30)) + val observables = List(List(1, 2, 3).toObservable, List(10, 20, 30).toObservable).toObservable val squares = (for (o <- observables; i <- o if i % 2 == 0) yield i*i) assertEquals(squares.toBlockingObservable.toList, List(4, 100, 400, 900)) } @@ -185,14 +185,14 @@ class RxScalaDemo extends JUnitSuite { } @Test def testGroupByThenFlatMap() { - val m = Observable(1, 2, 3, 4) + val m = List(1, 2, 3, 4).toObservable val g = m.groupBy(i => i % 2) val t = g.flatMap((p: (Int, Observable[Int])) => p._2) assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList) } @Test def testGroupByThenFlatMapByForComprehension() { - val m = Observable(1, 2, 3, 4) + val m = List(1, 2, 3, 4).toObservable val g = m.groupBy(i => i % 2) val t = for ((i, o) <- g; n <- o) yield n assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList) @@ -250,13 +250,13 @@ class RxScalaDemo extends JUnitSuite { } @Test def exampleWithoutPublish() { - val unshared = Observable(1 to 4) + val unshared = List(1 to 4).toObservable unshared.subscribe(n => println(s"subscriber 1 gets $n")) unshared.subscribe(n => println(s"subscriber 2 gets $n")) } @Test def exampleWithPublish() { - val unshared = Observable(1 to 4) + val unshared = List(1 to 4).toObservable val (startFunc, shared) = unshared.publish shared.subscribe(n => println(s"subscriber 1 gets $n")) shared.subscribe(n => println(s"subscriber 2 gets $n")) @@ -288,9 +288,9 @@ class RxScalaDemo extends JUnitSuite { } @Test def testSingleOption() { - assertEquals(None, Observable(1, 2).toBlockingObservable.singleOption) - assertEquals(Some(1), Observable(1) .toBlockingObservable.singleOption) - assertEquals(None, Observable() .toBlockingObservable.singleOption) + assertEquals(None, List(1, 2).toObservable.toBlockingObservable.singleOption) + assertEquals(Some(1), List(1).toObservable.toBlockingObservable.singleOption) + assertEquals(None, List().toObservable.toBlockingObservable.singleOption) } // We can't put a general average method into Observable.scala, because Scala's Numeric @@ -301,58 +301,58 @@ class RxScalaDemo extends JUnitSuite { } @Test def averageExample() { - println(doubleAverage(Observable()).toBlockingObservable.single) - println(doubleAverage(Observable(0)).toBlockingObservable.single) - println(doubleAverage(Observable(4.44)).toBlockingObservable.single) - println(doubleAverage(Observable(1, 2, 3.5)).toBlockingObservable.single) + println(doubleAverage(Observable.empty[Double]).toBlockingObservable.single) + println(doubleAverage(List(0.0).toObservable).toBlockingObservable.single) + println(doubleAverage(List(4.44).toObservable).toBlockingObservable.single) + println(doubleAverage(List(1, 2, 3.5).toObservable).toBlockingObservable.single) } @Test def testSum() { - assertEquals(10, Observable(1, 2, 3, 4).sum.toBlockingObservable.single) - assertEquals(6, Observable(4, 2).sum.toBlockingObservable.single) - assertEquals(0, Observable[Int]().sum.toBlockingObservable.single) + assertEquals(10, List(1, 2, 3, 4).toObservable.sum.toBlockingObservable.single) + assertEquals(6, List(4, 2).toObservable.sum.toBlockingObservable.single) + assertEquals(0, List[Int]().toObservable.sum.toBlockingObservable.single) } @Test def testProduct() { - assertEquals(24, Observable(1, 2, 3, 4).product.toBlockingObservable.single) - assertEquals(8, Observable(4, 2).product.toBlockingObservable.single) - assertEquals(1, Observable[Int]().product.toBlockingObservable.single) + assertEquals(24, List(1, 2, 3, 4).toObservable.product.toBlockingObservable.single) + assertEquals(8, List(4, 2).toObservable.product.toBlockingObservable.single) + assertEquals(1, List[Int]().toObservable.product.toBlockingObservable.single) } @Test def mapWithIndexExample() { // We don't need mapWithIndex because we already have zipWithIndex, which we can easily // combine with map: - Observable("a", "b", "c").zipWithIndex.map(pair => pair._1 + " has index " + pair._2) + List("a", "b", "c").toObservable.zipWithIndex.map(pair => pair._1 + " has index " + pair._2) .toBlockingObservable.foreach(println(_)) // Or even nicer with for-comprehension syntax: - (for ((letter, index) <- Observable("a", "b", "c").zipWithIndex) yield letter + " has index " + index) + (for ((letter, index) <- List("a", "b", "c").toObservable.zipWithIndex) yield letter + " has index " + index) .toBlockingObservable.foreach(println(_)) } // source Observables are all known: @Test def zip3Example() { - val o = Observable.zip(Observable(1, 2), Observable(10, 20), Observable(100, 200)) + val o = Observable.zip(List(1, 2).toObservable, List(10, 20).toObservable, List(100, 200).toObservable) (for ((n1, n2, n3) <- o) yield s"$n1, $n2 and $n3") .toBlockingObservable.foreach(println(_)) } // source Observables are in an Observable: @Test def zipManyObservableExample() { - val observables = Observable(Observable(1, 2), Observable(10, 20), Observable(100, 200)) + val observables = List(List(1, 2).toObservable, List(10, 20).toObservable, List(100, 200).toObservable).toObservable (for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")")) .toBlockingObservable.foreach(println(_)) } @Test def takeFirstWithCondition() { val condition: Int => Boolean = _ >= 3 - assertEquals(3, Observable(1, 2, 3, 4).filter(condition).first.toBlockingObservable.single) + assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single) } @Test def firstOrDefaultWithCondition() { val condition: Int => Boolean = _ >= 3 - assertEquals(3, Observable(1, 2, 3, 4).filter(condition).firstOrElse(10).toBlockingObservable.single) - assertEquals(10, Observable(-1, 0, 1).filter(condition).firstOrElse(10).toBlockingObservable.single) + assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).firstOrElse(10).toBlockingObservable.single) + assertEquals(10, List(-1, 0, 1).toObservable.filter(condition).firstOrElse(10).toBlockingObservable.single) } def square(x: Int): Int = { @@ -379,9 +379,9 @@ class RxScalaDemo extends JUnitSuite { } @Test def toSortedList() { - assertEquals(Seq(7, 8, 9, 10), Observable(10, 7, 8, 9).toSeq.map(_.sorted).toBlockingObservable.single) + assertEquals(Seq(7, 8, 9, 10), List(10, 7, 8, 9).toObservable.toSeq.map(_.sorted).toBlockingObservable.single) val f = (a: Int, b: Int) => b < a - assertEquals(Seq(10, 9, 8, 7), Observable(10, 7, 8, 9).toSeq.map(_.sortWith(f)).toBlockingObservable.single) + assertEquals(Seq(10, 9, 8, 7), List(10, 7, 8, 9).toObservable.toSeq.map(_.sortWith(f)).toBlockingObservable.single) } @Test def timestampExample() { @@ -410,7 +410,7 @@ class RxScalaDemo extends JUnitSuite { @Test def materializeExample2() { import Notification._ - Observable(1, 2, 3).materialize.subscribe(n => n match { + List(1, 2, 3).toObservable.materialize.subscribe(n => n match { case OnNext(v) => println("Got value " + v) case OnCompleted() => println("Completed") case OnError(err) => println("Error: " + err.getMessage) @@ -418,17 +418,17 @@ class RxScalaDemo extends JUnitSuite { } @Test def elementAtReplacement() { - assertEquals("b", Observable("a", "b", "c").drop(1).first.toBlockingObservable.single) + assertEquals("b", List("a", "b", "c").toObservable.drop(1).first.toBlockingObservable.single) } @Test def elementAtOrDefaultReplacement() { - assertEquals("b", Observable("a", "b", "c").drop(1).firstOrElse("!").toBlockingObservable.single) - assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single) + assertEquals("b", List("a", "b", "c").toObservable.drop(1).firstOrElse("!").toBlockingObservable.single) + assertEquals("!!", List("a", "b", "c").toObservable.drop(10).firstOrElse("!!").toBlockingObservable.single) } @Test def takeWhileWithIndexAlternative { val condition = true - Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1) + List("a", "b").toObservable.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1) } @Test def createExample() { diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala index bc1817bdf4..2e53f1afe9 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala @@ -1,18 +1,19 @@ package rx.lang.scala.examples +import scala.concurrent.duration.DurationInt +import scala.language.postfixOps + import org.junit.Test +import org.mockito.Matchers._ +import org.mockito.Mockito._ import org.scalatest.junit.JUnitSuite -import scala.concurrent.duration._ -import scala.language.postfixOps -import rx.lang.scala.{ Observable, Observer } -import rx.lang.scala.concurrency.TestScheduler + +import rx.lang.scala._ +import rx.lang.scala.schedulers.TestScheduler class TestSchedulerExample extends JUnitSuite { @Test def testInterval() { - import org.mockito.Matchers._ - import org.mockito.Mockito._ - val scheduler = TestScheduler() // Use a Java Observer for Mockito val observer = mock(classOf[rx.Observer[Long]]) @@ -46,3 +47,5 @@ class TestSchedulerExample extends JUnitSuite { } } + + diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala index 276322f935..cc380c463c 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala @@ -9,7 +9,7 @@ package rx.lang.scala object JavaConversions { import language.implicitConversions - implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava + implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJavaNotification implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s) @@ -29,8 +29,7 @@ object JavaConversions { implicit def toScalaObservable[T](observable: rx.Observable[_ <: T]): Observable[T] = { new Observable[T]{ - def asJavaObservable = observable + val asJavaObservable = observable } } - } \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala index 8a254af08c..616fff22c2 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala @@ -19,12 +19,50 @@ package rx.lang.scala * Emitted by Observables returned by [[rx.lang.scala.Observable.materialize]]. */ sealed trait Notification[+T] { - def asJava: rx.Notification[_ <: T] + private [scala] val asJavaNotification: rx.Notification[_ <: T] + override def equals(that: Any): Boolean = that match { - case other: Notification[_] => asJava.equals(other.asJava) + case other: Notification[_] => asJavaNotification.equals(other.asJavaNotification) case _ => false } - override def hashCode(): Int = asJava.hashCode() + override def hashCode(): Int = asJavaNotification.hashCode() + + /** + * Invokes the function corresponding to the notification. + * + * @param onNext + * The function to invoke for an [[rx.lang.scala.Notification.OnNext]] notification. + * @param onError + * The function to invoke for an [[rx.lang.scala.Notification.OnError]] notification. + * @param onCompleted + * The function to invoke for an [[rx.lang.scala.Notification.OnCompleted]] notification. + */ + def accept[R](onNext: T=>R, onError: Throwable=>R, onCompleted: ()=>R): R = { + this match { + case Notification.OnNext(value) => onNext(value) + case Notification.OnError(error) => onError(error) + case Notification.OnCompleted() => onCompleted() + } + } + + def apply[R](onNext: T=>R, onError: Throwable=>R, onCompleted: ()=>R): R = + accept(onNext, onError, onCompleted) + + /** + * Invokes the observer corresponding to the notification + * + * @param observer + * The observer that to observe the notification + */ + def accept(observer: Observer[T]): Unit = { + this match { + case Notification.OnNext(value) => observer.onNext(value) + case Notification.OnError(error) => observer.onError(error) + case Notification.OnCompleted() => observer.onCompleted() + } + } + + def apply(observer: Observer[T]): Unit = accept(observer) } /** @@ -34,15 +72,15 @@ sealed trait Notification[+T] { * {{{ * import Notification._ * Observable(1, 2, 3).materialize.subscribe(n => n match { - * case OnNext(v) => println("Got value " + v) + * case OnNext(v) => println("Got value " + v) * case OnCompleted() => println("Completed") - * case OnError(err) => println("Error: " + err.getMessage) + * case OnError(err) => println("Error: " + err.getMessage) * }) * }}} */ object Notification { - def apply[T](n: rx.Notification[_ <: T]): Notification[T] = n.getKind match { + private [scala] def apply[T](n: rx.Notification[_ <: T]): Notification[T] = n.getKind match { case rx.Notification.Kind.OnNext => new OnNext(n) case rx.Notification.Kind.OnCompleted => new OnCompleted(n) case rx.Notification.Kind.OnError => new OnError(n) @@ -50,56 +88,89 @@ object Notification { // OnNext, OnError, OnCompleted are not case classes because we don't want pattern matching // to extract the rx.Notification - - class OnNext[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { - def value: T = asJava.getValue - override def toString = s"OnNext($value)" - } - + object OnNext { + /** + * Constructor for onNext notifications. + * + * @param value + * The item passed to the onNext method. + */ def apply[T](value: T): Notification[T] = { Notification(new rx.Notification[T](value)) } - def unapply[U](n: Notification[U]): Option[U] = n match { - case n2: OnNext[U] => Some(n.asJava.getValue) + /** + * Extractor for onNext notifications. + * @param notification + * The [[rx.lang.scala.Notification]] to be destructed. + * @return + * The item contained in this notification. + */ + def unapply[U](notification: Notification[U]): Option[U] = notification match { + case onNext: OnNext[U] => Some(onNext.value) case _ => None } } - - class OnError[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { - def error: Throwable = asJava.getThrowable - override def toString = s"OnError($error)" + + class OnNext[+T] private[scala] (val asJavaNotification: rx.Notification[_ <: T]) extends Notification[T] { + def value: T = asJavaNotification.getValue + override def toString = s"OnNext($value)" } - + object OnError { + /** + * Constructor for onError notifications. + * + * @param error + * The exception passed to the onNext method. + */ def apply[T](error: Throwable): Notification[T] = { Notification(new rx.Notification[T](error)) } - def unapply[U](n: Notification[U]): Option[Throwable] = n match { - case n2: OnError[U] => Some(n2.asJava.getThrowable) + /** + * Destructor for onError notifications. + * + * @param notification + * The [[rx.lang.scala.Notification]] to be deconstructed + * @return + * The [[java.lang.Throwable]] value contained in this notification. + */ + def unapply[U](notification: Notification[U]): Option[Throwable] = notification match { + case onError: OnError[U] => Some(onError.error) case _ => None } } - - class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { - override def toString = "OnCompleted()" + + class OnError[+T] private[scala] (val asJavaNotification: rx.Notification[_ <: T]) extends Notification[T] { + def error: Throwable = asJavaNotification.getThrowable + override def toString = s"OnError($error)" } - + object OnCompleted { + /** + * Constructor for onCompleted notifications. + */ def apply[T](): Notification[T] = { Notification(new rx.Notification()) } - def unapply[U](n: Notification[U]): Option[Unit] = n match { - case n2: OnCompleted[U] => Some() + /** + * Extractor for onCompleted notifications. + */ + def unapply[U](notification: Notification[U]): Option[Unit] = notification match { + case onCompleted: OnCompleted[U] => Some() case _ => None } } + class OnCompleted[T] private[scala](val asJavaNotification: rx.Notification[_ <: T]) extends Notification[T] { + override def toString = "OnCompleted()" + } + } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 8d6ffbb48e..6db7d41e25 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -19,6 +19,8 @@ package rx.lang.scala import rx.util.functions.FuncN import rx.Observable.OnSubscribeFunc + + /** * The Observable interface that implements the Reactive Pattern. * @@ -79,7 +81,16 @@ trait Observable[+T] import ImplicitFunctionConversions._ import JavaConversions._ - def asJavaObservable: rx.Observable[_ <: T] + private [scala] val asJavaObservable: rx.Observable[_ <: T] + + /** + * $subscribeObserverMain + * + * @return $subscribeAllReturn + */ + def subscribe(): Subscription = { + asJavaObservable.subscribe() + } /** * $subscribeObserverMain @@ -102,9 +113,17 @@ trait Observable[+T] asJavaObservable.subscribe(observer.asJavaObserver) } + /** + * $subscribeObserverMain + * + * @param observer $subscribeObserverParamObserver + * @return $subscribeAllReturn + */ + def apply(observer: Observer[T]): Subscription = subscribe(observer) + /** * $subscribeCallbacksMainNoNotifications - * `` + * * @param onNext $subscribeCallbacksParamOnNext * @return $subscribeAllReturn */ @@ -190,14 +209,13 @@ trait Observable[+T] * * @param subject * the `rx.lang.scala.subjects.Subject` to push source items into - * @tparam R - * result type * @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function * is called, the Observable starts to push results into the specified Subject */ - def multicast[R](subject: rx.lang.scala.Subject[T, R]): (() => Subscription, Observable[R]) = { - val javaCO = asJavaObservable.multicast[R](subject.asJavaSubject) - (() => javaCO.connect(), toScalaObservable[R](javaCO)) + def multicast[R >: T](subject: rx.lang.scala.Subject[R]): (() => Subscription, Observable[R]) = { + val s: rx.subjects.Subject[_ >: T, _<: R] = subject.asJavaSubject + val javaCO: rx.observables.ConnectableObservable[R] = asJavaObservable.multicast(s) + (() => javaCO.connect(), toScalaObservable(javaCO)) } /** @@ -525,7 +543,7 @@ trait Observable[+T] def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = { val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func) - val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => { + val o2 = Observable.items(o1).map((x: rx.Observable[_]) => { val x2 = x.asInstanceOf[rx.Observable[_ <: T]] toScalaObservable[T](x2) }) @@ -836,7 +854,7 @@ trait Observable[+T] // with =:= it does not work, why? def dematerialize[U](implicit evidence: Observable[T] <:< Observable[Notification[U]]): Observable[U] = { val o1: Observable[Notification[U]] = this - val o2: Observable[rx.Notification[_ <: U]] = o1.map(_.asJava) + val o2: Observable[rx.Notification[_ <: U]] = o1.map(_.asJavaNotification) val o3 = o2.asJavaObservable.dematerialize[U]() toScalaObservable[U](o3) } @@ -1140,7 +1158,7 @@ trait Observable[+T] */ def forall(predicate: T => Boolean): Observable[Boolean] = { // type mismatch; found : rx.Observable[java.lang.Boolean] required: rx.Observable[_ <: scala.Boolean] - // new Observable[Boolean](asJava.all(predicate)) + // new Observable[Boolean](asJavaNotification.all(predicate)) // it's more fun in Scala: this.map(predicate).foldLeft(true)(_ && _) } @@ -1906,13 +1924,13 @@ object Observable { private[scala] def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = { - val oScala1: Observable[java.util.List[T]] = new Observable[java.util.List[T]]{ def asJavaObservable = jObs } + val oScala1: Observable[java.util.List[T]] = new Observable[java.util.List[T]]{ val asJavaObservable = jObs } oScala1.map((lJava: java.util.List[T]) => lJava.asScala) } private[scala] def jObsOfJObsToScObsOfScObs[T](jObs: rx.Observable[_ <: rx.Observable[_ <: T]]): Observable[Observable[T]] = { - val oScala1: Observable[rx.Observable[_ <: T]] = new Observable[rx.Observable[_ <: T]]{ def asJavaObservable = jObs } + val oScala1: Observable[rx.Observable[_ <: T]] = new Observable[rx.Observable[_ <: T]]{ val asJavaObservable = jObs } oScala1.map((oJava: rx.Observable[_ <: T]) => oJava) } @@ -1966,6 +1984,48 @@ object Observable { toScalaObservable[T](rx.Observable.error(exception)) } + /** + * Returns an Observable that emits no data to the [[rx.lang.scala.Observer]] and + * immediately invokes its [[rx.lang.scala.Observer#onCompleted onCompleted]] method + * with the specified scheduler. + *

+ * + * + * @param scheduler the scheduler to call the + [[rx.lang.scala.Observer#onCompleted onCompleted]] method + * @param T the type of the items (ostensibly) emitted by the Observable + * @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and + * immediately invokes the [[rx.lang.scala.Observer]]r's + * [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the + * specified scheduler + * @see RxJava Wiki: empty() + * @see MSDN: Observable.Empty Method (IScheduler) + */ + def empty[T]: Observable[T] = { + toScalaObservable(rx.Observable.empty[T]()) + } + + /** + * Returns an Observable that emits no data to the [[rx.lang.scala.Observer]] and + * immediately invokes its [[rx.lang.scala.Observer#onCompleted onCompleted]] method + * with the specified scheduler. + *

+ * + * + * @param scheduler the scheduler to call the + [[rx.lang.scala.Observer#onCompleted onCompleted]] method + * @param T the type of the items (ostensibly) emitted by the Observable + * @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and + * immediately invokes the [[rx.lang.scala.Observer]]r's + * [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the + * specified scheduler + * @see RxJava Wiki: empty() + * @see MSDN: Observable.Empty Method (IScheduler) + */ + def empty[T](scheduler: Scheduler): Observable[T] = { + toScalaObservable(rx.Observable.empty[T](scalaSchedulerToJavaScheduler(scheduler))) + } + /** * Converts a sequence of values into an Observable. * @@ -1982,7 +2042,7 @@ object Observable { * resulting Observable * @return an Observable that emits each item in the source Array */ - def apply[T](items: T*): Observable[T] = { + def items[T](items: T*): Observable[T] = { toScalaObservable[T](rx.Observable.from(items.toIterable.asJava)) } @@ -2015,7 +2075,7 @@ object Observable { * the sequence before it completes. * * @param iterable the source `Iterable` sequence - * @param the type of items in the `Iterable` sequence and the + * @param T the type of items in the `Iterable` sequence and the * type of items to be emitted by the resulting Observable * @return an Observable that emits each item in the source `Iterable` * sequence @@ -2024,6 +2084,20 @@ object Observable { toScalaObservable(rx.Observable.from(iterable.asJava)) } + /** + * + * @param iterable the source `Iterable` sequence + * @param scheduler the scheduler to use + * @tparam T the type of items in the `Iterable` sequence and the + * type of items to be emitted by the resulting Observable + * @return an Observable that emits each item in the source `Iterable` + * sequence + */ + def from[T](iterable: Iterable[T], scheduler: Scheduler): Observable[T] = { + toScalaObservable(rx.Observable.from(iterable.asJava, scheduler.asJavaScheduler)) + } + + /** * Returns an Observable that calls an Observable factory to create its Observable for each * new Observer that subscribes. That is, for each subscriber, the actual Observable is determined diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala index 68f3e95cd1..6e956dd1a1 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala @@ -15,8 +15,6 @@ */ package rx.lang.scala -import rx.joins.ObserverBase - /** Provides a mechanism for receiving push-based notifications. * @@ -26,10 +24,11 @@ import rx.joins.ObserverBase */ trait Observer[-T] { - private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] { - protected def onCompletedCore(): Unit = onCompleted() - protected def onErrorCore(error: Throwable): Unit = onError(error) - protected def onNextCore(value: T): Unit = onNext(value) + // Java calls XXX, Scala receives XXX + private [scala] val asJavaObserver: rx.Observer[_ >: T] = new rx.Observer[T] { + def onNext(value: T): Unit = Observer.this.onNext(value) + def onError(error: Throwable): Unit = Observer.this.onError(error) + def onCompleted(): Unit = Observer.this.onCompleted() } /** @@ -39,37 +38,51 @@ trait Observer[-T] { * * The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`. */ - def onNext(value: T): Unit + def onNext(value: T): Unit = {} /** * Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition. * * If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`. */ - def onError(error: Throwable): Unit + def onError(error: Throwable): Unit= {} /** * Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications. * * The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`. */ - def onCompleted(): Unit + def onCompleted(): Unit = {} } object Observer { + /** - * Assume that the underlying rx.Observer does not need to be wrapped. + * Scala calls XXX; Java receives XXX. */ private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = { new Observer[T] { + override val asJavaObserver = observer - override def asJavaObserver = observer - - def onNext(value: T): Unit = asJavaObserver.onNext(value) - def onError(error: Throwable): Unit = asJavaObserver.onError(error) - def onCompleted(): Unit = asJavaObserver.onCompleted() - + override def onNext(value: T): Unit = asJavaObserver.onNext(value) + override def onError(error: Throwable): Unit = asJavaObserver.onError(error) + override def onCompleted(): Unit = asJavaObserver.onCompleted() } + } + + def apply[T]( ): Observer[T] = apply[T]((v:T)=>(), (e: Throwable)=>(), ()=>()) + def apply[T](onNext: T=>Unit ): Observer[T] = apply[T](onNext, (e: Throwable)=>(), ()=>()) + def apply[T](onNext: T=>Unit, onError: Throwable=>Unit ): Observer[T] = apply[T](onNext, onError, ()=>()) + def apply[T](onNext: T=>Unit, onCompleted: ()=>Unit): Observer[T] = apply[T](onNext, (e: Throwable)=>(), onCompleted) + def apply[T](onNext: T=>Unit, onError: Throwable=>Unit, onCompleted: ()=>Unit): Observer[T] = { + val n = onNext; val e = onError; val c = onCompleted + // Java calls XXX; Scala receives XXX. + Observer(new rx.Observer[T] { + override def onNext(value: T): Unit = n(value) + override def onError(error: Throwable): Unit = e(error) + override def onCompleted(): Unit = c() + }) + } } \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala index 4f1f89e808..8b4edde4fa 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala @@ -18,6 +18,7 @@ package rx.lang.scala import java.util.Date import scala.concurrent.duration.Duration import rx.util.functions.{Action0, Action1, Func2} +import rx.lang.scala.schedulers._ /** * Represents an object that schedules units of work. @@ -25,7 +26,7 @@ import rx.util.functions.{Action0, Action1, Func2} trait Scheduler { import rx.lang.scala.ImplicitFunctionConversions._ - val asJavaScheduler: rx.Scheduler + private [scala] val asJavaScheduler: rx.Scheduler /** * Schedules a cancelable action to be executed. @@ -44,7 +45,7 @@ trait Scheduler { * @param action Action to schedule. * @return a subscription to be able to unsubscribe from action. */ - private def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = { + private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = { Subscription(asJavaScheduler.schedule(state, new Func2[rx.Scheduler, T, rx.Subscription] { def call(t1: rx.Scheduler, t2: T): rx.Subscription = { action(Scheduler(t1), t2).asJavaSubscription @@ -74,9 +75,8 @@ trait Scheduler { * Time the action is to be delayed before executing. * @return a subscription to be able to unsubscribe from action. */ - private def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = { - val xxx = schedulerActionToFunc2(action) - Subscription(asJavaScheduler.schedule(state, xxx, delayTime.length, delayTime.unit)) + private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = { + Subscription(asJavaScheduler.schedule(state, schedulerActionToFunc2(action), delayTime.length, delayTime.unit)) } /** @@ -108,7 +108,7 @@ trait Scheduler { * The time interval to wait each time in between executing the action. * @return A subscription to be able to unsubscribe from action. */ - private def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = { + private [scala] def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = { Subscription(asJavaScheduler.schedulePeriodically(state, action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)) } @@ -134,7 +134,7 @@ trait Scheduler { * Time the action is to be executed. If in the past it will be executed immediately. * @return a subscription to be able to unsubscribe from action. */ - private def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = { + private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = { Subscription(asJavaScheduler.schedule(state, action, dueTime)) } @@ -180,17 +180,6 @@ trait Scheduler { work{ t1.call() } } })) - //action1[action0] - -// val subscription = new rx.subscriptions.MultipleAssignmentSubscription() -// -// subscription.setSubscription( -// this.schedule(scheduler => { -// def loop(): Unit = subscription.setSubscription(scheduler.schedule{ work{ loop() }}) -// loop() -// subscription -// })) -// subscription } /** @@ -213,12 +202,16 @@ trait Scheduler { } -object Scheduler { - private [scala] def apply(scheduler: rx.Scheduler): Scheduler = { - new Scheduler() { - val asJavaScheduler = scheduler - } +private [scala] object Scheduler { + def apply(scheduler: rx.Scheduler): Scheduler = scheduler match { + case s: rx.concurrency.CurrentThreadScheduler => new CurrentThreadScheduler(s) + case s: rx.concurrency.ExecutorScheduler => new ExecutorScheduler(s) + case s: rx.concurrency.ImmediateScheduler => new ImmediateScheduler(s) + case s: rx.concurrency.NewThreadScheduler => new NewThreadScheduler(s) + case s: rx.concurrency.TestScheduler => new TestScheduler(s) + case s: rx.Scheduler => new Scheduler{ val asJavaScheduler = s } } + } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala index d60b2f3c3d..5f290404f8 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala @@ -20,21 +20,22 @@ import rx.joins.ObserverBase /** * A Subject is an Observable and an Observer at the same time. */ -trait Subject[-T, +R] extends Observable[R] with Observer[T] { - val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R] +trait Subject[T] extends Observable[T] with Observer[T] { + private [scala] val asJavaSubject: rx.subjects.Subject[_ >: T, _<: T] - def asJavaObservable: rx.Observable[_ <: R] = asJavaSubject + val asJavaObservable: rx.Observable[_ <: T] = asJavaSubject - // temporary hack to workaround bugs in rx Subjects - override def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] { - protected def onNextCore(value: T) = asJavaSubject.onNext(value) - protected def onErrorCore(error: Throwable) = asJavaSubject.onError(error) - protected def onCompletedCore() = asJavaSubject.onCompleted() - } - - def onNext(value: T): Unit = asJavaObserver.onNext(value) - def onError(error: Throwable): Unit = asJavaObserver.onError(error) - def onCompleted(): Unit = asJavaObserver.onCompleted() + override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubject + override def onNext(value: T): Unit = { asJavaObserver.onNext(value)} + override def onError(error: Throwable): Unit = { asJavaObserver.onError(error) } + override def onCompleted() { asJavaObserver.onCompleted() } +} +object Subject { + def apply[T](): Subject[T] = new rx.lang.scala.subjects.PublishSubject[T](rx.subjects.PublishSubject.create()) } + + + + diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala index 93e76e4524..a5703c682e 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscription.scala @@ -17,59 +17,61 @@ package rx.lang.scala +import java.util.concurrent.atomic.AtomicBoolean + /** * Subscriptions are returned from all `Observable.subscribe` methods to allow unsubscribing. * * This interface is the equivalent of `IDisposable` in the .NET Rx implementation. */ trait Subscription { - val asJavaSubscription: rx.Subscription + + private [scala] val unsubscribed = new AtomicBoolean(false) + private [scala] val asJavaSubscription: rx.Subscription = new rx.Subscription { + override def unsubscribe() { unsubscribed.compareAndSet(false, true) } + } + /** * Call this method to stop receiving notifications on the Observer that was registered when * this Subscription was received. */ - def unsubscribe(): Unit = asJavaSubscription.unsubscribe() + def unsubscribe() = asJavaSubscription.unsubscribe() /** * Checks if the subscription is unsubscribed. */ - def isUnsubscribed: Boolean + def isUnsubscribed = unsubscribed.get() + } object Subscription { - import java.util.concurrent.atomic.AtomicBoolean - import rx.lang.scala.subscriptions._ - - /** - * Creates an [[rx.lang.scala.Subscription]] from an [[rx.Subscription]]. + * Creates an [[rx.lang.scala.Subscription]] from an [[rx.Subscription]]. ß */ private [scala] def apply(subscription: rx.Subscription): Subscription = { subscription match { - case x: rx.subscriptions.BooleanSubscription => new BooleanSubscription(x) - case x: rx.subscriptions.CompositeSubscription => new CompositeSubscription(x) - case x: rx.subscriptions.MultipleAssignmentSubscription => new MultipleAssignmentSubscription(x) - case x: rx.subscriptions.SerialSubscription => new SerialSubscription(x) - case x: rx.Subscription => apply { x.unsubscribe() } + case x: rx.subscriptions.BooleanSubscription => new rx.lang.scala.subscriptions.BooleanSubscription(x) + case x: rx.subscriptions.CompositeSubscription => new rx.lang.scala.subscriptions.CompositeSubscription(x) + case x: rx.subscriptions.MultipleAssignmentSubscription => new rx.lang.scala.subscriptions.MultipleAssignmentSubscription(x) + case x: rx.subscriptions.SerialSubscription => new rx.lang.scala.subscriptions.SerialSubscription(x) + case x: rx.Subscription => apply{ x.unsubscribe } } } /** * Creates an [[rx.lang.scala.Subscription]] that invokes the specified action when unsubscribed. */ - def apply(u: => Unit): Subscription = { - new Subscription() { - - private val unsubscribed = new AtomicBoolean(false) - - def isUnsubscribed = unsubscribed.get() - - val asJavaSubscription = new rx.Subscription { - def unsubscribe() { if(!unsubscribed.get()) { u ; unsubscribed.set(true) }} - } + def apply(u: => Unit): Subscription = new Subscription() { + override val asJavaSubscription = new rx.Subscription { + override def unsubscribe() { if(unsubscribed.compareAndSet(false, true)) { u } } } } + /** + * Creates an empty [[rx.lang.scala.Subscription]]. + */ + def apply(): Subscription = Subscription {} + } \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala index 83352c8426..aa6cd4339e 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala @@ -20,4 +20,16 @@ package rx.lang * * It basically mirrors the structure of package `rx`, but some changes were made to make it more Scala-idiomatic. */ -package object scala {} +package object scala { + + /** + * Placeholder for extension methods into Observable[T] from other types + */ + implicit class ObservableExtensions[T](val source: Iterable[T]) extends AnyVal { + def toObservable: Observable[T] = { Observable.from(source) } + def toObservable(scheduler: Scheduler): Observable[T] = { Observable.from(source, scheduler) } + } + + + +} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/CurrentThreadScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/CurrentThreadScheduler.scala similarity index 96% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/CurrentThreadScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/CurrentThreadScheduler.scala index 500d9c1d33..f3a7898a30 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/CurrentThreadScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/CurrentThreadScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import rx.lang.scala.Scheduler diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ExecutorScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala similarity index 97% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ExecutorScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala index 03c7e79d44..d718c58b55 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ExecutorScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import java.util.concurrent.Executor import rx.lang.scala.Scheduler diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ImmediateScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ImmediateScheduler.scala similarity index 96% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ImmediateScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ImmediateScheduler.scala index 91843a5746..7a066a3931 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ImmediateScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ImmediateScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import rx.lang.scala.Scheduler diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/NewThreadScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala similarity index 96% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/NewThreadScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala index dc69578082..674205ba07 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/NewThreadScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import rx.lang.scala.Scheduler diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ScheduledExecutorServiceScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ScheduledExecutorServiceScheduler.scala similarity index 97% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ScheduledExecutorServiceScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ScheduledExecutorServiceScheduler.scala index d6e8b11d98..20c4dc3544 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ScheduledExecutorServiceScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ScheduledExecutorServiceScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import java.util.concurrent.ScheduledExecutorService import rx.lang.scala.Scheduler diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TestScheduler.scala similarity index 98% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TestScheduler.scala index 98a0d241b8..f8df7610b8 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/TestScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TestScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import scala.concurrent.duration.Duration import rx.lang.scala.Scheduler diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForComputationScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForComputationScheduler.scala similarity index 93% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForComputationScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForComputationScheduler.scala index 8b51083b89..b9e0791bef 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForComputationScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForComputationScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import rx.lang.scala.Scheduler @@ -26,7 +26,7 @@ object ThreadPoolForComputationScheduler { * * This can be used for event-loops, processing callbacks and other computational work. * - * Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.ThreadPoolForIOScheduler]] instead. + * Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.schedulers.ThreadPoolForIOScheduler]] instead. */ def apply(): ThreadPoolForComputationScheduler = { new ThreadPoolForComputationScheduler(rx.concurrency.Schedulers.threadPoolForComputation()) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForIOScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForIOScheduler.scala similarity index 92% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForIOScheduler.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForIOScheduler.scala index 8869cf96b3..63b7e6b659 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/ThreadPoolForIOScheduler.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ThreadPoolForIOScheduler.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.concurrency +package rx.lang.scala.schedulers import rx.lang.scala.Scheduler @@ -26,7 +26,7 @@ object ThreadPoolForIOScheduler { * * This can be used for asynchronously performing blocking IO. * - * Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.ThreadPoolForComputationScheduler]] instead. + * Do not perform computational work on this scheduler. Use [[rx.lang.scala.schedulers.ThreadPoolForComputationScheduler]] instead. */ def apply(): ThreadPoolForIOScheduler = { new ThreadPoolForIOScheduler(rx.concurrency.Schedulers.threadPoolForIO()) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/AsyncSubject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/AsyncSubject.scala index 80892b9ac1..44cb6adb79 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/AsyncSubject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/AsyncSubject.scala @@ -23,4 +23,4 @@ object AsyncSubject { } } -class AsyncSubject[T] private[scala] (val asJavaSubject: rx.subjects.AsyncSubject[T]) extends Subject[T,T] {} \ No newline at end of file +class AsyncSubject[T] private[scala] (val asJavaSubject: rx.subjects.AsyncSubject[T]) extends Subject[T] {} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala index 9ee8ba9db4..64e0ac4671 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/BehaviorSubject.scala @@ -23,7 +23,7 @@ object BehaviorSubject { } } -class BehaviorSubject[T] private[scala] (val asJavaSubject: rx.subjects.BehaviorSubject[T]) extends Subject[T,T] {} +class BehaviorSubject[T] private[scala] (val asJavaSubject: rx.subjects.BehaviorSubject[T]) extends Subject[T] {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala index 7c06101460..129717d8cc 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/PublishSubject.scala @@ -17,11 +17,8 @@ package rx.lang.scala.subjects import rx.lang.scala.Subject -object PublishSubject { - def apply[T](): PublishSubject[T] = { - new PublishSubject[T](rx.subjects.PublishSubject.create()) - } +private [scala] object PublishSubject { + def apply[T](): PublishSubject[T] = new PublishSubject[T](rx.subjects.PublishSubject.create[T]()) } -class PublishSubject[T] private[scala] (val asJavaSubject: rx.subjects.PublishSubject[T]) extends Subject[T,T] { - } +private [scala] class PublishSubject[T] private [scala] (val asJavaSubject: rx.subjects.PublishSubject[T]) extends Subject[T] {} diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/ReplaySubject.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/ReplaySubject.scala index f88fb65280..3f64bf1e7d 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/ReplaySubject.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/ReplaySubject.scala @@ -23,7 +23,7 @@ object ReplaySubject { } } -class ReplaySubject[T] private[scala] (val asJavaSubject: rx.subjects.ReplaySubject[T]) extends Subject[T,T] { +class ReplaySubject[T] private[scala] (val asJavaSubject: rx.subjects.ReplaySubject[T]) extends Subject[T] { } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala index a9f2070345..d7e0431d27 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/BooleanSubscription.scala @@ -17,39 +17,22 @@ package rx.lang.scala.subscriptions import rx.lang.scala._ -object BooleanSubscription { - - /** - * Creates a [[rx.lang.scala.subscriptions.BooleanSubscription]]. - */ - def apply(): BooleanSubscription = { - new BooleanSubscription(new rx.subscriptions.BooleanSubscription()) - } - - /** - * Creates a [[rx.lang.scala.subscriptions.BooleanSubscription]] that invokes the specified action when unsubscribed. - */ - def apply(u: => Unit): BooleanSubscription = { - new BooleanSubscription(new rx.subscriptions.BooleanSubscription { - override def unsubscribe(): Unit = { - if(!super.isUnsubscribed) { - u - super.unsubscribe() - } - } - }) - } +private [scala] object BooleanSubscription { + def apply(): BooleanSubscription = new BooleanSubscription(new rx.subscriptions.BooleanSubscription()) } /** * Represents a [[rx.lang.scala.Subscription]] that can be checked for status. */ -class BooleanSubscription private[scala] (val asJavaSubscription: rx.subscriptions.BooleanSubscription) +private [scala] class BooleanSubscription private[scala] (boolean: rx.subscriptions.BooleanSubscription) extends Subscription { - /** - * Checks whether the subscription has been unsubscribed. - */ - def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed - + override val asJavaSubscription: rx.subscriptions.BooleanSubscription = new rx.subscriptions.BooleanSubscription() { + override def unsubscribe(): Unit = { + if(unsubscribed.compareAndSet(false, true)) { + if(!boolean.isUnsubscribed) { boolean.unsubscribe() } + } + } + override def isUnsubscribed(): Boolean = unsubscribed.get() || boolean.isUnsubscribed + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala index 1fe4a4afa5..abe2e721c4 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala @@ -36,7 +36,7 @@ object CompositeSubscription { /** * Creates a [[rx.lang.scala.subscriptions.CompositeSubscription]]. */ - def apply(subscription: rx.subscriptions.CompositeSubscription): CompositeSubscription = { + private [scala] def apply(subscription: rx.subscriptions.CompositeSubscription): CompositeSubscription = { new CompositeSubscription(subscription) } } @@ -44,9 +44,10 @@ object CompositeSubscription { /** * Represents a group of [[rx.lang.scala.Subscription]] that are disposed together. */ -class CompositeSubscription private[scala] (val asJavaSubscription: rx.subscriptions.CompositeSubscription) - extends Subscription +class CompositeSubscription private[scala] (override val asJavaSubscription: rx.subscriptions.CompositeSubscription) extends Subscription { + //override def asJavaSubscription = subscription + /** * Adds a subscription to the group, * or unsubscribes immediately is the [[rx.subscriptions.CompositeSubscription]] is unsubscribed. @@ -68,9 +69,7 @@ class CompositeSubscription private[scala] (val asJavaSubscription: rx.subscript this } - /** - * Checks whether the subscription has been unsubscribed. - */ - def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed + override def unsubscribe(): Unit = asJavaSubscription.unsubscribe() + override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala index 84740870c7..8fa87c3ff5 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/MultiAssignmentSubscription.scala @@ -41,9 +41,9 @@ object MultipleAssignmentSubscription { /** * Represents a [[rx.lang.scala.Subscription]] whose underlying subscription can be swapped for another subscription. */ -class MultipleAssignmentSubscription private[scala] (val asJavaSubscription: rx.subscriptions.MultipleAssignmentSubscription) - extends Subscription { +class MultipleAssignmentSubscription private[scala] (override val asJavaSubscription: rx.subscriptions.MultipleAssignmentSubscription) extends Subscription { + //override def asJavaSubscription = s /** * Gets the underlying subscription. */ @@ -59,10 +59,8 @@ class MultipleAssignmentSubscription private[scala] (val asJavaSubscription: rx. this } - /** - * Checks whether the subscription has been unsubscribed. - */ - def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed + override def unsubscribe(): Unit = asJavaSubscription.unsubscribe() + override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala index 94b50f93e8..dfb51cedac 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala @@ -15,48 +15,41 @@ */ package rx.lang.scala.subscriptions -import rx.lang.scala.Subscription -import java.util.concurrent.atomic.AtomicBoolean - +import rx.lang.scala._ object SerialSubscription { /** * Creates a [[rx.lang.scala.subscriptions.SerialSubscription]]. */ - def apply(): SerialSubscription = { - new SerialSubscription(new rx.subscriptions.SerialSubscription()) - } + def apply(): SerialSubscription = new SerialSubscription(new rx.subscriptions.SerialSubscription()) /** * Creates a [[rx.lang.scala.subscriptions.SerialSubscription]] that invokes the specified action when unsubscribed. */ def apply(unsubscribe: => Unit): SerialSubscription = { - val s= SerialSubscription() - s.subscription = Subscription{ unsubscribe } - s + SerialSubscription().subscription = Subscription(unsubscribe) } } /** * Represents a [[rx.lang.scala.Subscription]] that can be checked for status. */ -class SerialSubscription private[scala] (val asJavaSubscription: rx.subscriptions.SerialSubscription) - extends Subscription { - - private val unsubscribed = new AtomicBoolean(false) - - /** - * Checks whether the subscription has been unsubscribed. - */ - def isUnsubscribed: Boolean = unsubscribed.get() +class SerialSubscription private[scala] (serial: rx.subscriptions.SerialSubscription) extends Subscription { - /** - * Unsubscribes this subscription, setting isUnsubscribed to true. + /* + * As long as rx.subscriptions.SerialSubscription has no isUnsubscribed, + * we need to intercept and do it ourselves. */ - override def unsubscribe(): Unit = { super.unsubscribe(); unsubscribed.set(true) } + override val asJavaSubscription: rx.subscriptions.SerialSubscription = new rx.subscriptions.SerialSubscription() { + override def unsubscribe(): Unit = { + if(unsubscribed.compareAndSet(false, true)) { serial.unsubscribe() } + } + override def setSubscription(subscription: rx.Subscription): Unit = serial.setSubscription(subscription) + override def getSubscription(): rx.Subscription = serial.getSubscription() + } - def subscription_=(value: Subscription): Unit = asJavaSubscription.setSubscription(value.asJavaSubscription) + def subscription_=(value: Subscription): this.type = { asJavaSubscription.setSubscription(value.asJavaSubscription); this } def subscription: Subscription = Subscription(asJavaSubscription.getSubscription) } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 7a161c0f8f..41bbdca347 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -15,37 +15,37 @@ import org.scalatest.junit.JUnitSuite /** * These tests can be used to check if all methods of the Java Observable have a corresponding * method in the Scala Observable. - * + * * These tests don't contain any assertions, so they will always succeed, but they print their * results to stdout. */ class CompletenessTest extends JUnitSuite { - + // some frequently used comments: val unnecessary = "[considered unnecessary in Scala land]" val deprecated = "[deprecated in RxJava]" val averageProblem = "[We can't have a general average method because Scala's `Numeric` does not have " + - "scalar multiplication (we would need to calculate `(1.0/numberOfElements)*sum`). " + + "scalar multiplication (we would need to calculate `(1.0/numberOfElements)*sum`). " + "You can use `fold` instead to accumulate `sum` and `numberOfElements` and divide at the end.]" val commentForFirstWithPredicate = "[use `.filter(condition).first`]" val fromFuture = "[TODO: Decide how Scala Futures should relate to Observables. Should there be a " + "common base interface for Future and Observable? And should Futures also have an unsubscribe method?]" - + /** * Maps each method from the Java Observable to its corresponding method in the Scala Observable */ val correspondence = defaultMethodCorrespondence ++ correspondenceChanges // ++ overrides LHS with RHS - + /** * Creates default method correspondence mappings, assuming that Scala methods have the same * name and the same argument types as in Java */ def defaultMethodCorrespondence: Map[String, String] = { - val allMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.Observable[_]]) + val allMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.Observable[_]]) val tuples = for (javaM <- allMethods) yield (javaM, javaMethodSignatureToScala(javaM)) tuples.toMap } - + /** * Manually added mappings from Java Observable methods to Scala Observable methods */ @@ -61,7 +61,7 @@ class CompletenessTest extends JUnitSuite { "elementAt(Int)" -> "[use `.drop(index).first`]", "elementAtOrDefault(Int, T)" -> "[use `.drop(index).firstOrElse(default)`]", "first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, - "firstOrDefault(T)" -> "firstOrElse(=> U)", + "firstOrDefault(T)" -> "firstOrElse(=> U)", "firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])", @@ -90,7 +90,7 @@ class CompletenessTest extends JUnitSuite { "where(Func1[_ >: T, Boolean])" -> "filter(T => Boolean)", "window(Long, Long, TimeUnit)" -> "window(Duration, Duration)", "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", - + // manually added entries for Java static methods "average(Observable[Integer])" -> averageProblem, "averageDoubles(Observable[Double])" -> averageProblem, @@ -103,7 +103,7 @@ class CompletenessTest extends JUnitSuite { "empty()" -> "apply(T*)", "error(Throwable)" -> "apply(Throwable)", "from(Array[T])" -> "apply(T*)", - "from(Iterable[_ <: T])" -> "apply(T*)", + "from(Iterable[_ <: T])" -> "apply(T*)", "from(Future[_ <: T])" -> fromFuture, "from(Future[_ <: T], Long, TimeUnit)" -> fromFuture, "from(Future[_ <: T], Scheduler)" -> fromFuture, @@ -151,9 +151,9 @@ class CompletenessTest extends JUnitSuite { val funcParams = (1 to i).map(j => s"_ >: T$j, ").mkString("") ("combineLatest(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", "[If C# doesn't need it, Scala doesn't need it either ;-)]") }).toMap - + def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2") - + def methodMembersToMethodStrings(members: Iterable[Symbol]): Iterable[String] = { for (member <- members; alt <- member.asTerm.alternatives) yield { val m = alt.asMethod @@ -167,18 +167,18 @@ class CompletenessTest extends JUnitSuite { name + paramListStrs.mkString("") } } - + def getPublicInstanceMethods(tp: Type): Iterable[String] = { // declarations: => only those declared in Observable // members => also those of superclasses methodMembersToMethodStrings(tp.declarations.filter(m => m.isMethod && m.isPublic)) - // TODO how can we filter out instance methods which were put into companion because + // TODO how can we filter out instance methods which were put into companion because // of extends AnyVal in a way which does not depend on implementation-chosen name '$extension'? .filter(! _.contains("$extension")) } - + // also applicable for Java types - def getPublicInstanceAndCompanionMethods(tp: Type): Iterable[String] = + def getPublicInstanceAndCompanionMethods(tp: Type): Iterable[String] = getPublicInstanceMethods(tp) ++ getPublicInstanceMethods(tp.typeSymbol.companionSymbol.typeSignature) @@ -187,31 +187,31 @@ class CompletenessTest extends JUnitSuite { println(title.map(_ => '-') + "\n") getPublicInstanceMethods(tp).toList.sorted.foreach(println(_)) } - + @Ignore // because spams output @Test def printJavaInstanceMethods(): Unit = { - printMethodSet("Instance methods of rx.Observable", + printMethodSet("Instance methods of rx.Observable", typeOf[rx.Observable[_]]) } - + @Ignore // because spams output @Test def printScalaInstanceMethods(): Unit = { - printMethodSet("Instance methods of rx.lang.scala.Observable", + printMethodSet("Instance methods of rx.lang.scala.Observable", typeOf[rx.lang.scala.Observable[_]]) } - + @Ignore // because spams output @Test def printJavaStaticMethods(): Unit = { - printMethodSet("Static methods of rx.Observable", + printMethodSet("Static methods of rx.Observable", typeOf[rx.Observable[_]].typeSymbol.companionSymbol.typeSignature) } - + @Ignore // because spams output @Test def printScalaCompanionMethods(): Unit = { printMethodSet("Companion methods of rx.lang.scala.Observable", typeOf[rx.lang.scala.Observable.type]) } - + def javaMethodSignatureToScala(s: String): String = { s.replaceAllLiterally("Long, TimeUnit", "Duration") .replaceAll("Action0", "() => Unit") @@ -233,20 +233,20 @@ class CompletenessTest extends JUnitSuite { val c = SortedMap(defaultMethodCorrespondence.toSeq : _*) val len = c.keys.map(_.length).max + 2 for ((javaM, scalaM) <- c) { - println(s""" %-${len}s -> %s,""".format("\"" + javaM + "\"", "\"" + scalaM + "\"")) + println(s""" %-${len}s -> %s,""".format("\"" + javaM + "\"", "\"" + scalaM + "\"")) } } - + @Ignore // because spams output @Test def printCorrectedMethodCorrespondence(): Unit = { println("\nCorrected Method Correspondence") println( "-------------------------------\n") val c = SortedMap(correspondence.toSeq : _*) for ((javaM, scalaM) <- c) { - println("%s -> %s,".format("\"" + javaM + "\"", "\"" + scalaM + "\"")) + println("%s -> %s,".format("\"" + javaM + "\"", "\"" + scalaM + "\"")) } } - + def checkMethodPresence(expectedMethods: Iterable[String], tp: Type): Unit = { val actualMethods = getPublicInstanceAndCompanionMethods(tp).toSet val expMethodsSorted = expectedMethods.toList.sorted @@ -261,15 +261,15 @@ class CompletenessTest extends JUnitSuite { val status = if (bad == 0) "SUCCESS" else "BAD" println(s"$status: $bad out of ${bad+good} methods were not found in $tp") } - + @Test def checkScalaMethodPresenceVerbose(): Unit = { println("\nTesting that all mentioned Scala methods exist") println( "----------------------------------------------\n") - + val actualMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.lang.scala.Observable[_]]).toSet var good = 0 var bad = 0 - for ((javaM, scalaM) <- SortedMap(correspondence.toSeq :_*)) { + for ((javaM, scalaM) <- SortedMap(correspondence.toSeq :_*)) { if (actualMethods.contains(scalaM) || scalaM.charAt(0) == '[') { good += 1 } else { @@ -282,40 +282,40 @@ class CompletenessTest extends JUnitSuite { val status = if (bad == 0) "SUCCESS" else "BAD" println(s"\n$status: $bad out of ${bad+good} methods were not found in Scala Observable") } - + def setTodoForMissingMethods(corresp: Map[String, String]): Map[String, String] = { val actualMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.lang.scala.Observable[_]]).toSet for ((javaM, scalaM) <- corresp) yield (javaM, if (actualMethods.contains(scalaM) || scalaM.charAt(0) == '[') scalaM else "[**TODO: missing**]") } - + @Test def checkJavaMethodPresence(): Unit = { println("\nTesting that all mentioned Java methods exist") println( "---------------------------------------------\n") checkMethodPresence(correspondence.keys, typeOf[rx.Observable[_]]) } - + @Ignore // because we prefer the verbose version @Test def checkScalaMethodPresence(): Unit = { checkMethodPresence(correspondence.values, typeOf[rx.lang.scala.Observable[_]]) } - - def scalaToJavaSignature(s: String) = + + def scalaToJavaSignature(s: String) = s.replaceAllLiterally("_ <:", "? extends") .replaceAllLiterally("_ >:", "? super") .replaceAllLiterally("[", "<") .replaceAllLiterally("]", ">") .replaceAllLiterally("Array", "T[]") - + def escapeJava(s: String) = s.replaceAllLiterally("<", "<") .replaceAllLiterally(">", ">") - + @Ignore // because spams output @Test def printMarkdownCorrespondenceTable() { def isInteresting(p: (String, String)): Boolean = p._1.replaceAllLiterally("()", "") != p._2 - def groupingKey(p: (String, String)): (String, String) = + def groupingKey(p: (String, String)): (String, String) = (if (p._1.startsWith("average")) "average" else p._1.takeWhile(_ != '('), p._2) def formatJavaCol(name: String, alternatives: Iterable[String]): String = { alternatives.toList.sorted.map(scalaToJavaSignature(_)).map(s => { @@ -327,21 +327,21 @@ class CompletenessTest extends JUnitSuite { } }).mkString("
") } - def formatScalaCol(s: String): String = + def formatScalaCol(s: String): String = if (s.startsWith("[") && s.endsWith("]")) s.drop(1).dropRight(1) else "`" + s + "`" def escape(s: String) = s.replaceAllLiterally("[", "<").replaceAllLiterally("]", ">") - + println(""" ## Comparison of Scala Observable and Java Observable - -Note: + +Note: * This table contains both static methods and instance methods. * If a signature is too long, move your mouse over it to get the full signature. - + | Java Method | Scala Method | |-------------|--------------|""") - + val ps = setTodoForMissingMethods(correspondence) (for (((javaName, scalaCol), pairs) <- ps.groupBy(groupingKey(_)).toList.sortBy(_._1._1)) yield { @@ -350,5 +350,5 @@ Note: println(s"\nThis table was generated on ${Calendar.getInstance().getTime}.") println(s"**Do not edit**. Instead, edit `${getClass.getCanonicalName}`.") } - + } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ConstructorTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ConstructorTest.scala new file mode 100644 index 0000000000..e2822405b2 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ConstructorTest.scala @@ -0,0 +1,20 @@ +package rx.lang.scala +import scala.language.postfixOps +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +class ConstructorTest extends JUnitSuite { + + @Test def toObservable() { + val xs = List(1,2,3).toObservable.toBlockingObservable.toList + assertEquals(List(1,2,3), xs) + + val ys = Observable.from(List(1,2,3)).toBlockingObservable.toList + assertEquals(List(1,2,3), xs) + + val zs = Observable.items(1,2,3).toBlockingObservable.toList + assertEquals(List(1,2,3), xs) + + } +} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala new file mode 100644 index 0000000000..759ca8b7ec --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/NotificationTests.scala @@ -0,0 +1,43 @@ +package rx.lang.scala + + +import org.junit.{Assert, Test} +import org.junit.Assert._ +import org.scalatest.junit.JUnitSuite +import scala.concurrent.duration._ +import scala.language.postfixOps +import org.mockito.Mockito._ +import org.mockito.Matchers._ +import rx.lang.scala.Notification.{OnCompleted, OnError, OnNext} + + +class NotificationTests extends JUnitSuite { + @Test + def creation() { + + val onNext = OnNext(42) + assertEquals(42, onNext match { case OnNext(value) => value }) + + val oops = new Exception("Oops") + val onError = OnError(oops) + assertEquals(oops, onError match { case OnError(error) => error }) + + val onCompleted = OnCompleted() + assertEquals((), onCompleted match { case OnCompleted() => () }) + } + + @Test + def accept() { + + val onNext = OnNext(42) + assertEquals(42, onNext(x=>42, e=>4711,()=>13)) + + val oops = new Exception("Oops") + val onError = OnError(oops) + assertEquals(4711, onError(x=>42, e=>4711,()=>13)) + + val onCompleted = OnCompleted() + assertEquals(13, onCompleted(x=>42, e=>4711,()=>13)) + + } +} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index 48c4423373..0b755833da 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -6,6 +6,12 @@ import scala.concurrent.ExecutionContext.Implicits.global import org.junit.Assert._ import org.junit.{ Ignore, Test } import org.scalatest.junit.JUnitSuite +import scala.concurrent.duration._ +import scala.language.postfixOps +import rx.lang.scala.schedulers.TestScheduler +import rx.lang.scala.subjects.BehaviorSubject +import org.mockito.Mockito._ +import org.mockito.Matchers._ class ObservableTests extends JUnitSuite { @@ -15,7 +21,7 @@ class ObservableTests extends JUnitSuite { def testCovariance = { //println("hey, you shouldn't run this test") - val o1: Observable[Nothing] = Observable() + val o1: Observable[Nothing] = Observable.empty val o2: Observable[Int] = o1 val o3: Observable[App] = o1 val o4: Observable[Any] = o2 @@ -26,28 +32,28 @@ class ObservableTests extends JUnitSuite { @Test def testDematerialize() { - val o = Observable(1, 2, 3) + val o = List(1, 2, 3).toObservable val mat = o.materialize val demat = mat.dematerialize - // correctly rejected: + //correctly rejected: //val wrongDemat = Observable("hello").dematerialize assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3)) - } +} // Test that Java's firstOrDefault propagates errors. // If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse // should be changed accordingly. @Test def testJavaFirstOrDefault() { - assertEquals(1, rx.Observable.from(1, 2).firstOrDefault(10).toBlockingObservable.single) - assertEquals(10, rx.Observable.empty().firstOrDefault(10).toBlockingObservable.single) + assertEquals(1, rx.Observable.from(1, 2).firstOrDefault(10).toBlockingObservable().single) + assertEquals(10, rx.Observable.empty().firstOrDefault(10).toBlockingObservable().single) val msg = "msg6251" var receivedMsg = "none" try { - rx.Observable.error(new Exception(msg)).firstOrDefault(10).toBlockingObservable.single + rx.Observable.error(new Exception(msg)).firstOrDefault(10).toBlockingObservable().single } catch { - case e: Exception => receivedMsg = e.getCause.getMessage + case e: Exception => receivedMsg = e.getCause().getMessage() } assertEquals(receivedMsg, msg) } @@ -55,17 +61,17 @@ class ObservableTests extends JUnitSuite { @Test def testFirstOrElse() { def mustNotBeCalled: String = sys.error("this method should not be called") def mustBeCalled: String = "this is the default value" - assertEquals("hello", Observable("hello").firstOrElse(mustNotBeCalled).toBlockingObservable.single) - assertEquals("this is the default value", Observable().firstOrElse(mustBeCalled).toBlockingObservable.single) + assertEquals("hello", Observable.items("hello").firstOrElse(mustNotBeCalled).toBlockingObservable.single) + assertEquals("this is the default value", Observable.empty.firstOrElse(mustBeCalled).toBlockingObservable.single) } - @Test def testFirstOrElseWithError() { + @Test def testTestWithError() { val msg = "msg6251" var receivedMsg = "none" try { Observable.error[Int](new Exception(msg)).firstOrElse(10).toBlockingObservable.single } catch { - case e: Exception => receivedMsg = e.getCause.getMessage + case e: Exception => receivedMsg = e.getCause().getMessage() } assertEquals(receivedMsg, msg) } @@ -106,10 +112,4 @@ class ObservableTests extends JUnitSuite { } */ - @Test def testTest() = { - val a: Observable[Int] = Observable() - assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last) - //println("This UnitTestSuite.testTest() for rx.lang.scala.Observable") - } - } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubjectTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubjectTests.scala new file mode 100644 index 0000000000..50773d5582 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubjectTests.scala @@ -0,0 +1,308 @@ +package rx.lang.scala + +import org.junit.{Assert, Test} +import org.scalatest.junit.JUnitSuite +import scala.concurrent.duration._ +import scala.language.postfixOps +import rx.lang.scala.schedulers.TestScheduler +import rx.lang.scala.subjects.{AsyncSubject, ReplaySubject, BehaviorSubject} +import org.mockito.Mockito._ +import org.mockito.Matchers._ +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Assert.assertFalse +import org.junit.Ignore +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +class SubjectTest extends JUnitSuite { + + @Test def SubjectIsAChannel() { + + var lastA: Integer = null + var errorA: Throwable = null + var completedA: Boolean = false + val observerA = Observer[Integer]( + (next: Integer) => { lastA = next }, + (error: Throwable) => { errorA = error }, + () => { completedA = true } + ) + + var lastB: Integer = null + var errorB: Throwable = null + var completedB: Boolean = false + val observerB = Observer[Integer]( + (next: Integer) => { lastB = next }, + (error: Throwable) => { errorB = error }, + () => { completedB = true } + ) + + var lastC: Integer = null + var errorC: Throwable = null + var completedC: Boolean = false + val observerC = Observer[Integer]( + (next: Integer) => { lastC = next }, + (error: Throwable) => { errorC = error }, + () => { completedC = true } + ) + + val channel: Subject[Integer] = Subject[Integer]() + + val a = channel(observerA) + val b = channel(observerB) + + assertEquals(null, lastA) + assertEquals(null, lastB) + + channel.onNext(42) + + assertEquals(42, lastA) + assertEquals(42, lastB) + + a.unsubscribe() + channel.onNext(4711) + + assertEquals(42, lastA) + assertEquals(4711, lastB) + + channel.onCompleted() + + assertFalse(completedA) + assertTrue(completedB) + assertEquals(42, lastA) + assertEquals(4711, lastB) + + val c = channel.subscribe(observerC) + channel.onNext(13) + + assertEquals(null, lastC) + assertTrue(completedC) + + assertFalse(completedA) + assertTrue(completedB) + assertEquals(42, lastA) + assertEquals(4711, lastB) + + channel.onError(new Exception("!")) + + assertEquals(null, lastC) + assertTrue(completedC) + + assertFalse(completedA) + assertTrue(completedB) + assertEquals(42, lastA) + assertEquals(4711, lastB) + } + + @Test def ReplaySubjectIsAChannel() { + + val channel = ReplaySubject[Integer] + + var lastA: Integer = null + var errorA, completedA: Boolean = false + val a = channel.subscribe(x => { lastA = x}, e => { errorA = true} , () => { completedA = true }) + + var lastB: Integer = null + var errorB, completedB: Boolean = false + + val b = channel(new Observer[Integer] { + override def onNext(value: Integer): Unit = { lastB = value } + override def onError(error: Throwable): Unit = { errorB = true } + override def onCompleted(): Unit = { completedB = true } + }) + + channel.onNext(42) + + assertEquals(42, lastA) + assertEquals(42, lastB) + + a.unsubscribe() + + channel.onNext(4711) + + assertEquals(42, lastA) + assertEquals(4711, lastB) + + channel.onCompleted() + + assertEquals(42, lastA) + assertFalse(completedA) + assertFalse(errorA) + + assertEquals(4711, lastB) + assertTrue(completedB) + assertFalse(errorB) + + var lastC: Integer = null + var errorC, completedC: Boolean = false + val c = channel.subscribe(x => { lastC = x}, e => { errorC = true} , () => { completedC = true }) + + assertEquals(4711, lastC) + assertTrue(completedC) + assertFalse(errorC) + + channel.onNext(13) + + assertEquals(42, lastA) + assertFalse(completedA) + assertFalse(errorA) + + assertEquals(4711, lastB) + assertTrue(completedB) + assertFalse(errorB) + + assertEquals(4711, lastC) + assertTrue(completedC) + assertFalse(errorC) + + channel.onError(new Exception("Boom")) + + assertEquals(42, lastA) + assertFalse(completedA) + assertFalse(errorA) + + assertEquals(4711, lastB) + assertTrue(completedB) + assertFalse(errorB) + + assertEquals(4711, lastC) + assertTrue(completedC) + assertFalse(errorC) + } + + @Test def BehaviorSubjectIsACache() { + + val channel = BehaviorSubject(2013) + + var lastA: Integer = null + var errorA, completedA: Boolean = false + val a = channel.subscribe(x => { lastA = x}, e => { errorA = true} , () => { completedA = true }) + + var lastB: Integer = null + var errorB, completedB: Boolean = false + val b = channel.subscribe(x => { lastB = x}, e => { errorB = true} , () => { completedB = true }) + + assertEquals(2013, lastA) + assertEquals(2013, lastB) + + channel.onNext(42) + + assertEquals(42, lastA) + assertEquals(42, lastB) + + a.unsubscribe() + + channel.onNext(4711) + + assertEquals(42, lastA) + assertEquals(4711, lastB) + + channel.onCompleted() + + var lastC: Integer = null + var errorC, completedC: Boolean = false + val c = channel.subscribe(x => { lastC = x}, e => { errorC = true} , () => { completedC = true }) + + assertEquals(null, lastC) + assertTrue(completedC) + assertFalse(errorC) + + channel.onNext(13) + + assertEquals(42, lastA) + assertFalse(completedA) + assertFalse(errorA) + + assertEquals(4711, lastB) + assertTrue(completedB) + assertFalse(errorB) + + assertEquals(null, lastC) + assertTrue(completedC) + assertFalse(errorC) + + channel.onError(new Exception("Boom")) + + assertEquals(42, lastA) + assertFalse(completedA) + assertFalse(errorA) + + assertEquals(4711, lastB) + assertTrue(completedB) + assertFalse(errorB) + + assertEquals(null, lastC) + assertTrue(completedC) + assertFalse(errorC) + + } + + @Test def AsyncSubjectIsAFuture() { + + val channel = AsyncSubject[Int]() + + var lastA: Integer = null + var errorA, completedA: Boolean = false + val a = channel.subscribe(x => { lastA = x}, e => { errorA = true} , () => { completedA = true }) + + var lastB: Integer = null + var errorB, completedB: Boolean = false + val b = channel.subscribe(x => { lastB = x}, e => { errorB = true} , () => { completedB = true }) + + channel.onNext(42) + + Assert.assertEquals(null, lastA) + Assert.assertEquals(null, lastB) + + a.unsubscribe() + channel.onNext(4711) + channel.onCompleted() + + Assert.assertEquals(null, lastA) + Assert.assertFalse(completedA) + Assert.assertFalse(errorA) + + Assert.assertEquals(4711, lastB) + Assert.assertTrue(completedB) + Assert.assertFalse(errorB) + + + var lastC: Integer = null + var errorC, completedC: Boolean = false + val c = channel.subscribe(x => { lastC = x}, e => { errorC = true} , () => { completedC = true }) + + Assert.assertEquals(4711, lastC) + Assert.assertTrue(completedC) + Assert.assertFalse(errorC) + + channel.onNext(13) + + Assert.assertEquals(null, lastA) + Assert.assertFalse(completedA) + Assert.assertFalse(errorA) + + Assert.assertEquals(4711, lastB) + Assert.assertTrue(completedB) + Assert.assertFalse(errorB) + + Assert.assertEquals(4711, lastC) + Assert.assertTrue(completedC) + Assert.assertFalse(errorC) + + channel.onError(new Exception("Boom")) + + Assert.assertEquals(null, lastA) + Assert.assertFalse(completedA) + Assert.assertFalse(errorA) + + Assert.assertEquals(4711, lastB) + Assert.assertTrue(completedB) + Assert.assertFalse(errorB) + + Assert.assertEquals(4711, lastC) + Assert.assertTrue(completedC) + Assert.assertFalse(errorC) + + } + +} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriptionTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriptionTests.scala new file mode 100644 index 0000000000..71ea4d4a6e --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriptionTests.scala @@ -0,0 +1,171 @@ +package rx.lang.scala + + +import org.junit.{Assert, Test} +import org.junit.Assert +import org.scalatest.junit.JUnitSuite +import scala.concurrent.duration._ +import scala.language.postfixOps +import org.mockito.Mockito._ +import org.mockito.Matchers._ +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Assert.assertFalse +import rx.lang.scala.subscriptions.{SerialSubscription, MultipleAssignmentSubscription, CompositeSubscription} + + + +class SubscriptionTests extends JUnitSuite { + @Test + def subscriptionCreate() { + + val subscription = Subscription() + + assertFalse(subscription.isUnsubscribed) + + subscription.unsubscribe() + + assertTrue(subscription.isUnsubscribed) + } + + @Test + def subscriptionUnsubscribeIdempotent() { + + var called = false + + val subscription = Subscription{ called = !called } + + assertFalse(called) + assertFalse(subscription.isUnsubscribed) + + subscription.unsubscribe() + + assertTrue(called) + assertTrue(subscription.isUnsubscribed) + + subscription.unsubscribe() + + assertTrue(called) + assertTrue(subscription.isUnsubscribed) + } + + @Test + def compositeSubscriptionAdd() { + + val s0 = Subscription() + val s1 = Subscription() + + val composite = CompositeSubscription() + + assertFalse(composite.isUnsubscribed) + + composite += s0 + composite += s1 + + composite.unsubscribe() + + assertTrue(composite.isUnsubscribed) + assertTrue(s0.isUnsubscribed) + assertTrue(s1.isUnsubscribed) + + val s2 = Subscription{} + + assertFalse(s2.isUnsubscribed) + + composite += s2 + + assertTrue(s2.isUnsubscribed) + + } + + @Test + def compositeSubscriptionRemove() { + + val s0 = Subscription() + val composite = CompositeSubscription() + + composite += s0 + assertFalse(s0.isUnsubscribed) + + composite -= s0 + assertTrue(s0.isUnsubscribed) + + composite.unsubscribe() + + assertTrue(composite.isUnsubscribed) + assertTrue(s0.isUnsubscribed) + } + + @Test + def multiAssignmentSubscriptionAdd() { + + val s0 = Subscription() + val s1 = Subscription() + val multiple = MultipleAssignmentSubscription() + + assertFalse(multiple.isUnsubscribed) + assertFalse(s0.isUnsubscribed) + assertFalse(s1.isUnsubscribed) + + multiple.subscription = s0 + + assertFalse(s0.isUnsubscribed) + assertFalse(s1.isUnsubscribed) + + multiple.subscription = s1 + + assertFalse(s0.isUnsubscribed) // difference with SerialSubscription + assertFalse(s1.isUnsubscribed) + + multiple.unsubscribe() + + assertTrue(multiple.isUnsubscribed) + assertFalse(s0.isUnsubscribed) + assertTrue(s1.isUnsubscribed) + + val s2 = Subscription() + + assertFalse(s2.isUnsubscribed) + + multiple.subscription = s2 + + assertTrue(s2.isUnsubscribed) + assertFalse(s0.isUnsubscribed) + } + + @Test + def serialSubscriptionAdd() { + + val s0 = Subscription() + val s1 = Subscription() + val serial = SerialSubscription() + + assertFalse(serial.isUnsubscribed) + assertFalse(s0.isUnsubscribed) + assertFalse(s1.isUnsubscribed) + + serial.subscription = s0 + + assertFalse(s0.isUnsubscribed) + assertFalse(s1.isUnsubscribed) + + serial.subscription = s1 + + assertTrue(s0.isUnsubscribed) // difference with MultipleAssignmentSubscription + assertFalse(s1.isUnsubscribed) + + serial.unsubscribe() + + assertTrue(serial.isUnsubscribed) + assertTrue(s1.isUnsubscribed) + + val s2 = Subscription() + + assertFalse(s2.isUnsubscribed) + + serial.subscription = s2 + + assertTrue(s2.isUnsubscribed) + } + +} diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/subscriptions/SubscriptionTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/subscriptions/SubscriptionTests.scala deleted file mode 100644 index 4309967c0a..0000000000 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/subscriptions/SubscriptionTests.scala +++ /dev/null @@ -1,118 +0,0 @@ -package rx.lang.scala.subscriptions - -import org.junit.Assert._ -import org.junit.Test -import org.scalatest.junit.JUnitSuite - -import rx.lang.scala.Subscription - -class SubscriptionTests extends JUnitSuite { - - @Test - def anonymousSubscriptionCreate() { - val subscription = Subscription{} - assertNotNull(subscription) - } - - @Test - def anonymousSubscriptionDispose() { - var unsubscribed = false - val subscription = Subscription{ unsubscribed = true } - assertFalse(unsubscribed) - subscription.unsubscribe() - assertTrue(unsubscribed) - } - - @Test - def emptySubscription() { - val subscription = Subscription() - subscription.unsubscribe() - } - - @Test - def booleanSubscription() { - val subscription = BooleanSubscription() - assertFalse(subscription.isUnsubscribed) - subscription.unsubscribe() - assertTrue(subscription.isUnsubscribed) - subscription.unsubscribe() - assertTrue(subscription.isUnsubscribed) - } - - @Test - def compositeSubscriptionAdd() { - - var u0 = false - val s0 = BooleanSubscription{ u0 = true } - - var u1 = false - val s1 = Subscription{ u1 = true } - - val composite = CompositeSubscription() - - assertFalse(composite.isUnsubscribed) - - composite += s0 - composite += s1 - - composite.unsubscribe() - - assertTrue(composite.isUnsubscribed) - assertTrue(s0.isUnsubscribed) - assertTrue(u0) - assertTrue(u1) - - val s2 = BooleanSubscription() - assertFalse(s2.isUnsubscribed) - composite += s2 - assertTrue(s2.isUnsubscribed) - - } - - @Test - def compositeSubscriptionRemove() { - - val s0 = BooleanSubscription() - val composite = CompositeSubscription() - - composite += s0 - assertFalse(s0.isUnsubscribed) - composite -= s0 - assertTrue(s0.isUnsubscribed) - - composite.unsubscribe() - - assertTrue(composite.isUnsubscribed) - } - - @Test - def multiAssignmentSubscriptionAdd() { - - val s0 = BooleanSubscription() - val s1 = BooleanSubscription() - val multiple = MultipleAssignmentSubscription() - - assertFalse(multiple.isUnsubscribed) - - multiple.subscription = s0 - assertEquals(s0.asJavaSubscription, multiple.subscription.asJavaSubscription) - - multiple.subscription = s1 - assertEquals(s1.asJavaSubscription, multiple.subscription.asJavaSubscription) - - assertFalse(s0.isUnsubscribed) - assertFalse(s1.isUnsubscribed) - - multiple.unsubscribe() - - assertTrue(multiple.isUnsubscribed) - assertFalse(s0.isUnsubscribed) - assertTrue(s1.isUnsubscribed) - - val s2 = BooleanSubscription() - assertFalse(s2.isUnsubscribed) - multiple.subscription = s2 - assertTrue(s2.isUnsubscribed) - } - -}